Package conf contains definition of data type named ConfigStruct that
represents configuration of Notification service. This source file
also contains function named LoadConfiguration that can be used to load
configuration from provided configuration file and/or from environment
variables. Additionally several specific functions named
GetStorageConfiguration, GetLoggingConfiguration,
GetKafkaBrokerConfiguration, GetNotificationsConfiguration and
GetMetricsConfiguration are to be used to return specific configuration
package conf
Generated documentation is available at:
Documentation in literate-programming-style is available at:
Default name of configuration file is config.toml
It can be changed via environment variable NOTIFICATIONSERVICECONFIG_FILE
An example of configuration file that can be used in devel environment:
dbdriver = "postgres"
pgusername = "user"
pgpassword = "password"
pghost = "localhost"
pgport = 5432
pgdbname = "notification"
pgparams = "sslmode=disable"
logsqlqueries = true
debug = true
log_level = ""
Environment variables that can be used to override configuration file settings:
import (
clowder "github.com/redhatinsights/app-common-go/pkg/api/v1"
Common constants used for logging and error reporting
const (
filenameAttribute = "filename"
parsingConfigurationFileMessage = "parsing configuration file"
noKafkaConfig = "no Kafka configuration available in Clowder, using default one"
noBrokerConfig = "warning: no broker configurations found in clowder config"
noSaslConfig = "warning: SASL configuration is missing"
noTopicMapping = "warning: no kafka mapping found for topic %s"
noStorage = "warning: no storage section in Clowder config"
Constants for env var name and default config filename
const (
DefaultConfigFileName = "config"
ConfigStruct is a structure holding the whole notification service
type ConfigStruct struct {
LoggingConf logger . LoggingConfiguration `mapstructure:"logging" toml:"logging"`
CloudWatchConf logger . CloudWatchConfiguration `mapstructure:"cloudwatch" toml:"cloudwatch"`
SentryLoggingConf logger . SentryLoggingConfiguration `mapstructure:"sentry" toml:"sentry"`
KafkaZerologConf logger . KafkaZerologConfiguration `mapstructure:"kafka_zerolog" toml:"kafka_zerolog"`
Storage StorageConfiguration `mapstructure:"storage" toml:"storage"`
Kafka KafkaConfiguration `mapstructure:"kafka_broker" toml:"kafka_broker"`
ServiceLog ServiceLogConfiguration `mapstructure:"service_log" toml:"service_log"`
Dependencies DependenciesConfiguration `mapstructure:"dependencies" toml:"dependencies"`
Notifications NotificationsConfiguration `mapstructure:"notifications" toml:"notifications"`
Metrics MetricsConfiguration `mapstructure:"metrics" toml:"metrics"`
Cleaner CleanerConfiguration `mapstructure:"cleaner" toml:"cleaner"`
Processing ProcessingConfiguration `mapstructure:"processing" toml:"processing"`
DeleteOperation bool
LoggingConfiguration represents configuration for logging in general
type LoggingConfiguration struct {
Debug enables pretty colored logging
Debug bool `mapstructure:"debug" toml:"debug"`
LogLevel sets logging level to show. Possible values are:
"warn", "warning"
logging level won't be changed if value is not one of listed above
LogLevel string `mapstructure:"log_level" toml:"log_level"`
StorageConfiguration represents configuration of postgresQSL data storage
type StorageConfiguration struct {
Driver string `mapstructure:"db_driver" toml:"db_driver"`
PGUsername string `mapstructure:"pg_username" toml:"pg_username"`
PGPassword string `mapstructure:"pg_password" toml:"pg_password"`
PGHost string `mapstructure:"pg_host" toml:"pg_host"`
PGPort int `mapstructure:"pg_port" toml:"pg_port"`
PGDBName string `mapstructure:"pg_db_name" toml:"pg_db_name"`
PGParams string `mapstructure:"pg_params" toml:"pg_params"`
LogSQLQueries bool `mapstructure:"log_sql_queries" toml:"log_sql_queries"`
DependenciesConfiguration represents configuration of external services and other dependencies
type DependenciesConfiguration struct {
ContentServiceServer string `mapstructure:"content_server" toml:"content_server"`
ContentServiceEndpoint string `mapstructure:"content_endpoint" toml:"content_endpoint"`
TemplateRendererServer string `mapstructure:"template_renderer_server" toml:"template_renderer_server"`
TemplateRendererEndpoint string `mapstructure:"template_renderer_endpoint" toml:"template_renderer_endpoint"`
TemplateRendererURL string
CleanerConfiguration represents configuration for the main cleaner
type CleanerConfiguration struct {
MaxAge is specification of max age for records to be cleaned
MaxAge string `mapstructure:"max_age" toml:"max_age"`
KafkaConfiguration represents configuration of Kafka brokers and topics
type KafkaConfiguration struct {
Enabled bool `mapstructure:"enabled" toml:"enabled"`
Addresses string `mapstructure:"addresses" toml:"addresses"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"`
SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"`
Topic string `mapstructure:"topic" toml:"topic"`
Timeout time . Duration `mapstructure:"timeout" toml:"timeout"`
LikelihoodThreshold int `mapstructure:"likelihood_threshold" toml:"likelihood_threshold"`
ImpactThreshold int `mapstructure:"impact_threshold" toml:"impact_threshold"`
SeverityThreshold int `mapstructure:"severity_threshold" toml:"severity_threshold"`
TotalRiskThreshold int `mapstructure:"total_risk_threshold" toml:"total_risk_threshold"`
Cooldown string `mapstructure:"cooldown" toml:"cooldown"`
EventFilter string `mapstructure:"event_filter" toml:"event_filter"`
TagFilterEnabled bool `mapstructure:"tag_filter_enabled" toml:"tag_filter_enabled"`
Tags [ ] string `mapstructure:"tags" toml:"tags"`
TagsSet types . TagsSet
ServiceLogConfiguration represents configuration of ServiceLog REST API
type ServiceLogConfiguration struct {
Enabled bool `mapstructure:"enabled" toml:"enabled"`
ClientID string `mapstructure:"client_id" toml:"client_id"`
ClientSecret string `mapstructure:"client_secret" toml:"client_secret"`
CreatedBy string `mapstructure:"created_by" toml:"created_by"`
Username string `mapstructure:"username" toml:"username"`
TokenURL string `mapstructure:"token_url" toml:"token_url"`
URL string `mapstructure:"url" toml:"url"`
Timeout time . Duration `mapstructure:"timeout" toml:"timeout"`
LikelihoodThreshold int `mapstructure:"likelihood_threshold" toml:"likelihood_threshold"`
ImpactThreshold int `mapstructure:"impact_threshold" toml:"impact_threshold"`
SeverityThreshold int `mapstructure:"severity_threshold" toml:"severity_threshold"`
TotalRiskThreshold int `mapstructure:"total_risk_threshold" toml:"total_risk_threshold"`
Cooldown string `mapstructure:"cooldown" toml:"cooldown"`
EventFilter string `mapstructure:"event_filter" toml:"event_filter"`
RuleDetailsURI string `mapstructure:"rule_details_uri" toml:"rule_details_uri"`
TagFilterEnabled bool `mapstructure:"tag_filter_enabled" toml:"tag_filter_enabled"`
Tags [ ] string `mapstructure:"tags" toml:"tags"`
TagsSet types . TagsSet
NotificationsConfiguration represents the configuration specific to the content of notifications
type NotificationsConfiguration struct {
InsightsAdvisorURL string `mapstructure:"insights_advisor_url" toml:"insights_advisor_url"`
ClusterDetailsURI string `mapstructure:"cluster_details_uri" toml:"cluster_details_uri"`
RuleDetailsURI string `mapstructure:"rule_details_uri" toml:"rule_details_uri"`
MetricsConfiguration holds metrics related configuration
type MetricsConfiguration struct {
Job string `mapstructure:"job_name" toml:"job_name"`
Namespace string `mapstructure:"namespace" toml:"namespace"`
Subsystem string `mapstructure:"subsystem" toml:"subsystem"`
GatewayURL string `mapstructure:"gateway_url" toml:"gateway_url"`
GatewayAuthToken string `mapstructure:"gateway_auth_token" toml:"gateway_auth_token"`
Retries int `mapstructure:"retries" toml:"retries"`
RetryAfter time . Duration `mapstructure:"retry_after" toml:"retry_after"`
GatewayTimeBetweenPush time . Duration `mapstructure:"gateway_time_between_push" toml:"gateway_time_between_push"`
ProcessingConfiguration represents configuration for processing subsystem
type ProcessingConfiguration struct {
FilterAllowedClusters bool `mapstructure:"filter_allowed_clusters" toml:"filter_allowed_clusters"`
AllowedClusters [ ] string `mapstructure:"allowed_clusters" toml:"allowed_clusters"`
FilterBlockedClusters bool `mapstructure:"filter_blocked_clusters" toml:"filter_blocked_clusters"`
BlockedClusters [ ] string `mapstructure:"blocked_clusters" toml:"blocked_clusters"`
LoadConfiguration loads configuration from defaultConfigFile, file set in
configFileEnvVariableName or from env
func LoadConfiguration ( configFileEnvVariableName , defaultConfigFile string ) ( ConfigStruct , error ) {
var configuration ConfigStruct
env. variable holding name of configuration file
configFile , specified := os . LookupEnv ( configFileEnvVariableName )
if specified {
log . Info ( ) . Str ( filenameAttribute , configFile ) . Msg ( parsingConfigurationFileMessage )
we need to separate the directory name and filename without
directory , basename := filepath . Split ( configFile )
file := strings . TrimSuffix ( basename , filepath . Ext ( basename ) )
parse the configuration
viper . SetConfigName ( file )
viper . AddConfigPath ( directory )
} else {
log . Info ( ) . Str ( filenameAttribute , defaultConfigFile ) . Msg ( parsingConfigurationFileMessage )
parse the configuration
viper . SetConfigName ( defaultConfigFile )
viper . AddConfigPath ( "." )
try to read the whole configuration
err := viper . ReadInConfig ( )
if _ , isNotFoundError := err . ( viper . ConfigFileNotFoundError ) ; ! specified && isNotFoundError {
If config file is not present (which might be correct in
some environment) we need to read configuration from
environment variables. The problem is that Viper is not
smart enough to understand the structure of config by
itself, so we need to read fake config file
fakeTomlConfigWriter := new ( bytes . Buffer )
err := toml . NewEncoder ( fakeTomlConfigWriter ) . Encode ( configuration )
if err != nil {
return configuration , err
fakeTomlConfig := fakeTomlConfigWriter . String ( )
viper . SetConfigType ( "toml" )
err = viper . ReadConfig ( strings . NewReader ( fakeTomlConfig ) )
if err != nil {
return configuration , err
} else if err != nil {
error is processed on caller side
return configuration , fmt . Errorf ( "fatal error config file: %s" , err )
override configuration from env if there's variable in env
viper . AutomaticEnv ( )
viper . SetEnvPrefix ( envPrefix )
viper . SetEnvKeyReplacer ( strings . NewReplacer ( "-" , "_" , "." , "__" ) )
err = viper . Unmarshal ( & configuration )
if err != nil {
return configuration , err
updateConfigFromClowder ( & configuration )
configuration . Dependencies . TemplateRendererURL , err = createURL (
configuration . Dependencies . TemplateRendererServer ,
configuration . Dependencies . TemplateRendererEndpoint )
if err != nil {
fmt . Println ( "error creating content template renderer URL" )
return configuration , err
convert list/slice into regular set
configuration . Kafka . TagsSet = types . MakeSetOfTags ( configuration . Kafka . Tags )
configuration . ServiceLog . TagsSet = types . MakeSetOfTags ( configuration . ServiceLog . Tags )
everything's should be ok
return configuration , nil
func createURL ( server , endpoint string ) ( string , error ) {
u , err := url . Parse ( server )
if err != nil {
return "" , err
u . Path = path . Join ( u . Path , endpoint )
return u . String ( ) , nil
GetStorageConfiguration returns storage configuration
func GetStorageConfiguration ( configuration * ConfigStruct ) StorageConfiguration {
return configuration . Storage
GetLoggingConfiguration returns logging configuration
func GetLoggingConfiguration ( configuration * ConfigStruct ) logger . LoggingConfiguration {
return configuration . LoggingConf
GetCloudWatchConfiguration returns cloudwatch configuration
func GetCloudWatchConfiguration ( configuration * ConfigStruct ) logger . CloudWatchConfiguration {
return configuration . CloudWatchConf
GetSentryLoggingConfiguration returns sentry logging configuration
func GetSentryLoggingConfiguration ( configuration * ConfigStruct ) logger . SentryLoggingConfiguration {
return configuration . SentryLoggingConf
GetKafkaZerologConfiguration returns the kafkazero log configuration
func GetKafkaZerologConfiguration ( configuration * ConfigStruct ) logger . KafkaZerologConfiguration {
return configuration . KafkaZerologConf
GetKafkaBrokerConfiguration returns kafka broker configuration
func GetKafkaBrokerConfiguration ( configuration * ConfigStruct ) KafkaConfiguration {
return configuration . Kafka
GetServiceLogConfiguration returns ServiceLog configuration
func GetServiceLogConfiguration ( configuration * ConfigStruct ) ServiceLogConfiguration {
return configuration . ServiceLog
GetDependenciesConfiguration returns dependencies configuration
func GetDependenciesConfiguration ( configuration * ConfigStruct ) DependenciesConfiguration {
return configuration . Dependencies
GetNotificationsConfiguration returns configuration related with notification content
func GetNotificationsConfiguration ( configuration * ConfigStruct ) NotificationsConfiguration {
return configuration . Notifications
GetMetricsConfiguration returns metrics configuration
func GetMetricsConfiguration ( configuration * ConfigStruct ) MetricsConfiguration {
return configuration . Metrics
GetCleanerConfiguration returns cleaner configuration
func GetCleanerConfiguration ( configuration * ConfigStruct ) CleanerConfiguration {
return configuration . Cleaner
GetProcessingConfiguration returns processing configuration
func GetProcessingConfiguration ( configuration * ConfigStruct ) ProcessingConfiguration {
return configuration . Processing
func updateStorageCfgFromClowder ( configuration * ConfigStruct ) {
get DB configuration from clowder
configuration . Storage . PGDBName = clowder . LoadedConfig . Database . Name
configuration . Storage . PGHost = clowder . LoadedConfig . Database . Hostname
configuration . Storage . PGPort = clowder . LoadedConfig . Database . Port
configuration . Storage . PGUsername = clowder . LoadedConfig . Database . Username
configuration . Storage . PGPassword = clowder . LoadedConfig . Database . Password
func updateBrokerCfgFromClowder ( configuration * ConfigStruct ) {
make sure broker(s) are configured in Clowder
if len ( clowder . LoadedConfig . Kafka . Brokers ) > 0 {
configuration . Kafka . Addresses = ""
for _ , broker := range clowder . LoadedConfig . Kafka . Brokers {
if broker . Port != nil {
configuration . Kafka . Addresses += fmt . Sprintf ( "%s:%d" , broker . Hostname , * broker . Port ) + ","
} else {
configuration . Kafka . Addresses += broker . Hostname + ","
remove the extra comma
configuration . Kafka . Addresses = configuration . Kafka . Addresses [ : len ( configuration . Kafka . Addresses ) - 1 ]
SSL config
clowderBrokerCfg := clowder . LoadedConfig . Kafka . Brokers [ 0 ]
if clowderBrokerCfg . Authtype != nil {
fmt . Println ( "kafka is configured to use authentication" )
if clowderBrokerCfg . Sasl != nil {
configuration . Kafka . SaslUsername = * clowderBrokerCfg . Sasl . Username
configuration . Kafka . SaslPassword = * clowderBrokerCfg . Sasl . Password
configuration . Kafka . SaslMechanism = * clowderBrokerCfg . Sasl . SaslMechanism
configuration . Kafka . SecurityProtocol = * clowderBrokerCfg . Sasl . SecurityProtocol
if caPath , err := clowder . LoadedConfig . KafkaCa ( clowderBrokerCfg ) ; err == nil {
configuration . Kafka . CertPath = caPath
} else {
fmt . Println ( noSaslConfig )
} else {
fmt . Println ( noBrokerConfig )
updateTopicsMapping ( configuration )
updateConfigFromClowder updates the current config with the values defined in clowder
func updateConfigFromClowder ( configuration * ConfigStruct ) {
if ! clowder . IsClowderEnabled ( ) || clowder . LoadedConfig == nil {
fmt . Println ( "Clowder is disabled" )
fmt . Println ( "Clowder is enabled" )
if clowder . LoadedConfig . Kafka == nil {
fmt . Println ( noKafkaConfig )
} else {
updateBrokerCfgFromClowder ( configuration )
if clowder . LoadedConfig . Database == nil {
fmt . Println ( noStorage )
} else {
updateStorageCfgFromClowder ( configuration )
func updateTopicsMapping ( configuration * ConfigStruct ) {
Updating topics from clowder mapping if available
if topicCfg , ok := clowder . KafkaTopics [ configuration . Kafka . Topic ] ; ok {
configuration . Kafka . Topic = topicCfg . Name
} else {
fmt . Printf ( noTopicMapping , configuration . Kafka . Topic )