Package kafka contains an implementation of Producer interface that can be
used to produce (that is send) messages to properly configured Kafka broker.
package kafka
Generated documentation is available at:
Documentation in literate-programming-style is available at:
import (
tlsutils "github.com/RedHatInsights/insights-operator-utils/tls"
Producer is an implementation of Producer interface
type Producer struct {
Configuration conf . KafkaConfiguration
Producer sarama . SyncProducer
New constructs new implementation of Producer interface
func New ( config * conf . ConfigStruct ) ( * Producer , error ) {
kafkaConfig := conf . GetKafkaBrokerConfiguration ( config )
saramaConfig , err := saramaConfigFromBrokerConfig ( & kafkaConfig )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Unable to create a valid Kafka configuration" )
producer , err := sarama . NewSyncProducer ( strings . Split ( kafkaConfig . Addresses , "," ) , saramaConfig )
if err != nil {
log . Error ( ) . Str ( "Kafka address" , kafkaConfig . Addresses ) . Err ( err ) . Msg ( "unable to start a Kafka producer" )
return nil , err
return & Producer {
Configuration : kafkaConfig ,
Producer : producer ,
} , nil
ProduceMessage produces message to selected topic. That function returns
partition ID and offset of new message or an error value in case of any
problem on broker side.
func ( producer * Producer ) ProduceMessage ( msg types . ProducerMessage ) ( partitionID int32 , offset int64 , err error ) {
no-op when producer is disabled
(this logic allows us to enable/disable producer on the fly
if ! producer . Configuration . Enabled {
producerMsg := & sarama . ProducerMessage {
Topic : producer . Configuration . Topic ,
Value : sarama . ByteEncoder ( msg ) ,
partitionID , offset , err = producer . Producer . SendMessage ( producerMsg )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "failed to produce message to Kafka" )
} else {
log . Info ( ) . Int ( "partition" , int ( partitionID ) ) . Int ( "offset" , int ( offset ) ) . Msg ( "message sent" )
Close allow the Sarama producer to be gracefully closed
func ( producer * Producer ) Close ( ) error {
log . Info ( ) . Msg ( "Shutting down kafka producer" )
if err := producer . Producer . Close ( ) ; err != nil {
log . Error ( ) . Err ( err ) . Msg ( "unable to close Kafka producer" )
return err
return nil
func saramaConfigFromBrokerConfig ( cfg * conf . KafkaConfiguration ) ( * sarama . Config , error ) {
saramaConfig := sarama . NewConfig ( )
saramaConfig . Version = sarama . V0_10_2_0
if strings . Contains ( cfg . SecurityProtocol , "SSL" ) {
saramaConfig . Net . TLS . Enable = true
if strings . EqualFold ( cfg . SecurityProtocol , "SSL" ) && cfg . CertPath != "" {
tlsConfig , err := tlsutils . NewTLSConfig ( cfg . CertPath )
if err != nil {
log . Error ( ) . Msgf ( "Unable to load TLS config for %s cert" , cfg . CertPath )
return nil , err
saramaConfig . Net . TLS . Config = tlsConfig
} else if strings . HasPrefix ( cfg . SecurityProtocol , "SASL_" ) {
log . Info ( ) . Msg ( "Configuring SASL authentication" )
saramaConfig . Net . SASL . Enable = true
saramaConfig . Net . SASL . User = cfg . SaslUsername
saramaConfig . Net . SASL . Password = cfg . SaslPassword
saramaConfig . Net . SASL . Mechanism = sarama . SASLMechanism ( cfg . SaslMechanism )
if strings . EqualFold ( cfg . SaslMechanism , sarama . SASLTypeSCRAMSHA512 ) {
log . Info ( ) . Msg ( "Configuring SCRAM-SHA512" )
saramaConfig . Net . SASL . Handshake = true
saramaConfig . Net . SASL . SCRAMClientGeneratorFunc = func ( ) sarama . SCRAMClient {
return & SCRAMClient { HashGeneratorFcn : sha512 . New }
saramaConfig . Producer . Return . Successes = true
return saramaConfig , nil