Package kafka contains data types, interfaces, and methods related to
Kafka that can be used to configure brokers, as well as consume/produce
package kafka
import (
tlsutils "github.com/RedHatInsights/insights-operator-utils/tls"
BrokerConfiguration represents configuration of a single-instance Kafka broker
type BrokerConfiguration struct {
Viper does not unmarshall automagically to a slice.
Handling a string is easier and nicer than all the code required to do so
Addresses string `mapstructure:"addresses" toml:"addresses"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"`
SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"`
Topic string `mapstructure:"topic" toml:"topic"`
Timeout time . Duration `mapstructure:"timeout" toml:"timeout"`
Group string `mapstructure:"group" toml:"group"`
ClientID string `mapstructure:"client_id" toml:"client_id"`
Enabled bool `mapstructure:"enabled" toml:"enabled"`
SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters
func SaramaConfigFromBrokerConfig ( cfg * BrokerConfiguration ) ( * sarama . Config , error ) {
saramaConfig := sarama . NewConfig ( )
if cfg . Timeout > 0 {
saramaConfig . Net . DialTimeout = cfg . Timeout
saramaConfig . Net . ReadTimeout = cfg . Timeout
saramaConfig . Net . WriteTimeout = cfg . Timeout
if strings . Contains ( cfg . SecurityProtocol , "SSL" ) {
saramaConfig . Net . TLS . Enable = true
if strings . EqualFold ( cfg . SecurityProtocol , "SSL" ) && cfg . CertPath != "" {
tlsConfig , err := tlsutils . NewTLSConfig ( cfg . CertPath )
if err != nil {
log . Error ( ) . Msgf ( "Unable to load TLS config for %s cert" , cfg . CertPath )
return nil , err
saramaConfig . Net . TLS . Config = tlsConfig
} else if strings . HasPrefix ( cfg . SecurityProtocol , "SASL_" ) {
log . Info ( ) . Msg ( "Configuring SASL authentication" )
saramaConfig . Net . SASL . Enable = true
saramaConfig . Net . SASL . User = cfg . SaslUsername
saramaConfig . Net . SASL . Password = cfg . SaslPassword
saramaConfig . Net . SASL . Mechanism = sarama . SASLMechanism ( cfg . SaslMechanism )
if strings . EqualFold ( cfg . SaslMechanism , sarama . SASLTypeSCRAMSHA512 ) {
log . Info ( ) . Msg ( "Configuring SCRAM-SHA512" )
saramaConfig . Net . SASL . Handshake = true
saramaConfig . Net . SASL . SCRAMClientGeneratorFunc = func ( ) sarama . SCRAMClient {
return & SCRAMClient { HashGeneratorFcn : sha512 . New }
ClientID is fully optional, but by setting it, we can get rid of some warning messages in logs
if cfg . ClientID != "" {
if not set, the "sarama" will be used instead
saramaConfig . ClientID = cfg . ClientID
now the config structure is filled-in
return saramaConfig , nil