|
|
Package consumer contains an interface for any consumer that is able to
process messages. It also contains an implementation of a Kafka consumer.
It is expected that consumed messages are generated by ccx-data-pipeline
based on OCP rules framework. The reports generated by the framework are
enhanced with more context information taken from different sources, like
the organization ID, account number, unique cluster name, and the
LastChecked timestamp (taken from the incoming Kafka record containing the
URL to the archive).
It is also expected that consumed messages contain one INFO rule hit that
contains the cluster version. That rule hit is produced by a special rule used
only in the external data pipeline:
"versioninfo|CLUSTERVERSION_INFO"
|
package consumer
import (
"context"
"strings"
"github.com/Shopify/sarama"
"github.com/rs/zerolog/log"
"github.com/RedHatInsights/insights-results-aggregator/broker"
"github.com/RedHatInsights/insights-results-aggregator/producer"
"github.com/RedHatInsights/insights-results-aggregator/storage"
)
|
KafkaConsumer is an implementation of the Consumer interface
Example:
kafkaConsumer, err := consumer.NewKafkaConsumer(brokerCfg, storage, processor)
if err != nil {
panic(err)
}
kafkaConsumer.Serve()
err = kafkaConsumer.Close()
if err != nil {
panic(err)
}
|
// KafkaConsumer is an implementation of the Consumer interface that reads
// messages from a Kafka topic through a sarama consumer group.
type KafkaConsumer struct {
	// Configuration is the broker configuration given to the constructor;
	// its Topic field selects the topic consumed by Serve.
	Configuration broker.Configuration
	// ConsumerGroup is the sarama consumer group Serve consumes from and
	// Close shuts down.
	ConsumerGroup sarama.ConsumerGroup
	// Storage is the storage handed over to the constructor.
	Storage storage.Storage
	// MessageProcessor is the message processor handed over to the
	// constructor.
	MessageProcessor MessageProcessor

	// Counters reported by GetNumberOfSuccessfullyConsumedMessages and
	// GetNumberOfErrorsConsumingMessages; both start at zero.
	numberOfSuccessfullyConsumedMessages uint64
	numberOfErrorsConsumingMessages      uint64

	// ready is closed by Setup to signal that a session has been set up;
	// Serve waits on it and replaces it after each finished session.
	ready chan bool
	// cancel cancels the context created by Serve; it is nil until Serve
	// has been called and is invoked by Close.
	cancel context.CancelFunc

	// Optional producers; the constructor stores nil when the respective
	// producer is not configured. Close closes the non-nil ones.
	payloadTrackerProducer *producer.PayloadTrackerProducer
	deadLetterProducer     *producer.DeadLetterProducer
}
|
DefaultSaramaConfig is the sarama configuration used by default when a
consumer is constructed via NewKafkaConsumer. It can be set, for example,
to select a specific version of the protocol, which is useful for testing.
|
// DefaultSaramaConfig is the sarama configuration NewKafkaConsumer passes on
// to NewKafkaConsumerWithSaramaConfig. While it is nil, the configuration is
// derived from the broker configuration instead. Overriding it is handy in
// tests, e.g. to select a specific protocol version.
var DefaultSaramaConfig *sarama.Config
|
NewKafkaConsumer constructs new implementation of Consumer interface
|
// NewKafkaConsumer builds a KafkaConsumer for the given broker configuration,
// storage and message processor, using the package-level DefaultSaramaConfig
// as the sarama configuration. It is a thin wrapper around
// NewKafkaConsumerWithSaramaConfig and returns whatever that function returns.
func NewKafkaConsumer(brokerCfg broker.Configuration, storage storage.Storage, processor MessageProcessor) (*KafkaConsumer, error) {
	saramaCfg := DefaultSaramaConfig
	return NewKafkaConsumerWithSaramaConfig(brokerCfg, storage, saramaCfg, processor)
}
|
NewKafkaConsumerWithSaramaConfig constructs new implementation of Consumer interface with custom sarama config
|
// NewKafkaConsumerWithSaramaConfig constructs a new implementation of the
// Consumer interface using a custom sarama configuration.
//
// When saramaConfig is nil, the configuration is derived from brokerCfg via
// broker.SaramaConfigFromBrokerConfig. The function then creates the sarama
// consumer group (brokerCfg.Addresses is a comma-separated list of broker
// addresses), the payload tracker producer and the dead letter producer.
// Either producer constructor may return nil without an error, which is
// logged as "not configured" and stored as nil.
//
// On any failure the error is logged and returned together with a nil
// consumer. Resources that were already created at that point (consumer
// group, payload tracker producer) are closed before returning so that a
// failed construction does not leak them.
func NewKafkaConsumerWithSaramaConfig(
	brokerCfg broker.Configuration,
	storage storage.Storage,
	saramaConfig *sarama.Config,
	processor MessageProcessor,
) (*KafkaConsumer, error) {
	var err error

	if saramaConfig == nil {
		saramaConfig, err = broker.SaramaConfigFromBrokerConfig(brokerCfg)
		if err != nil {
			log.Error().Err(err).Msg("unable to create sarama configuration from current broker configuration")
			return nil, err
		}
	}

	log.Info().
		Str("addresses", brokerCfg.Addresses).
		Str("group", brokerCfg.Group).
		Msg("New consumer group")

	consumerGroup, err := sarama.NewConsumerGroup(strings.Split(brokerCfg.Addresses, ","), brokerCfg.Group, saramaConfig)
	if err != nil {
		log.Error().Err(err).Msg("Unable to create consumer group")
		return nil, err
	}
	log.Info().Msg("Consumer group has been created")

	log.Info().Msg("Constructing payload tracker producer")
	payloadTrackerProducer, err := producer.NewPayloadTrackerProducer(brokerCfg)
	if err != nil {
		log.Error().Err(err).Msg("Unable to construct payload tracker producer")
		// the consumer group is already open; release it before bailing out
		if closeErr := consumerGroup.Close(); closeErr != nil {
			log.Error().Err(closeErr).Msg("unable to close consumer group")
		}
		return nil, err
	}
	if payloadTrackerProducer == nil {
		log.Info().Msg("Payload tracker producer not configured")
	} else {
		log.Info().Msg("Payload tracker producer has been configured")
	}

	log.Info().Msg("Constructing DLQ producer")
	deadLetterProducer, err := producer.NewDeadLetterProducer(brokerCfg)
	if err != nil {
		log.Error().Err(err).Msg("Unable to construct dead letter producer")
		// release everything created so far: the optional payload tracker
		// producer and the consumer group
		if payloadTrackerProducer != nil {
			if closeErr := payloadTrackerProducer.Close(); closeErr != nil {
				log.Error().Err(closeErr).Msg("unable to close payload tracker Kafka producer")
			}
		}
		if closeErr := consumerGroup.Close(); closeErr != nil {
			log.Error().Err(closeErr).Msg("unable to close consumer group")
		}
		return nil, err
	}
	if deadLetterProducer == nil {
		log.Info().Msg("Dead letter producer not configured")
	} else {
		log.Info().Msg("Dead letter producer has been configured")
	}

	consumer := &KafkaConsumer{
		Configuration:                        brokerCfg,
		ConsumerGroup:                        consumerGroup,
		Storage:                              storage,
		MessageProcessor:                     processor,
		numberOfSuccessfullyConsumedMessages: 0,
		numberOfErrorsConsumingMessages:      0,
		ready:                                make(chan bool),
		payloadTrackerProducer:               payloadTrackerProducer,
		deadLetterProducer:                   deadLetterProducer,
	}

	return consumer, nil
}
|
Serve starts listening for messages and processing them. It blocks the current thread.
|
func ( consumer * KafkaConsumer ) Serve ( ) {
ctx , cancel := context . WithCancel ( context . Background ( ) )
consumer . cancel = cancel
go func ( ) {
for {
|
Consume should be called inside an infinite loop, when a
server-side rebalance happens, the consumer session will need to be
recreated to get the new claims
|
if err := consumer . ConsumerGroup . Consume ( ctx , [ ] string { consumer . Configuration . Topic } , consumer ) ; err != nil {
log . Fatal ( ) . Err ( err ) . Msg ( "unable to recreate kafka session" )
}
|
check if context was cancelled, signaling that the consumer should stop
|
if ctx . Err ( ) != nil {
return
}
log . Info ( ) . Msg ( "created new kafka session" )
consumer . ready = make ( chan bool )
}
} ( )
|
Wait for the consumer to be set up
|
log . Info ( ) . Msg ( "waiting for consumer to become ready" )
<- consumer . ready
log . Info ( ) . Msg ( "finished waiting for consumer to become ready" )
|
Actual processing is done in goroutine created by sarama (see ConsumeClaim below)
|
log . Info ( ) . Msg ( "started serving consumer" )
<- ctx . Done ( )
log . Info ( ) . Msg ( "context cancelled, exiting" )
cancel ( )
}
|
Setup is run at the beginning of a new session, before ConsumeClaim
|
func ( consumer * KafkaConsumer ) Setup ( sarama . ConsumerGroupSession ) error {
log . Info ( ) . Msg ( "new session has been setup" )
|
Mark the consumer as ready
|
close ( consumer . ready )
return nil
}
|
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
|
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. It only logs that the session is over; the session argument
// is not used and the method always returns nil.
func (consumer *KafkaConsumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	event := log.Info()
	event.Msg("new session has been finished")
	return nil
}
|
ConsumeClaim starts a consumer loop of ConsumerGroupClaim's Messages().
|
func ( consumer * KafkaConsumer ) ConsumeClaim ( session sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error {
log . Info ( ) .
Int64 ( offsetKey , claim . InitialOffset ( ) ) .
Msg ( "starting messages loop" )
for message := range claim . Messages ( ) {
err := consumer . HandleMessage ( message )
if err != nil {
|
already handled in HandleMessage, just log
|
log . Error ( ) . Err ( err ) . Msg ( "Problem while handling the message" )
}
session . MarkMessage ( message , "" )
}
return nil
}
|
Close method closes all resources used by consumer
|
// Close releases all resources used by the consumer: it cancels the context
// created by Serve (if Serve has been called), then closes the consumer
// group, the payload tracker producer and the dead letter producer, skipping
// those that are nil. Failures while closing are logged and otherwise
// ignored; the method always returns nil.
func (consumer *KafkaConsumer) Close() error {
	if stop := consumer.cancel; stop != nil {
		stop()
	}

	if group := consumer.ConsumerGroup; group != nil {
		if err := group.Close(); err != nil {
			log.Error().Err(err).Msg("unable to close consumer group")
		}
	}

	if tracker := consumer.payloadTrackerProducer; tracker != nil {
		if err := tracker.Close(); err != nil {
			log.Error().Err(err).Msg("unable to close payload tracker Kafka producer")
		}
	}

	if deadLetter := consumer.deadLetterProducer; deadLetter != nil {
		if err := deadLetter.Close(); err != nil {
			log.Error().Err(err).Msg("unable to close dead letter Kafka producer")
		}
	}

	return nil
}
|
GetNumberOfSuccessfullyConsumedMessages returns the number of successfully
consumed messages since the KafkaConsumer object was created
|
// GetNumberOfSuccessfullyConsumedMessages returns the current value of the
// consumer's counter of successfully consumed messages. The counter starts
// at zero when the KafkaConsumer is constructed.
func (consumer *KafkaConsumer) GetNumberOfSuccessfullyConsumedMessages() uint64 {
	consumed := consumer.numberOfSuccessfullyConsumedMessages
	return consumed
}
|
GetNumberOfErrorsConsumingMessages returns the number of errors that occurred
while consuming messages since the KafkaConsumer object was created
|
// GetNumberOfErrorsConsumingMessages returns the current value of the
// consumer's counter of errors encountered while consuming messages. The
// counter starts at zero when the KafkaConsumer is constructed.
func (consumer *KafkaConsumer) GetNumberOfErrorsConsumingMessages() uint64 {
	failed := consumer.numberOfErrorsConsumingMessages
	return failed
}
|