|
|
Package kafka contains data types, interfaces, and methods related to
Kafka that can be used to configure brokers, as well as consume/produce
messages.
|
package kafka
import (
"crypto/sha512"
"strings"
"time"
tlsutils "github.com/RedHatInsights/insights-operator-utils/tls"
"github.com/Shopify/sarama"
"github.com/rs/zerolog/log"
)
|
BrokerConfiguration represents configuration of a single-instance Kafka broker
|
type BrokerConfiguration struct {
|
Viper does not unmarshall automagically to a slice.
Handling a string is easier and nicer than all the code required to do so
|
Addresses string `mapstructure:"addresses" toml:"addresses"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"`
SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"`
Topic string `mapstructure:"topic" toml:"topic"`
Timeout time . Duration `mapstructure:"timeout" toml:"timeout"`
Group string `mapstructure:"group" toml:"group"`
ClientID string `mapstructure:"client_id" toml:"client_id"`
Enabled bool `mapstructure:"enabled" toml:"enabled"`
}
|
SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters
|
func SaramaConfigFromBrokerConfig ( cfg * BrokerConfiguration ) ( * sarama . Config , error ) {
saramaConfig := sarama . NewConfig ( )
if cfg . Timeout > 0 {
saramaConfig . Net . DialTimeout = cfg . Timeout
saramaConfig . Net . ReadTimeout = cfg . Timeout
saramaConfig . Net . WriteTimeout = cfg . Timeout
}
if strings . Contains ( cfg . SecurityProtocol , "SSL" ) {
saramaConfig . Net . TLS . Enable = true
}
if strings . EqualFold ( cfg . SecurityProtocol , "SSL" ) && cfg . CertPath != "" {
tlsConfig , err := tlsutils . NewTLSConfig ( cfg . CertPath )
if err != nil {
log . Error ( ) . Msgf ( "Unable to load TLS config for %s cert" , cfg . CertPath )
return nil , err
}
saramaConfig . Net . TLS . Config = tlsConfig
} else if strings . HasPrefix ( cfg . SecurityProtocol , "SASL_" ) {
log . Info ( ) . Msg ( "Configuring SASL authentication" )
saramaConfig . Net . SASL . Enable = true
saramaConfig . Net . SASL . User = cfg . SaslUsername
saramaConfig . Net . SASL . Password = cfg . SaslPassword
saramaConfig . Net . SASL . Mechanism = sarama . SASLMechanism ( cfg . SaslMechanism )
if strings . EqualFold ( cfg . SaslMechanism , sarama . SASLTypeSCRAMSHA512 ) {
log . Info ( ) . Msg ( "Configuring SCRAM-SHA512" )
saramaConfig . Net . SASL . Handshake = true
saramaConfig . Net . SASL . SCRAMClientGeneratorFunc = func ( ) sarama . SCRAMClient {
return & SCRAMClient { HashGeneratorFcn : sha512 . New }
}
}
}
|
ClientID is fully optional, but by setting it, we can get rid of some warning messages in logs
|
if cfg . ClientID != "" {
|
if not set, the "sarama" will be used instead
|
saramaConfig . ClientID = cfg . ClientID
}
|
now the config structure is filled-in
|
return saramaConfig , nil
}
|