|
|
Package differ contains the core of the CCX Notification Service. The differ itself is
implemented here, together with the storage and comparator implementations.
|
package differ
|
Generated documentation is available at:
https://pkg.go.dev/github.com/RedHatInsights/ccx-notification-service/differ
Documentation in literate-programming-style is available at:
https://redhatinsights.github.io/ccx-notification-service/packages/differ/d.html
|
import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"
	"unicode/utf8"

	"github.com/RedHatInsights/insights-operator-utils/evaluator"
	"github.com/RedHatInsights/insights-operator-utils/logger"
	"github.com/rs/zerolog/log"

	"github.com/RedHatInsights/ccx-notification-service/conf"
	"github.com/RedHatInsights/ccx-notification-service/ocmclient"
	"github.com/RedHatInsights/ccx-notification-service/producer"
	"github.com/RedHatInsights/ccx-notification-service/producer/kafka"
	"github.com/RedHatInsights/ccx-notification-service/producer/servicelog"
	"github.com/RedHatInsights/ccx-notification-service/types"
)
|
// Exit codes. The values are assigned sequentially by iota, starting with
// ExitStatusOK = 0, so the order of the declarations below is significant.
const (
	// ExitStatusOK means that the tool finished with success
	ExitStatusOK = iota
	// ExitStatusConfiguration is an error code related to program configuration
	ExitStatusConfiguration
	// ExitStatusError is a general error code
	ExitStatusError
	// ExitStatusStorageError is returned in case of any storage-related error
	ExitStatusStorageError
	// ExitStatusFetchContentError is returned in case content cannot be fetched correctly
	ExitStatusFetchContentError
	// ExitStatusKafkaBrokerError is for kafka broker connection establishment errors
	ExitStatusKafkaBrokerError
	// ExitStatusKafkaProducerError is for kafka event production failures
	ExitStatusKafkaProducerError
	// ExitStatusKafkaConnectionNotClosedError is raised when connection cannot be closed
	ExitStatusKafkaConnectionNotClosedError
	// ExitStatusCleanerError is raised when clean operation is not successful
	ExitStatusCleanerError
	// ExitStatusMetricsError is raised when prometheus metrics cannot be pushed
	ExitStatusMetricsError
	// ExitStatusEventFilterError is raised when event filter is not set correctly
	ExitStatusEventFilterError
	// ExitStatusServiceLogError is raised when Service Log notifier cannot be initialized
	ExitStatusServiceLogError
)
|
// Total risk values. They are consecutive integers starting at 1, which
// lets callers range-check a value against TotalRiskMin..TotalRiskMax.
const (
	// TotalRiskLow is the numerical representation of 'Low' total risk
	TotalRiskLow = iota + 1
	// TotalRiskModerate is the numerical representation of 'Moderate' total risk
	TotalRiskModerate
	// TotalRiskImportant is the numerical representation of 'Important' total risk
	TotalRiskImportant
	// TotalRiskCritical is the numerical representation of 'Critical' total risk
	TotalRiskCritical

	// TotalRiskMax is the highest total risk handled
	TotalRiskMax = TotalRiskCritical
	// TotalRiskMin is the lowest total risk handled
	TotalRiskMin = TotalRiskLow
)
|
// Messages, log attribute names and error format strings used throughout the
// differ.
const (
	// serviceName is used as the ServiceName of Service Log entries.
	serviceName                 = "CCX Notification Service"
	separator                   = "------------------------------------------------------------"
	operationFailedMessage      = "Operation failed"
	clusterEntryMessage         = "cluster entry"
	organizationIDAttribute     = "org id"
	AccountNumberAttribute      = "account number"
	typeAttribute               = "type"
	clusterAttribute            = "cluster"
	ruleAttribute               = "rule"
	likelihoodAttribute         = "likelihood"
	impactAttribute             = "impact"
	errorKeyAttribute           = "error key"
	numberOfEventsAttribute     = "number of events"
	clustersAttribute           = "clusters"
	totalRiskAttribute          = "totalRisk"
	errorStr                    = "Error:"
	reportWithHighImpactMessage = "Report with impact higher than configured threshold detected"
	invalidJSONContent          = "The provided content cannot be encoded as JSON."
	metricsPushFailedMessage    = "Couldn't push prometheus metrics"
	tagsNotSetMessage           = "Tags for tag filter not set"
	evaluationErrorMessage      = "Evaluation error"
	serviceLogSendErrorMessage  = "Sending entry to service log failed for this report"
	renderReportsFailedMessage  = "Rendering reports failed for this cluster"
	// ReportNotFoundError is a format string taking the rule name and the
	// error key, in that order.
	ReportNotFoundError       = "report for rule ID %v and error key %v has not been found"
	destinationNotSet         = "No known event destination configured. Aborting."
	onlyOneDestinationAllowed = "Only one integration should be enabled (Kafka / Service log). Review your config."
	configurationProblem      = "Configuration problem"
	noEquivalentSeverityMessage = "No Service log severity defined for given total risk. Creating event with Info severity"
)
|
// Constants for notification message top level fields (the Bundle and
// Application fields of the instant notification message).
const (
	notificationBundleName      = "openshift"
	notificationApplicationName = "advisor"
)
|
// Constants for notification event expected fields: the keys of the payload
// attached to every event of an instant notification message.
const (
	// INSTANT NOTIFICATION PAYLOAD FIELDS
	notificationPayloadRuleDescription = "rule_description"
	notificationPayloadRuleURL         = "rule_url"
	notificationPayloadTotalRisk       = "total_risk"
	notificationPayloadPublishDate     = "publish_date"
)
|
// Constants for notification context expected fields: the keys of the
// context attached to an instant notification message.
const (
	notificationContextDisplayName = "display_name"
	notificationContextHostURL     = "host_url"
)
|
// Constants used to filter events.
const (
	DefaultTotalRiskThreshold = 2
	// DefaultEventFilter is the filter expression used when the Kafka broker
	// configuration does not provide one.
	DefaultEventFilter = "totalRisk >= totalRiskThreshold"
)
|
// Constants used for creating Service Log entries - there is a length limit on text fields in Service Log,
// which will return an error status code in case this limit is exceeded.
const (
	// serviceLogSummaryMaxLength is the maximum length of an entry summary.
	serviceLogSummaryMaxLength = 255
	// serviceLogDescriptionMaxLength is the maximum length of an entry description.
	serviceLogDescriptionMaxLength = 4000
	// Service Log severities that total risk values are mapped to.
	ServiceLogSeverityInfo     = "Info"
	ServiceLogSeverityWarning  = "Warning"
	ServiceLogSeverityMajor    = "Major"
	ServiceLogSeverityCritical = "Critical"
)
|
// EventThresholds structure contains all threshold values for event filter
// evaluator. Each field is exposed to the filter expression under the name
// "<field>Threshold" (e.g. totalRiskThreshold).
type EventThresholds struct {
	TotalRisk  int
	Likelihood int
	Impact     int
	Severity   int
}
|
// EventValue structure contains all event values for event filter evaluator.
// Each field is exposed to the filter expression under its lower-camel-case
// name (e.g. totalRisk).
// NOTE(review): the callers in this file never set Severity, so it is 0 there.
type EventValue struct {
	TotalRisk  int
	Likelihood int
	Impact     int
	Severity   int
}
|
// NotificationURLs structure contains all the URLs that are inserted in the notifications.
type NotificationURLs struct {
	// ClusterDetails is a URL template; "{cluster_id}" is replaced by the cluster name.
	ClusterDetails string
	// RuleDetails is a URL template with "{cluster_id}", "{module}" and
	// "{error_key}" placeholders.
	RuleDetails string
	// InsightsAdvisor is the Insights Advisor URL from the notifications configuration.
	InsightsAdvisor string
}
|
// Differ is the struct that holds all the dependencies and configuration of this service.
type Differ struct {
	// Storage is used to read cluster lists, reports, read errors and
	// previously notified records. It is also handed to the notification
	// record update helpers.
	Storage Storage
	// Notifier is the producer that messages are sent through. It is set by
	// SetupKafkaProducer or setupServiceLogProducer.
	Notifier producer.Producer
	// NotificationType selects the processing mode. ProcessClusters only
	// processes clusters for types.InstantNotif.
	NotificationType types.EventType
	// Target is the event target used when reading previously reported issues
	// and when mapping a notifier close failure to an error (errorForTarget).
	Target types.EventTarget
	// PreviouslyReported holds the records loaded by
	// RetrievePreviouslyReportedForEventTarget.
	PreviouslyReported types.NotifiedRecordsPerCluster
	// CoolDown is passed by start to RetrievePreviouslyReportedForEventTarget.
	CoolDown string
	// Thresholds and Filter are the inputs of evaluateFilterExpression.
	Thresholds EventThresholds
	Filter     string
	// FilterByTag enables the tag filter. TagsSet holds the tags every
	// notified rule must carry.
	FilterByTag bool
	TagsSet     types.TagsSet
}
|
// TODO: same way we have a Differ struct now, we should have a struct
// holding the details of each notification target instead of global
// variables.
var (
	// NOTE(review): notificationType is not referenced in the visible part of
	// this file; confirm where it is used.
	notificationType types.EventType
	// notificationEventURLs holds the URL templates used in Kafka
	// notifications. It is populated by setupNotificationURLs.
	notificationEventURLs NotificationURLs
	// serviceLogSeverityMap maps total risk values to Service Log
	// severities. It is populated by setServiceLogSeverityMap.
	serviceLogSeverityMap map[int]string
)
// calculateTotalRisk returns the total risk of a rule as the mean of its
// impact and likelihood, truncated by integer division.
func calculateTotalRisk(impact, likelihood int) int {
	sum := impact + likelihood
	return sum / 2
}
|
ccxrulesocp.external.rules.clusterwideproxyauthcheck.report
->
clusterwideproxyauthcheck
|
func moduleToRuleName ( module types . ModuleName ) types . RuleName {
result := strings . TrimSuffix ( string ( module ) , ".report" )
return ruleIDToRuleName ( types . RuleID ( result ) )
}
|
ccxrulesocp.external.rules.clusterwideproxyauthcheck
->
clusterwideproxyauthcheck
|
func ruleIDToRuleName ( ruleID types . RuleID ) types . RuleName {
return types . RuleName ( ruleID [ strings . LastIndex ( string ( ruleID ) , "." ) + 1 : ] )
}
// findRuleByNameAndErrorKey looks up the metadata of the given rule/error key
// pair in ruleContent and returns its likelihood, impact, total risk (mean of
// impact and likelihood), description and tags. Missing entries yield the
// zero values found by the map lookups.
func findRuleByNameAndErrorKey(
	ruleContent types.RulesMap, ruleName types.RuleName, errorKey types.ErrorKey) (
	likelihood int, impact int, totalRisk int, description string, tags types.TagsSet) {
	metadata := ruleContent[string(ruleName)].ErrorKeys[string(errorKey)].Metadata

	likelihood = metadata.Likelihood
	impact = metadata.Impact.Impact
	return likelihood,
		impact,
		calculateTotalRisk(impact, likelihood),
		metadata.Description,
		types.MakeSetOfTags(metadata.Tags)
}
|
evaluateFilterExpression function tries to evaluate event filter expression
based on provided threshold values and actual recommendation values
|
func evaluateFilterExpression ( eventFilter string , thresholds EventThresholds , eventValue EventValue ) ( int , error ) {
|
values to be passed into expression evaluator
|
values := make ( map [ string ] int )
values [ "likelihoodThreshold" ] = thresholds . Likelihood
values [ "impactThreshold" ] = thresholds . Impact
values [ "severityThreshold" ] = thresholds . Severity
values [ "totalRiskThreshold" ] = thresholds . TotalRisk
values [ "likelihood" ] = eventValue . Likelihood
values [ "impact" ] = eventValue . Impact
values [ "severity" ] = eventValue . Severity
values [ "totalRisk" ] = eventValue . TotalRisk
|
try to evaluate event filter expression
|
return evaluator . Evaluate ( eventFilter , values )
}
// setServiceLogSeverityMap (re)initializes serviceLogSeverityMap so that each
// total risk level maps to its Service Log severity.
func setServiceLogSeverityMap() {
	serviceLogSeverityMap = map[int]string{
		TotalRiskLow:       ServiceLogSeverityInfo,
		TotalRiskModerate:  ServiceLogSeverityWarning,
		TotalRiskImportant: ServiceLogSeverityMajor,
		TotalRiskCritical:  ServiceLogSeverityCritical,
	}
}
// findRenderedReport returns the first rendered report whose rule ID maps to
// ruleName and whose error key equals errorKey. If there is none, it returns an
// empty report and an error built from ReportNotFoundError.
func findRenderedReport(reports []types.RenderedReport, ruleName types.RuleName, errorKey types.ErrorKey) (types.RenderedReport, error) {
	for i := range reports {
		candidate := reports[i]
		if candidate.ErrorKey == errorKey && ruleIDToRuleName(candidate.RuleID) == ruleName {
			return candidate, nil
		}
	}
	return types.RenderedReport{}, fmt.Errorf(ReportNotFoundError, ruleName, errorKey)
}
func createServiceLogEntry ( report * types . RenderedReport , cluster types . ClusterEntry , createdBy , username , severity string ) types . ServiceLogEntry {
logEntry := types . ServiceLogEntry {
ClusterUUID : cluster . ClusterName ,
Description : report . Reason ,
ServiceName : serviceName ,
Severity : severity ,
Summary : report . Description ,
CreatedBy : createdBy ,
Username : username ,
}
|
It is necessary to truncate the fields because of Service Log limitations
|
if len ( logEntry . Summary ) > serviceLogSummaryMaxLength {
logEntry . Summary = logEntry . Summary [ : serviceLogSummaryMaxLength ]
}
if len ( logEntry . Description ) > serviceLogDescriptionMaxLength {
logEntry . Description = logEntry . Description [ : serviceLogDescriptionMaxLength ]
}
return logEntry
}
|
evaluateTagFilter checks if processed rule contains all required tags, for
example tag "osd_customer".
|
func evaluateTagFilter ( filterEnabled bool , tagsSet , reportItemTags types . TagsSet ) bool {
if ! filterEnabled {
return true
}
for neededTag := range tagsSet {
if _ , ok := reportItemTags [ neededTag ] ; ! ok {
return false
}
}
return true
}
func ( d * Differ ) getReportsWithIssuesToNotify ( reports types . ReportContent , cluster types . ClusterEntry , ruleContent types . RulesMap ) ( reportsWithIssues types . ReportContent ) {
reportsWithIssues = make ( types . ReportContent , 0 , len ( reports ) )
for _ , r := range reports {
ruleName := moduleToRuleName ( r . Module )
errorKey := r . ErrorKey
likelihood , impact , totalRisk , _ , tags := findRuleByNameAndErrorKey ( ruleContent , ruleName , errorKey )
eventValue := EventValue {
Likelihood : likelihood ,
Impact : impact ,
TotalRisk : totalRisk ,
}
|
TODO: Duplicated
try to evaluate event filter expression
|
result , err := evaluateFilterExpression ( d . Filter ,
d . Thresholds , eventValue )
if err != nil {
log . Err ( err ) . Msg ( evaluationErrorMessage )
continue
}
|
check if rule contains expected tag(s) if filtering by tags is enabled
|
ruleTagCondition := evaluateTagFilter ( d . FilterByTag , d . TagsSet , tags )
|
send message to target only if message pass both filters
|
if result > 0 && ruleTagCondition {
if ! d . ShouldNotify ( cluster , r ) {
NotificationNotSentSameState . Inc ( )
continue
}
log . Warn ( ) .
Str ( typeAttribute , r . Type ) .
Str ( ruleAttribute , string ( ruleName ) ) .
Str ( errorKeyAttribute , string ( errorKey ) ) .
Int ( likelihoodAttribute , likelihood ) .
Int ( impactAttribute , impact ) .
Int ( totalRiskAttribute , totalRisk ) .
Msg ( reportWithHighImpactMessage )
r . TotalRisk = eventValue . TotalRisk
reportsWithIssues = append ( reportsWithIssues , r )
}
}
return
}
// getServiceLogSeverity maps a total risk value to its Service Log severity
// from serviceLogSeverityMap.
//
// If totalRisk is outside TotalRiskMin..TotalRiskMax, or has no entry in the
// map (for example because setServiceLogSeverityMap was not called yet), a
// warning is logged and ServiceLogSeverityInfo is returned. An empty severity
// string is never returned.
func getServiceLogSeverity(totalRisk int) string {
	if totalRisk >= TotalRiskMin && totalRisk <= TotalRiskMax {
		// reading from a nil map is safe and simply reports "not found"
		if severity, found := serviceLogSeverityMap[totalRisk]; found {
			return severity
		}
	}
	log.Warn().Int("total_risk", totalRisk).Msg(noEquivalentSeverityMessage)
	return ServiceLogSeverityInfo
}
func ( d * Differ ) createAndSendServiceLogEntry ( configuration * conf . ConfigStruct , renderedReport * types . RenderedReport ,
totalRisk int , cluster types . ClusterEntry ) error {
|
we need to pass the correct "created_by" and "username" attributes
to ServiceLog REST API
|
serviceLogConfiguration := conf . GetServiceLogConfiguration ( configuration )
createdBy := serviceLogConfiguration . CreatedBy
username := serviceLogConfiguration . Username
logEntry := createServiceLogEntry ( renderedReport , cluster , createdBy , username , getServiceLogSeverity ( totalRisk ) )
msgBytes , err := json . Marshal ( logEntry )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( invalidJSONContent )
return nil
}
log . Debug ( ) .
Str ( clusterAttribute , string ( cluster . ClusterName ) ) .
Str ( "message" , string ( msgBytes ) ) .
Msg ( "Producing service log message" )
_ , _ , err = d . Notifier . ProduceMessage ( msgBytes )
if err != nil {
NotificationNotSentErrorState . Inc ( )
log . Err ( err ) .
Str ( clusterAttribute , string ( cluster . ClusterName ) ) .
Str ( ruleAttribute , string ( renderedReport . RuleID ) ) .
Str ( errorKeyAttribute , string ( renderedReport . ErrorKey ) ) .
Msg ( serviceLogSendErrorMessage )
return err
}
NotificationSent . Inc ( )
return nil
}
|
ProduceEntriesToServiceLog sends an entry to the service log integration
for each issue found in the given reports
|
func ( d * Differ ) ProduceEntriesToServiceLog ( configuration * conf . ConfigStruct , cluster types . ClusterEntry ,
rules types . Rules , ruleContent types . RulesMap , reports types . ReportContent ) ( totalMessages int , err error ) {
|
TODO: Use pointer when passing around clusterEntry
|
reportsToRender := d . getReportsWithIssuesToNotify ( reports , cluster , ruleContent )
if len ( reportsToRender ) != 0 {
dependenciesConfiguration := conf . GetDependenciesConfiguration ( configuration )
renderedReports , err := renderReportsForCluster (
& dependenciesConfiguration , cluster . ClusterName ,
reportsToRender , rules )
if err != nil {
log . Err ( err ) .
Str ( "cluster name" , string ( cluster . ClusterName ) ) .
Msg ( renderReportsFailedMessage )
return totalMessages , err
}
for _ , r := range reportsToRender {
ruleName := moduleToRuleName ( r . Module )
errorKey := r . ErrorKey
ReportWithHighImpact . Inc ( )
renderedReport , err := findRenderedReport ( renderedReports , ruleName , errorKey )
if err != nil {
log . Err ( err ) . Msgf ( "Output from content template renderer does not contain " +
"result for cluster %s, rule %s and error key %s" , cluster . ClusterName , ruleName , errorKey )
continue
}
addDetailedInfoURLToRenderedReport ( & renderedReport , & configuration . ServiceLog . RuleDetailsURI )
if err = d . createAndSendServiceLogEntry ( configuration , & renderedReport , r . TotalRisk , cluster ) ; err != nil {
continue
}
totalMessages ++
}
}
return totalMessages , nil
}
// produceEntriesToKafka builds a single instant notification message for the
// cluster and sends it through d.Notifier. The message holds one event per
// report item that passes the event filter expression, the tag filter and
// d.ShouldNotify. The cluster's notification record is then updated for
// types.NotificationBackendTarget:
//   - no events collected: the record is updated with the "same state"
//     helper and (0, nil) is returned;
//   - JSON marshalling fails: (-1, err) is returned and the record is not
//     updated;
//   - producing fails: the record is updated with the error state and
//     (-1, err) is returned;
//   - produced with offset != -1: the record is updated with the "sent" state
//     and the number of events is returned;
//   - offset == -1 (a DisabledProducer, per the comment below): (0, nil) is
//     returned and the record is not updated.
func (d *Differ) produceEntriesToKafka(cluster types.ClusterEntry, ruleContent types.RulesMap,
	reportItems types.ReportContent, report types.ClusterReport) (int, error) {
	notificationMsg := generateInstantNotificationMessage(
		&notificationEventURLs.ClusterDetails,
		fmt.Sprint(cluster.AccountNumber),
		fmt.Sprint(cluster.OrgID),
		string(cluster.ClusterName))
	// one timestamp is shared by every record update made by this call
	notifiedAt := types.Timestamp(time.Now())
	for _, r := range reportItems {
		module := r.Module
		ruleName := moduleToRuleName(module)
		errorKey := r.ErrorKey
		likelihood, impact, totalRisk, description, tags := findRuleByNameAndErrorKey(ruleContent, ruleName, errorKey)
		eventValue := EventValue{
			Likelihood: likelihood,
			Impact:     impact,
			TotalRisk:  totalRisk,
		}
		// try to evaluate event filter expression
		result, err := evaluateFilterExpression(d.Filter,
			d.Thresholds, eventValue)
		if err != nil {
			log.Err(err).Msg(evaluationErrorMessage)
			continue
		}
		// check if rule contains expected tag(s) if filtering by tags is enabled
		ruleTagCondition := evaluateTagFilter(d.FilterByTag, d.TagsSet, tags)
		// send message to target only if message pass both filters
		if result > 0 && ruleTagCondition {
			// NOTE(review): unlike getReportsWithIssuesToNotify, this skip does
			// not increment NotificationNotSentSameState. Confirm whether
			// that is intentional.
			if !d.ShouldNotify(cluster, r) {
				continue
			}
			log.Warn().
				Str(typeAttribute, r.Type).
				Str(ruleAttribute, string(ruleName)).
				Str(errorKeyAttribute, string(errorKey)).
				Int(likelihoodAttribute, likelihood).
				Int(impactAttribute, impact).
				Int(totalRiskAttribute, totalRisk).
				Msg(reportWithHighImpactMessage)
			ReportWithHighImpact.Inc()
			notificationPayloadURL := generateNotificationPayloadURL(&notificationEventURLs.RuleDetails, string(cluster.ClusterName), module, errorKey)
			// the publish date is the cluster's UpdatedAt timestamp in UTC
			appendEventToNotificationMessage(notificationPayloadURL, &notificationMsg, description, totalRisk, time.Time(cluster.UpdatedAt).UTC().Format(time.RFC3339Nano))
		}
	}
	if len(notificationMsg.Events) == 0 {
		updateNotificationRecordSameState(d.Storage, cluster, report, notifiedAt, types.NotificationBackendTarget)
		return 0, nil
	}
	log.Info().
		Str(clusterAttribute, string(cluster.ClusterName)).
		Int(numberOfEventsAttribute, len(notificationMsg.Events)).
		Msg("Producing instant notification")
	msgBytes, err := json.Marshal(notificationMsg)
	if err != nil {
		log.Error().Err(err).Msg(invalidJSONContent)
		return -1, err
	}
	_, offset, err := d.Notifier.ProduceMessage(msgBytes)
	if err != nil {
		log.Error().
			Str(errorStr, err.Error()).
			Msg("Couldn't send notification message to kafka topic.")
		updateNotificationRecordErrorState(d.Storage, err, cluster, report, notifiedAt, types.NotificationBackendTarget)
		return -1, err
	}
	if offset != -1 {
		// update the database if any message is sent (not a DisabledProducer)
		log.Debug().Msg("notifier is not disabled so DB is updated")
		updateNotificationRecordSentState(d.Storage, cluster, report, notifiedAt, types.NotificationBackendTarget)
		return len(notificationMsg.Events), nil
	}
	return 0, nil
}
|
checkReadError function checks whether reading from readerrors table was
successful. This log gives important context as the insertion into the
readerror will fail if there's already a row but we couldn't read it for
whatever reason.
|
func checkReadError ( err error ) {
if err != nil {
log . Err ( err ) . Msg ( "read_errors read access error" )
}
}
|
checkWriteError function checks whether writing into read_errors table was
successful
|
func checkWriteError ( err error ) {
if err != nil {
log . Err ( err ) . Msg ( "read_errors write access error" )
}
}
// processReportsByCluster reads the report of every cluster in clusters,
// taken at the cluster's UpdatedAt time, and forwards its issues to the
// enabled notification targets.
//
// For each cluster:
//   - a report read failure is written to the read_errors table and counted
//     as skipped. If that failure was already recorded, the cluster is
//     skipped silently (only a debug log, no counter);
//   - reports that cannot be deserialized are counted as skipped, and
//     reports without items are counted as empty;
//   - when Service Log is enabled, entries are produced and the notification
//     record is updated with the result, including any error;
//   - when the Kafka broker is enabled, a notification is produced. Errors are
//     logged and the cluster is skipped.
//
// The skipped, empty and notified counters are logged at the end.
func (d *Differ) processReportsByCluster(config *conf.ConfigStruct, ruleContent types.RulesMap, clusters []types.ClusterEntry) {
	notifiedIssues := 0
	clustersCount := len(clusters)
	skippedEntries := 0
	emptyEntries := 0
	var rules types.Rules
	// rules and the severity map are only needed by the Service Log path
	if conf.GetServiceLogConfiguration(config).Enabled {
		setServiceLogSeverityMap()
		rules = getAllContentFromMap(ruleContent)
	}
	for i, cluster := range clusters {
		log.Debug().
			Int("#", i).
			Int("of", clustersCount).
			Int(organizationIDAttribute, int(cluster.OrgID)).
			Int(AccountNumberAttribute, int(cluster.AccountNumber)).
			Str(clusterAttribute, string(cluster.ClusterName)).
			Msg(clusterEntryMessage)
		report, err := d.Storage.ReadReportForClusterAtTime(cluster.OrgID, cluster.ClusterName, cluster.UpdatedAt)
		if err != nil {
			// is the problem reported already?
			reportedAlready, readErr := d.Storage.ReadErrorExists(cluster.OrgID, cluster.ClusterName, cluster.UpdatedAt)
			checkReadError(readErr)
			// if the error is reported already, skip to next one
			if reportedAlready {
				log.Debug().
					Int(organizationIDAttribute, int(cluster.OrgID)).
					Str(clusterAttribute, string(cluster.ClusterName)).
					Time("since", time.Time(cluster.UpdatedAt)).
					Msg("Read error already exists")
				continue
			}
			// if not reported, process the error
			ReadReportForClusterErrors.Inc()
			skippedEntries++
			log.Err(err).Msg(operationFailedMessage)
			writeErr := d.Storage.WriteReadError(cluster.OrgID, cluster.ClusterName, time.Time(cluster.UpdatedAt), err)
			checkWriteError(writeErr)
			continue
		}
		var deserialized types.Report
		err = json.Unmarshal([]byte(report), &deserialized)
		if err != nil {
			DeserializeReportErrors.Inc()
			skippedEntries++
			log.Err(err).Msg("Deserialization error - Couldn't create report object")
			log.Debug().Bytes("bytes", []byte(report)).Msg("Data to be deserialized")
			continue
		}
		if len(deserialized.Reports) == 0 {
			log.Info().Msgf("No reports in notification database for cluster %s", cluster.ClusterName)
			emptyEntries++
			continue
		}
		if conf.GetServiceLogConfiguration(config).Enabled {
			notifiedAt := types.Timestamp(time.Now())
			newNotifiedIssues, err := d.ProduceEntriesToServiceLog(config, cluster, rules, ruleContent, deserialized.Reports)
			// the record is updated even on error; err is passed along
			updateNotificationRecordState(d.Storage, cluster, report, newNotifiedIssues, notifiedAt, types.ServiceLogTarget, err)
			notifiedIssues += newNotifiedIssues
		}
		if !conf.GetKafkaBrokerConfiguration(config).Enabled {
			continue
		}
		newNotifiedIssues, err := d.produceEntriesToKafka(cluster, ruleContent, deserialized.Reports, report)
		if err != nil {
			log.Err(err).
				Str(clusterAttribute, string(cluster.ClusterName)).
				Msg("Unable to send the notification message to Kafka")
			continue
		}
		notifiedIssues += newNotifiedIssues
	}
	log.Info().Msgf("Number of reports not retrieved/deserialized: %d", skippedEntries)
	log.Info().Msgf("Number of empty reports skipped: %d", emptyEntries)
	log.Info().Msgf("Number of high impact issues notified: %d", notifiedIssues)
}
|
ProcessClusters function creates desired notification messages for all the
clusters obtained from the database
|
func ( d * Differ ) ProcessClusters ( config * conf . ConfigStruct , ruleContent types . RulesMap ,
clusters [ ] types . ClusterEntry ) {
if d . NotificationType == types . InstantNotif {
d . processReportsByCluster ( config , ruleContent , clusters )
}
}
|
SetupKafkaProducer function creates a Kafka producer using the provided configuration
|
func ( d * Differ ) SetupKafkaProducer ( config * conf . ConfigStruct ) error {
kafkaProducer , err := kafka . New ( config )
if err != nil {
ProducerSetupErrors . Inc ( )
log . Error ( ) .
Str ( errorStr , err . Error ( ) ) .
Msg ( "Couldn't initialize Kafka producer with the provided config." )
return & KafkaBrokerError { }
}
d . Notifier = kafkaProducer
log . Info ( ) . Msg ( "Kafka producer ready" )
return nil
}
// setupServiceLogProducer creates a Service Log producer from the Service Log
// configuration and installs it as d.Notifier.
//
// It first opens a connection to the OCM API gateway. If either the connection
// or the producer cannot be created, ProducerSetupErrors is incremented, the
// cause is logged, and a *ServiceLogError carrying the cause message is
// returned. d.Notifier is left untouched in that case.
func (d *Differ) setupServiceLogProducer(config *conf.ConfigStruct) error {
	serviceLogConfig := conf.GetServiceLogConfiguration(config)
	conn, err := ocmclient.NewOCMClient(serviceLogConfig.ClientID, serviceLogConfig.ClientSecret,
		serviceLogConfig.URL, serviceLogConfig.TokenURL)
	if err != nil {
		// Returning nil here would let the service continue without a
		// usable d.Notifier, so the failure is reported to the caller.
		ProducerSetupErrors.Inc()
		log.Error().Err(err).Msg("got error while setting up the connection to OCM API gateway")
		return &ServiceLogError{Msg: err.Error()}
	}
	serviceLogProducer, err := servicelog.New(&serviceLogConfig, conn)
	if err != nil {
		ProducerSetupErrors.Inc()
		log.Error().
			Str(errorStr, err.Error()).
			Msg("Couldn't initialize Service Log producer with the provided config.")
		return &ServiceLogError{Msg: err.Error()}
	}
	d.Notifier = serviceLogProducer
	log.Info().Msg("Service Log producer ready")
	return nil
}
|
generateInstantNotificationMessage function generates a notification message
container with no events for a given account+cluster
|
func generateInstantNotificationMessage (
clusterURI * string , accountID , orgID , clusterID string ) (
notification types . NotificationMessage ) {
var events [ ] types . Event
notificationContext := types . NotificationContext {
notificationContextDisplayName : clusterID ,
notificationContextHostURL : strings . Replace ( * clusterURI , "{cluster_id}" , clusterID , 1 ) ,
}
notification = types . NotificationMessage {
Bundle : notificationBundleName ,
Application : notificationApplicationName ,
EventType : types . InstantNotif . ToString ( ) ,
Timestamp : time . Now ( ) . UTC ( ) . Format ( time . RFC3339Nano ) ,
AccountID : accountID ,
OrgID : orgID ,
Events : events ,
Context : notificationContext ,
}
return
}
// generateNotificationPayloadURL fills the rule details URL template *ruleURI.
// It replaces "{cluster_id}" with clusterID, "{module}" with the module name
// (with every '.' turned into '|'), and "{error_key}" with errorKey.
func generateNotificationPayloadURL(
	ruleURI *string, clusterID string, module types.ModuleName, errorKey types.ErrorKey) (
	notificationPayloadURL string) {
	moduleWithPipes := strings.ReplaceAll(string(module), ".", "|")
	return strings.NewReplacer(
		"{cluster_id}", clusterID,
		"{module}", moduleWithPipes,
		"{error_key}", string(errorKey),
	).Replace(*ruleURI)
}
|
appendEventToNotificationMessage function adds a new event to the given notification message after constructing the payload string
|
func appendEventToNotificationMessage ( notificationPayloadURL string , notification * types . NotificationMessage , ruleDescription string , totalRisk int , publishDate string ) {
payload := types . EventPayload {
notificationPayloadRuleDescription : ruleDescription ,
notificationPayloadRuleURL : notificationPayloadURL ,
notificationPayloadTotalRisk : fmt . Sprint ( totalRisk ) ,
notificationPayloadPublishDate : publishDate ,
}
event := types . Event {
|
The insights Notifications backend expects this field to be
an empty object in the received JSON
|
Metadata : types . EventMetadata { } ,
|
The insights Notifications backend expects to receive the
payload as a string with all its fields as escaped strings
|
Payload : payload ,
}
notification . Events = append ( notification . Events , event )
}
// setupNotificationTypes loads the notification types from storage. A failure
// is logged and reported as *StatusStorageError.
func setupNotificationTypes(storage Storage) error {
	if err := getNotificationTypes(storage); err != nil {
		log.Err(err).Msg("Read notification types")
		return &StatusStorageError{}
	}
	return nil
}
// setupNotificationStates loads the notification states from storage. A
// failure is logged and reported as *StatusStorageError.
func setupNotificationStates(storage Storage) error {
	if err := getStates(storage); err != nil {
		log.Err(err).Msg("Read states")
		return &StatusStorageError{}
	}
	return nil
}
|
registerMetrics registers metrics using the provided namespace, if any
|
func registerMetrics ( metricsConfig * conf . MetricsConfiguration ) {
if metricsConfig . Namespace == "" {
return
}
log . Info ( ) . Str ( "namespace" , metricsConfig . Namespace ) . Msg ( "Setting metrics namespace" )
AddMetricsWithNamespaceAndSubsystem (
metricsConfig . Namespace ,
metricsConfig . Subsystem )
}
// closeStorage closes the storage, logging and returning any error.
func closeStorage(storage Storage) error {
	if err := storage.Close(); err != nil {
		log.Err(err).Msg(operationFailedMessage)
		return err
	}
	return nil
}
// closeNotifier closes the notification producer, logging and returning any
// error. A nil producer (none was set up) means there is nothing to close.
// It returns nil instead of panicking on the method call.
func closeNotifier(notifier producer.Producer) error {
	if notifier == nil {
		return nil
	}
	if err := notifier.Close(); err != nil {
		log.Err(err).Msg(operationFailedMessage)
		return err
	}
	return nil
}
// pushMetrics pushes the collected metrics to the configured gateway.
//
// Nothing is pushed, and nil is returned, when no namespace or gateway auth
// token is configured. If the first push fails, it is retried up to
// metricsConf.Retries times, sleeping metricsConf.RetryAfter before each
// retry. When retries are not configured, or every retry fails, a
// *StatusMetricsError is returned.
func pushMetrics(metricsConf *conf.MetricsConfiguration) error {
	if metricsConf.Namespace == "" || metricsConf.GatewayAuthToken == "" {
		log.Debug().Msg("No metrics configuration detected. Metrics will not be pushed")
		return nil
	}

	err := PushCollectedMetrics(metricsConf)
	if err == nil {
		log.Info().Msg("Metrics pushed successfully. Terminating notification service successfully.")
		return nil
	}
	log.Err(err).Msg(metricsPushFailedMessage)

	if metricsConf.RetryAfter == 0 || metricsConf.Retries == 0 {
		return &StatusMetricsError{}
	}

	for attemptsLeft := metricsConf.Retries; attemptsLeft > 0; attemptsLeft-- {
		time.Sleep(metricsConf.RetryAfter)
		log.Info().Msgf("Push metrics. Retrying (%d/%d attempts left)", attemptsLeft, metricsConf.Retries)
		if err = PushCollectedMetrics(metricsConf); err == nil {
			log.Info().Msg("Metrics pushed successfully. Terminating notification service successfully.")
			return nil
		}
		log.Err(err).Msg(metricsPushFailedMessage)
	}
	return &StatusMetricsError{}
}
// assertNotificationDestination checks that exactly one notification
// destination (Kafka broker or Service Log) is enabled. When none or both are
// enabled, it logs the problem and returns *StatusConfiguration.
func assertNotificationDestination(config *conf.ConfigStruct) error {
	kafkaEnabled := conf.GetKafkaBrokerConfiguration(config).Enabled
	serviceLogEnabled := conf.GetServiceLogConfiguration(config).Enabled

	switch {
	case !kafkaEnabled && !serviceLogEnabled:
		log.Error().Msg(destinationNotSet)
		return &StatusConfiguration{}
	case kafkaEnabled && serviceLogEnabled:
		log.Error().Msg(onlyOneDestinationAllowed)
		return &StatusConfiguration{}
	default:
		return nil
	}
}
|
RetrievePreviouslyReportedForEventTarget reads previously reported issues
|
func ( d * Differ ) RetrievePreviouslyReportedForEventTarget ( cooldown string , target types . EventTarget , clusters [ ] types . ClusterEntry ) error {
log . Info ( ) . Msg ( "Reading previously reported issues for given cluster list..." )
var err error
d . PreviouslyReported , err = d . Storage . ReadLastNotifiedRecordForClusterList ( clusters , cooldown , target )
if err != nil {
ReadReportedErrors . Inc ( )
log . Err ( err ) . Msg ( operationFailedMessage )
return & StatusStorageError { }
}
log . Info ( ) . Int ( "target" , int ( target ) ) . Int ( "retrieved" , len ( d . PreviouslyReported ) ) . Msg ( "Done reading previously reported issues still in cool down" )
return nil
}
// start runs one complete differ pass:
//  1. registers metrics (when a namespace is configured);
//  2. fetches rule content from the content service;
//  3. sets up notification URLs and loads notification states and types
//     from storage;
//  4. starts PushMetricsInLoop in a background goroutine;
//  5. reads the cluster list and filters it by the processing configuration
//     (allow list / block list);
//  6. loads previously reported issues for d.Target within d.CoolDown;
//  7. processes the clusters, closes storage and notifier, and pushes metrics.
//
// Failures are returned as the typed errors produced by the steps above
// (e.g. *FetchStatusContentError, *StatusStorageError).
//
// NOTE(review): the early returns (errors, and an empty filtered cluster list)
// skip d.close() and pushMetrics. Confirm that the caller closes storage and
// notifier in those cases.
// NOTE(review): the PushMetricsInLoop goroutine gets context.Background(), so
// this function never cancels it.
func (d *Differ) start(config *conf.ConfigStruct) error {
	log.Info().Msg("Differ started")
	log.Info().Msg(separator)
	metricsConfiguration := conf.GetMetricsConfiguration(config)
	registerMetrics(&metricsConfiguration)
	log.Info().Msg(separator)
	log.Info().Msg("Getting rule content and impacts from content service")
	dependenciesConfiguration := conf.GetDependenciesConfiguration(config)
	ruleContent, err := fetchAllRulesContent(&dependenciesConfiguration)
	if err != nil {
		// NOTE(review): err is not logged here; presumably fetchAllRulesContent logs it.
		FetchContentErrors.Inc()
		return &FetchStatusContentError{}
	}
	log.Info().Msg(separator)
	log.Info().Msg("Read cluster list")
	notifConfig := conf.GetNotificationsConfiguration(config)
	setupNotificationURLs(notifConfig)
	if err := setupNotificationStates(d.Storage); err != nil {
		return err
	}
	if err := setupNotificationTypes(d.Storage); err != nil {
		return err
	}
	go PushMetricsInLoop(context.Background(), &metricsConfiguration)
	clusters, err := d.Storage.ReadClusterList()
	if err != nil {
		ReadClusterListErrors.Inc()
		log.Err(err).Msg(operationFailedMessage)
		return &StatusStorageError{}
	}
	// filter clusters according to allow list and block list
	clusters, statistic := filterClusterList(clusters, conf.GetProcessingConfiguration(config))
	log.Info().
		Int("On input", statistic.Input).
		Int("Allowed", statistic.Allowed).
		Int("Blocked", statistic.Blocked).
		Int("Filtered", statistic.Filtered).
		Msg("Filter cluster list")
	entries := len(clusters)
	if entries == 0 {
		log.Info().Msg("Differ finished")
		return nil
	}
	log.Info().Int(clustersAttribute, entries).Msg("Read cluster list: done")
	log.Info().Msg(separator)
	if err := d.RetrievePreviouslyReportedForEventTarget(d.CoolDown, d.Target, clusters); err != nil {
		return err
	}
	log.Info().Msg(separator)
	log.Info().Msg("Checking new issues for all new reports")
	d.ProcessClusters(config, ruleContent, clusters)
	log.Info().Int(clustersAttribute, entries).Msg("Process Clusters Entries: done")
	if err := d.close(); err != nil {
		return err
	}
	log.Info().Msg("Differ finished. Pushing metrics to the configured prometheus gateway.")
	if err := pushMetrics(&metricsConfiguration); err != nil {
		return err
	}
	log.Info().Msg(separator)
	return nil
}
// setupNotificationURLs copies the URL settings from the notifications
// configuration into the package-level notificationEventURLs value, which
// holds the links (insights advisor, cluster details, rule details) used
// when building notification events.
func setupNotificationURLs(config conf.NotificationsConfiguration) {
	notificationEventURLs.InsightsAdvisor = config.InsightsAdvisorURL
	notificationEventURLs.ClusterDetails = config.ClusterDetailsURI
	notificationEventURLs.RuleDetails = config.RuleDetailsURI
}
// errorForTarget maps an event target to the error value representing a
// failure of that target's notifier: *KafkaBrokerError for the Notification
// backend target and *ServiceLogError for the ServiceLog target. Any other
// target yields nil.
func errorForTarget(t types.EventTarget) error {
	switch t {
	case types.NotificationBackendTarget:
		return &KafkaBrokerError{}
	case types.ServiceLogTarget:
		return &ServiceLogError{}
	default:
		return nil
	}
}
// close releases the resources held by the differ: the storage and the
// notifier used to send events.
//
// Error reporting:
//   - a storage close failure is reported as *StatusStorageError;
//   - a notifier close failure is reported with the target-specific error
//     from errorForTarget and takes precedence over a storage failure;
//   - if the notifier fails for a target errorForTarget does not recognize,
//     the raw notifier error is returned, so the failure is never reported as
//     success (previously nil was returned in that case, dropping both the
//     notifier error and any storage error).
func (d *Differ) close() error {
	log.Info().Msg(separator)

	// remember the storage failure but still try to close the notifier
	var storageErr error
	if err := closeStorage(d.Storage); err != nil {
		storageErr = &StatusStorageError{}
	}

	log.Info().Msg(separator)

	if err := closeNotifier(d.Notifier); err != nil {
		if targetErr := errorForTarget(d.Target); targetErr != nil {
			return targetErr
		}
		// unknown target: surface the notifier error itself instead of nil
		return err
	}

	log.Info().Msg(separator)
	return storageErr
}
|
SetupFiltersAndThresholds function setup both techniques that can be used to
filter messages sent to targets (Notification backend and ServiceLog at this moment):
1. filter based on likelihood, impact, severity, and total risk
2. filter based on rule type that's identified by tags
|
func ( d * Differ ) SetupFiltersAndThresholds ( config * conf . ConfigStruct ) error {
kafkaBrokerConfiguration := conf . GetKafkaBrokerConfiguration ( config )
if kafkaBrokerConfiguration . Enabled {
d . Thresholds = EventThresholds {
TotalRisk : kafkaBrokerConfiguration . TotalRiskThreshold ,
Likelihood : kafkaBrokerConfiguration . LikelihoodThreshold ,
Impact : kafkaBrokerConfiguration . ImpactThreshold ,
Severity : kafkaBrokerConfiguration . SeverityThreshold ,
}
if kafkaBrokerConfiguration . EventFilter == "" {
d . Filter = DefaultEventFilter
} else {
d . Filter = kafkaBrokerConfiguration . EventFilter
}
|
filtering by tags
|
d . FilterByTag = kafkaBrokerConfiguration . TagFilterEnabled
d . TagsSet = kafkaBrokerConfiguration . TagsSet
|
check if tags set is provided via configuration if filtering is enabled
|
if d . FilterByTag && d . TagsSet == nil {
err := & StatusEventFilterError { Msg : configurationProblem }
log . Err ( err ) . Msg ( tagsNotSetMessage )
return err
}
return nil
}
serviceLogConfiguration := conf . GetServiceLogConfiguration ( config )
if serviceLogConfiguration . Enabled {
d . Thresholds = EventThresholds {
TotalRisk : serviceLogConfiguration . TotalRiskThreshold ,
Likelihood : serviceLogConfiguration . LikelihoodThreshold ,
Impact : serviceLogConfiguration . ImpactThreshold ,
Severity : serviceLogConfiguration . SeverityThreshold ,
}
if serviceLogConfiguration . EventFilter == "" {
d . Filter = DefaultEventFilter
} else {
d . Filter = serviceLogConfiguration . EventFilter
}
|
filtering by tags
|
d . FilterByTag = serviceLogConfiguration . TagFilterEnabled
d . TagsSet = serviceLogConfiguration . TagsSet
|
check if tags set is provided via configuration if filtering is enabled
|
if d . FilterByTag && d . TagsSet == nil {
err := & StatusEventFilterError { Msg : configurationProblem }
log . Err ( err ) . Msg ( tagsNotSetMessage )
return err
}
return nil
}
return nil
}
|
Run function is entry point to the differ
|
func Run ( config conf . ConfigStruct , cliFlags types . CliFlags ) int {
defer logger . CloseZerolog ( )
if cliFlags . InstantReports {
notificationType = types . InstantNotif
}
|
prepare the storage
|
storageConfiguration := conf . GetStorageConfiguration ( & config )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
StorageSetupErrors . Inc ( )
log . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError
}
if deleteOperationSpecified ( cliFlags ) {
err := PerformCleanupOperation ( storage , cliFlags )
if err != nil {
return ExitStatusCleanerError
}
return ExitStatusOK
}
|
perform database cleanup on startup if specified on command line
|
if cliFlags . CleanupOnStartup {
err := PerformCleanupOnStartup ( storage , cliFlags )
if err != nil {
return ExitStatusCleanerError
}
|
if previous operation is correct, just continue
|
}
d , err := New ( & config , storage )
if err != nil {
return selectError ( err )
}
err = d . start ( & config )
return selectError ( err )
}
|
New constructs new implementation of Differ interface
|
func New ( config * conf . ConfigStruct , storage Storage ) ( * Differ , error ) {
if err := assertNotificationDestination ( config ) ; err != nil {
return nil , err
}
d := Differ {
Storage : storage ,
NotificationType : notificationType ,
PreviouslyReported : make ( types . NotifiedRecordsPerCluster ) ,
Thresholds : EventThresholds { } ,
}
if conf . GetKafkaBrokerConfiguration ( config ) . Enabled {
d . Target = types . NotificationBackendTarget
if err := d . SetupKafkaProducer ( config ) ; err != nil {
return nil , err
}
d . CoolDown = conf . GetKafkaBrokerConfiguration ( config ) . Cooldown
} else if conf . GetServiceLogConfiguration ( config ) . Enabled {
d . Target = types . ServiceLogTarget
if err := d . setupServiceLogProducer ( config ) ; err != nil {
return nil , err
}
d . CoolDown = conf . GetServiceLogConfiguration ( config ) . Cooldown
}
err := d . SetupFiltersAndThresholds ( config )
if err != nil {
return nil , err
}
return & d , nil
}
// selectError translates an error returned by the differ into the process
// exit status. A nil error maps to ExitStatusOK; each known error type maps
// to its dedicated exit code. Any other non-nil error (for example a raw
// error propagated from storage setup) maps to the general ExitStatusError.
// Previously such errors fell through to ExitStatusOK, which made a failed
// run exit successfully.
func selectError(err error) int {
	if err == nil {
		return ExitStatusOK
	}
	switch err.(type) {
	case *FetchStatusContentError:
		return ExitStatusFetchContentError
	case *StatusStorageError:
		return ExitStatusStorageError
	case *KafkaBrokerError:
		return ExitStatusKafkaBrokerError
	case *ServiceLogError:
		return ExitStatusServiceLogError
	case *StatusMetricsError:
		return ExitStatusMetricsError
	case *StatusConfiguration:
		return ExitStatusConfiguration
	case *StatusEventFilterError:
		return ExitStatusEventFilterError
	default:
		// unknown failure: never report it as success
		return ExitStatusError
	}
}
|