TestProcessClustersNoReportForClusterEntry tests that when no report is found for
a given cluster entry, the processing is not stopped
|
func TestProcessClustersNoReportForClusterEntry ( t * testing . T ) {
storage := mocks . Storage { }
storage . On ( "ReadReportForClusterAtTime" ,
mock . MatchedBy ( func ( orgID types . OrgID ) bool { return orgID == 1 } ) ,
mock . AnythingOfType ( "types.ClusterName" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ) . Return (
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) types . ClusterReport {
return ""
} ,
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) error {
return sql . ErrNoRows
} ,
)
storage . On ( "ReadReportForClusterAtTime" ,
mock . MatchedBy ( func ( orgID types . OrgID ) bool { return orgID == 2 } ) ,
mock . AnythingOfType ( "types.ClusterName" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ) . Return (
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) types . ClusterReport {
return "{\"reports\":[{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_1\",\"details\":\"some details\"}]}"
} ,
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) error {
return nil
} ,
)
storage . On ( "WriteNotificationRecordForCluster" ,
mock . AnythingOfType ( "types.ClusterEntry" ) ,
mock . AnythingOfType ( "types.NotificationTypeID" ) ,
mock . AnythingOfType ( "types.StateID" ) ,
mock . AnythingOfType ( "types.ClusterReport" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ,
mock . AnythingOfType ( "string" ) ,
mock . AnythingOfType ( "types.EventTarget" ) ) . Return (
func ( clusterEntry types . ClusterEntry , notificationTypeID types . NotificationTypeID , stateID types . StateID , report types . ClusterReport , notifiedAt types . Timestamp , errorLog string , eventTarget types . EventTarget ) error {
return nil
} ,
)
errorKeys := map [ string ] utypes . RuleErrorKeyContent {
"RULE_1" : {
Metadata : utypes . ErrorKeyMetadata {
Description : "rule 1 error key description" ,
Impact : utypes . Impact {
Name : "impact_1" ,
Impact : 1 ,
} ,
Likelihood : 2 ,
} ,
Reason : "rule 1 reason" ,
HasReason : true ,
} ,
"RULE_2" : {
Metadata : utypes . ErrorKeyMetadata {
Description : "rule 2 error key description" ,
Impact : utypes . Impact {
Name : "impact_2" ,
Impact : 2 ,
} ,
Likelihood : 1 ,
} ,
HasReason : false ,
} ,
}
ruleContent := types . RulesMap {
"rule_1" : {
Summary : "rule 1 summary" ,
Reason : "rule 1 reason" ,
Resolution : "rule 1 resolution" ,
MoreInfo : "rule 1 more info" ,
ErrorKeys : errorKeys ,
HasReason : true ,
} ,
"rule_2" : {
Summary : "rule 2 summary" ,
Reason : "" ,
Resolution : "rule 2 resolution" ,
MoreInfo : "rule 2 more info" ,
ErrorKeys : errorKeys ,
HasReason : false ,
} ,
}
clusters := [ ] types . ClusterEntry {
{
OrgID : 1 ,
AccountNumber : 1 ,
ClusterName : "first_cluster" ,
KafkaOffset : 0 ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
} ,
{
OrgID : 2 ,
AccountNumber : 2 ,
ClusterName : "second_cluster" ,
KafkaOffset : 100 ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
} ,
}
buf := new ( bytes . Buffer )
log . Logger = zerolog . New ( buf ) . Level ( zerolog . DebugLevel )
zerolog . SetGlobalLevel ( zerolog . DebugLevel )
d := differ . Differ {
Storage : & storage ,
NotificationType : types . InstantNotif ,
Target : types . NotificationBackendTarget ,
}
d . ProcessClusters ( & conf . ConfigStruct { Kafka : conf . KafkaConfiguration { Enabled : true } } , ruleContent , clusters )
executionLog := buf . String ( )
assert . Contains ( t , executionLog , "no rows in result set" , "No report should be retrieved for the first cluster" )
assert . Contains ( t , executionLog , "No new issues to notify for cluster second_cluster" , "the processReportsByCluster loop did not continue as extpected" )
assert . Contains ( t , executionLog , "Number of reports not retrieved/deserialized: 1" , "the first cluster should have been skipped" )
assert . Contains ( t , executionLog , "Number of empty reports skipped: 0" )
zerolog . SetGlobalLevel ( zerolog . WarnLevel )
}
|
TestProcessClustersInvalidReportFormatForClusterEntry tests that when the report found
for a given cluster entry cannot be deserialized, the processing is not stopped
|
func TestProcessClustersInvalidReportFormatForClusterEntry ( t * testing . T ) {
storage := mocks . Storage { }
storage . On ( "ReadReportForClusterAtTime" ,
mock . MatchedBy ( func ( orgID types . OrgID ) bool { return orgID == 1 } ) ,
mock . AnythingOfType ( "types.ClusterName" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ) . Return (
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) types . ClusterReport {
return "{\"reports\":{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_1\",\"details\":\"some details\"}}"
} ,
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) error {
return nil
} ,
)
storage . On ( "ReadReportForClusterAtTime" ,
mock . MatchedBy ( func ( orgID types . OrgID ) bool { return orgID == 2 } ) ,
mock . AnythingOfType ( "types.ClusterName" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ) . Return (
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) types . ClusterReport {
return "{\"reports\":[{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_1\",\"details\":\"some details\"}]}"
} ,
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) error {
return nil
} ,
)
storage . On ( "WriteNotificationRecordForCluster" ,
mock . AnythingOfType ( "types.ClusterEntry" ) ,
mock . AnythingOfType ( "types.NotificationTypeID" ) ,
mock . AnythingOfType ( "types.StateID" ) ,
mock . AnythingOfType ( "types.ClusterReport" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ,
mock . AnythingOfType ( "string" ) ,
mock . AnythingOfType ( "types.EventTarget" ) ) . Return (
func ( clusterEntry types . ClusterEntry , notificationTypeID types . NotificationTypeID , stateID types . StateID , report types . ClusterReport , notifiedAt types . Timestamp , errorLog string , eventTarget types . EventTarget ) error {
return nil
} ,
)
errorKeys := map [ string ] utypes . RuleErrorKeyContent {
"RULE_1" : {
Metadata : utypes . ErrorKeyMetadata {
Description : "rule 1 error key description" ,
Impact : utypes . Impact {
Name : "impact_1" ,
Impact : 1 ,
} ,
Likelihood : 2 ,
} ,
Reason : "rule 1 reason" ,
HasReason : true ,
} ,
"RULE_2" : {
Metadata : utypes . ErrorKeyMetadata {
Description : "rule 2 error key description" ,
Impact : utypes . Impact {
Name : "impact_2" ,
Impact : 2 ,
} ,
Likelihood : 1 ,
} ,
HasReason : false ,
} ,
}
ruleContent := types . RulesMap {
"rule_1" : {
Summary : "rule 1 summary" ,
Reason : "rule 1 reason" ,
Resolution : "rule 1 resolution" ,
MoreInfo : "rule 1 more info" ,
ErrorKeys : errorKeys ,
HasReason : true ,
} ,
"rule_2" : {
Summary : "rule 2 summary" ,
Reason : "" ,
Resolution : "rule 2 resolution" ,
MoreInfo : "rule 2 more info" ,
ErrorKeys : errorKeys ,
HasReason : false ,
} ,
}
clusters := [ ] types . ClusterEntry {
{
OrgID : 1 ,
AccountNumber : 1 ,
ClusterName : "first_cluster" ,
KafkaOffset : 0 ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
} ,
{
OrgID : 2 ,
AccountNumber : 2 ,
ClusterName : "second_cluster" ,
KafkaOffset : 100 ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
} ,
}
buf := new ( bytes . Buffer )
log . Logger = zerolog . New ( buf ) . Level ( zerolog . DebugLevel )
zerolog . SetGlobalLevel ( zerolog . DebugLevel )
d := differ . Differ {
Storage : & storage ,
NotificationType : types . InstantNotif ,
Target : types . NotificationBackendTarget ,
}
d . ProcessClusters ( & conf . ConfigStruct { Kafka : conf . KafkaConfiguration { Enabled : true } } , ruleContent , clusters )
executionLog := buf . String ( )
assert . Contains ( t , executionLog , "cannot unmarshal object into Go struct field Report.reports of type types.ReportContent" , "The string retrieved is not a list of reports. It should not deserialize correctly" )
assert . Contains ( t , executionLog , "No new issues to notify for cluster second_cluster" , "the processReportsByCluster loop did not continue as expected" )
assert . Contains ( t , executionLog , "Number of reports not retrieved/deserialized: 1" , "the first cluster should have been skipped" )
assert . Contains ( t , executionLog , "Number of empty reports skipped: 0" )
zerolog . SetGlobalLevel ( zerolog . WarnLevel )
}
// TestProcessClustersInstantNotifsAndTotalRiskInferiorToThreshold verifies that no
// instant notification is produced when the reported issues stay below the default
// total-risk threshold (the report references rules/keys with low impact/likelihood).
func TestProcessClustersInstantNotifsAndTotalRiskInferiorToThreshold(t *testing.T) {
	logBuf := new(bytes.Buffer)
	log.Logger = zerolog.New(logBuf).Level(zerolog.DebugLevel)
	zerolog.SetGlobalLevel(zerolog.DebugLevel)

	errKeys := map[string]utypes.RuleErrorKeyContent{
		"RULE_1": {
			Metadata: utypes.ErrorKeyMetadata{
				Description: "rule 1 error key description",
				Impact: utypes.Impact{
					Name:   "impact_1",
					Impact: 1,
				},
				Likelihood: 2,
			},
			Reason:    "rule 1 reason",
			HasReason: true,
		},
		"RULE_2": {
			Metadata: utypes.ErrorKeyMetadata{
				Description: "rule 2 error key description",
				Impact: utypes.Impact{
					Name:   "impact_2",
					Impact: 2,
				},
				Likelihood: 1,
			},
			HasReason: false,
		},
	}
	rules := types.RulesMap{
		"rule_1": {
			Summary:    "rule 1 summary",
			Reason:     "rule 1 reason",
			Resolution: "rule 1 resolution",
			MoreInfo:   "rule 1 more info",
			ErrorKeys:  errKeys,
			HasReason:  true,
		},
		"rule_2": {
			Summary:    "rule 2 summary",
			Reason:     "",
			Resolution: "rule 2 resolution",
			MoreInfo:   "rule 2 more info",
			ErrorKeys:  errKeys,
			HasReason:  false,
		},
	}
	clusterEntries := []types.ClusterEntry{
		{
			OrgID:         1,
			AccountNumber: 1,
			ClusterName:   "first_cluster",
			KafkaOffset:   0,
			UpdatedAt:     types.Timestamp(testTimestamp),
		},
		{
			OrgID:         2,
			AccountNumber: 2,
			ClusterName:   "second_cluster",
			KafkaOffset:   100,
			UpdatedAt:     types.Timestamp(testTimestamp),
		},
	}

	mockStorage := mocks.Storage{}
	mockStorage.On("ReadReportForClusterAtTime",
		mock.AnythingOfType("types.OrgID"),
		mock.AnythingOfType("types.ClusterName"),
		mock.AnythingOfType("types.Timestamp")).Return(
		func(orgID types.OrgID, clusterName types.ClusterName, updatedAt types.Timestamp) types.ClusterReport {
			return "{\"analysis_metadata\":{\"metadata\":\"some metadata\"},\"reports\":[{\"rule_id\":\"rule_4|RULE_4\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_4\",\"details\":\"some details\"},{\"rule_id\":\"rule_4|RULE_4\",\"component\":\"ccx_rules_ocp.external.rules.rule_2.report\",\"type\":\"rule\",\"key\":\"RULE_2\",\"details\":\"some details\"},{\"rule_id\":\"rule_5|RULE_5\",\"component\":\"ccx_rules_ocp.external.rules.rule_5.report\",\"type\":\"rule\",\"key\":\"RULE_3\",\"details\":\"some details\"}]}"
		},
		func(orgID types.OrgID, clusterName types.ClusterName, updatedAt types.Timestamp) error {
			return nil
		},
	)
	mockStorage.On("WriteNotificationRecordForCluster",
		mock.AnythingOfType("types.ClusterEntry"),
		mock.AnythingOfType("types.NotificationTypeID"),
		mock.AnythingOfType("types.StateID"),
		mock.AnythingOfType("types.ClusterReport"),
		mock.AnythingOfType("types.Timestamp"),
		mock.AnythingOfType("string"),
		mock.AnythingOfType("types.EventTarget")).Return(
		func(clusterEntry types.ClusterEntry, notificationTypeID types.NotificationTypeID, stateID types.StateID, report types.ClusterReport, notifiedAt types.Timestamp, errorLog string, eventTarget types.EventTarget) error {
			return nil
		},
	)

	d := differ.Differ{
		Storage:          &mockStorage,
		NotificationType: types.InstantNotif,
		Target:           types.NotificationBackendTarget,
		Thresholds: differ.EventThresholds{
			TotalRisk: differ.DefaultTotalRiskThreshold,
		},
		Filter: differ.DefaultEventFilter,
	}
	d.ProcessClusters(&conf.ConfigStruct{Kafka: conf.KafkaConfiguration{Enabled: true}}, rules, clusterEntries)

	logOutput := logBuf.String()
	assert.Contains(t, logOutput, "No new issues to notify for cluster first_cluster", "processClusters shouldn't generate any notification for 'first_cluster' with given data")
	assert.Contains(t, logOutput, "No new issues to notify for cluster second_cluster", "processClusters shouldn't generate any notification for 'second_cluster' with given data")

	zerolog.SetGlobalLevel(zerolog.WarnLevel)
}
func TestProcessClustersInstantNotifsAndTotalRiskImportant ( t * testing . T ) {
buf := new ( bytes . Buffer )
log . Logger = zerolog . New ( buf ) . Level ( zerolog . InfoLevel )
zerolog . SetGlobalLevel ( zerolog . InfoLevel )
mockBroker := sarama . NewMockBroker ( t , 0 )
defer mockBroker . Close ( )
mockBroker . SetHandlerByMap (
map [ string ] sarama . MockResponse {
"MetadataRequest" : sarama . NewMockMetadataResponse ( t ) .
SetBroker ( mockBroker . Addr ( ) , mockBroker . BrokerID ( ) ) .
SetLeader ( brokerCfg . Topic , 0 , mockBroker . BrokerID ( ) ) ,
"OffsetRequest" : sarama . NewMockOffsetResponse ( t ) .
SetOffset ( brokerCfg . Topic , 0 , - 1 , 0 ) .
SetOffset ( brokerCfg . Topic , 0 , - 2 , 0 ) ,
"FetchRequest" : sarama . NewMockFetchResponse ( t , 1 ) ,
"FindCoordinatorRequest" : sarama . NewMockFindCoordinatorResponse ( t ) .
SetCoordinator ( sarama . CoordinatorGroup , "" , mockBroker ) ,
"OffsetFetchRequest" : sarama . NewMockOffsetFetchResponse ( t ) .
SetOffset ( "" , brokerCfg . Topic , 0 , 0 , "" , sarama . ErrNoError ) ,
} )
producerMock := mocks . Producer { }
producerMock . On ( "ProduceMessage" , mock . AnythingOfType ( "types.ProducerMessage" ) ) . Return (
func ( msg types . ProducerMessage ) int32 {
testPartitionID ++
return int32 ( testPartitionID )
} ,
func ( msg types . ProducerMessage ) int64 {
testOffset ++
return int64 ( testOffset )
} ,
func ( msg types . ProducerMessage ) error {
return nil
} ,
)
errorKeys := map [ string ] utypes . RuleErrorKeyContent {
"RULE_1" : {
Metadata : utypes . ErrorKeyMetadata {
Description : "rule 1 error key description" ,
Impact : utypes . Impact {
Name : "impact_1" ,
Impact : 3 ,
} ,
Likelihood : 4 ,
} ,
Reason : "rule 1 reason" ,
HasReason : true ,
} ,
"RULE_2" : {
Metadata : utypes . ErrorKeyMetadata {
Description : "rule 2 error key description" ,
Impact : utypes . Impact {
Name : "impact_2" ,
Impact : 3 ,
} ,
Likelihood : 3 ,
} ,
HasReason : false ,
} ,
}
ruleContent := types . RulesMap {
"rule_1" : {
Summary : "rule 1 summary" ,
Reason : "rule 1 reason" ,
Resolution : "rule 1 resolution" ,
MoreInfo : "rule 1 more info" ,
ErrorKeys : errorKeys ,
HasReason : true ,
} ,
"rule_2" : {
Summary : "rule 2 summary" ,
Reason : "" ,
Resolution : "rule 2 resolution" ,
MoreInfo : "rule 2 more info" ,
ErrorKeys : errorKeys ,
HasReason : false ,
} ,
}
clusters := [ ] types . ClusterEntry {
{
OrgID : 1 ,
AccountNumber : 1 ,
ClusterName : "first_cluster" ,
KafkaOffset : 0 ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
} ,
{
OrgID : 2 ,
AccountNumber : 2 ,
ClusterName : "second_cluster" ,
KafkaOffset : 100 ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
} ,
}
storage := mocks . Storage { }
storage . On ( "ReadReportForClusterAtTime" ,
mock . AnythingOfType ( "types.OrgID" ) ,
mock . AnythingOfType ( "types.ClusterName" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ) . Return (
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) types . ClusterReport {
return "{\"analysis_metadata\":{\"metadata\":\"some metadata\"},\"reports\":[{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_1\",\"details\":\"some details\"}]}"
} ,
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) error {
return nil
} ,
)
storage . On ( "ReadLastNNotifiedRecords" , mock . AnythingOfType ( "types.ClusterEntry" ) , mock . AnythingOfType ( "int" ) ) . Return (
func ( clusterEntry types . ClusterEntry , numberOfRecords int ) [ ] types . NotificationRecord {
|
Return a record that is different from the one that will be processed so notification is sent
|
return [ ] types . NotificationRecord {
{
OrgID : 3 ,
AccountNumber : 4 ,
ClusterName : "a cluster" ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
NotificationTypeID : 0 ,
StateID : 0 ,
Report : "{\"analysis_metadata\":{\"metadata\":\"some metadata\"},\"reports\":[{\"rule_id\":\"rule_4|RULE_4\",\"component\":\"ccx_rules_ocp.external.rules.rule_4.report\",\"type\":\"rule\",\"key\":\"RULE_4\",\"details\":\"some details\"},{\"rule_id\":\"rule_4|RULE_4\",\"component\":\"ccx_rules_ocp.external.rules.rule_2.report\",\"type\":\"rule\",\"key\":\"RULE_2\",\"details\":\"some details\"},{\"rule_id\":\"rule_5|RULE_5\",\"component\":\"ccx_rules_ocp.external.rules.rule_5.report\",\"type\":\"rule\",\"key\":\"RULE_3\",\"details\":\"some details\"}]}" ,
NotifiedAt : types . Timestamp ( testTimestamp . Add ( - 2 ) ) ,
ErrorLog : "" ,
} ,
}
} ,
func ( clusterEntry types . ClusterEntry , numberOfRecords int ) error {
return nil
} ,
)
storage . On ( "WriteNotificationRecordForCluster" ,
mock . AnythingOfType ( "types.ClusterEntry" ) ,
mock . AnythingOfType ( "types.NotificationTypeID" ) ,
mock . AnythingOfType ( "types.StateID" ) ,
mock . AnythingOfType ( "types.ClusterReport" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ,
mock . AnythingOfType ( "string" ) ,
mock . AnythingOfType ( "types.EventTarget" ) ) . Return (
func ( clusterEntry types . ClusterEntry , notificationTypeID types . NotificationTypeID , stateID types . StateID , report types . ClusterReport , notifiedAt types . Timestamp , errorLog string , eventTarget types . EventTarget ) error {
return nil
} ,
)
d := differ . Differ {
Storage : & storage ,
NotificationType : types . InstantNotif ,
Target : types . NotificationBackendTarget ,
Thresholds : differ . EventThresholds {
TotalRisk : differ . DefaultTotalRiskThreshold ,
} ,
Filter : differ . DefaultEventFilter ,
Notifier : & producerMock ,
}
d . ProcessClusters ( & conf . ConfigStruct { Kafka : conf . KafkaConfiguration { Enabled : true } } , ruleContent , clusters )
executionLog := buf . String ( )
assert . Contains ( t , executionLog , differ . ReportWithHighImpactMessage , "processClusters should create a notification for 'first_cluster' with given data" )
assert . Contains ( t , executionLog , "{\"level\":\"info\",\"cluster\":\"first_cluster\",\"number of events\":1,\"message\":\"Producing instant notification\"}" , "processClusters should generate one notification for 'first_cluster' with given data" )
assert . Contains ( t , executionLog , "{\"level\":\"info\",\"cluster\":\"second_cluster\",\"number of events\":1,\"message\":\"Producing instant notification\"}" , "processClusters should generate one notification for 'first_cluster' with given data" )
zerolog . SetGlobalLevel ( zerolog . WarnLevel )
}
// TestProcessClustersInstantNotifsAndTotalRiskCritical tests that instant
// notifications are produced for both clusters when the reported issue has a
// "critical" total risk (impact 4, likelihood 4).
func TestProcessClustersInstantNotifsAndTotalRiskCritical(t *testing.T) {
	buf := new(bytes.Buffer)
	log.Logger = zerolog.New(buf).Level(zerolog.InfoLevel)
	zerolog.SetGlobalLevel(zerolog.InfoLevel)

	// Fake Kafka broker so the notification producer has an endpoint to talk to.
	mockBroker := sarama.NewMockBroker(t, 0)
	defer mockBroker.Close()
	mockBroker.SetHandlerByMap(
		map[string]sarama.MockResponse{
			"MetadataRequest": sarama.NewMockMetadataResponse(t).
				SetBroker(mockBroker.Addr(), mockBroker.BrokerID()).
				SetLeader(brokerCfg.Topic, 0, mockBroker.BrokerID()),
			"OffsetRequest": sarama.NewMockOffsetResponse(t).
				SetOffset(brokerCfg.Topic, 0, -1, 0).
				SetOffset(brokerCfg.Topic, 0, -2, 0),
			"FetchRequest": sarama.NewMockFetchResponse(t, 1),
			"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
				SetCoordinator(sarama.CoordinatorGroup, "", mockBroker),
			"OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t).
				SetOffset("", brokerCfg.Topic, 0, 0, "", sarama.ErrNoError),
		})
	producerMock := mocks.Producer{}
	producerMock.On("ProduceMessage", mock.AnythingOfType("types.ProducerMessage")).Return(
		func(msg types.ProducerMessage) int32 {
			testPartitionID++
			return int32(testPartitionID)
		},
		func(msg types.ProducerMessage) int64 {
			testOffset++
			return int64(testOffset)
		},
		func(msg types.ProducerMessage) error {
			return nil
		},
	)
	errorKeys := map[string]utypes.RuleErrorKeyContent{
		"RULE_1": {
			Metadata: utypes.ErrorKeyMetadata{
				Description: "rule 1 error key description",
				Impact: utypes.Impact{
					Name:   "impact_1",
					Impact: 4,
				},
				Likelihood: 4,
			},
			Reason:    "rule 1 reason",
			HasReason: true,
		},
		"RULE_2": {
			Metadata: utypes.ErrorKeyMetadata{
				Description: "rule 2 error key description",
				Impact: utypes.Impact{
					Name:   "impact_2",
					Impact: 4,
				},
				Likelihood: 4,
			},
			HasReason: false,
		},
	}
	ruleContent := types.RulesMap{
		"rule_1": {
			Summary:    "rule 1 summary",
			Reason:     "rule 1 reason",
			Resolution: "rule 1 resolution",
			MoreInfo:   "rule 1 more info",
			ErrorKeys:  errorKeys,
			HasReason:  true,
		},
		"rule_2": {
			Summary:    "rule 2 summary",
			Reason:     "",
			Resolution: "rule 2 resolution",
			MoreInfo:   "rule 2 more info",
			ErrorKeys:  errorKeys,
			HasReason:  false,
		},
	}
	clusters := []types.ClusterEntry{
		{
			OrgID:         1,
			AccountNumber: 1,
			ClusterName:   "first_cluster",
			KafkaOffset:   0,
			UpdatedAt:     types.Timestamp(testTimestamp),
		},
		{
			OrgID:         2,
			AccountNumber: 2,
			ClusterName:   "second_cluster",
			KafkaOffset:   100,
			UpdatedAt:     types.Timestamp(testTimestamp),
		},
	}
	storage := mocks.Storage{}
	storage.On("ReadReportForClusterAtTime",
		mock.AnythingOfType("types.OrgID"),
		mock.AnythingOfType("types.ClusterName"),
		mock.AnythingOfType("types.Timestamp")).Return(
		func(orgID types.OrgID, clusterName types.ClusterName, updatedAt types.Timestamp) types.ClusterReport {
			return "{\"analysis_metadata\":{\"metadata\":\"some metadata\"},\"reports\":[{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_1\",\"details\":\"some details\"}]}"
		},
		func(orgID types.OrgID, clusterName types.ClusterName, updatedAt types.Timestamp) error {
			return nil
		},
	)
	storage.On("WriteNotificationRecordForCluster",
		mock.AnythingOfType("types.ClusterEntry"),
		mock.AnythingOfType("types.NotificationTypeID"),
		mock.AnythingOfType("types.StateID"),
		mock.AnythingOfType("types.ClusterReport"),
		mock.AnythingOfType("types.Timestamp"),
		mock.AnythingOfType("string"),
		mock.AnythingOfType("types.EventTarget")).Return(
		func(clusterEntry types.ClusterEntry, notificationTypeID types.NotificationTypeID, stateID types.StateID, report types.ClusterReport, notifiedAt types.Timestamp, errorLog string, eventTarget types.EventTarget) error {
			return nil
		},
	)
	d := differ.Differ{
		Storage:          &storage,
		NotificationType: types.InstantNotif,
		Target:           types.NotificationBackendTarget,
		Thresholds: differ.EventThresholds{
			TotalRisk: differ.DefaultTotalRiskThreshold,
		},
		Filter:   differ.DefaultEventFilter,
		Notifier: &producerMock,
	}
	d.ProcessClusters(&conf.ConfigStruct{Kafka: conf.KafkaConfiguration{Enabled: true}}, ruleContent, clusters)
	executionLog := buf.String()
	assert.Contains(t, executionLog, fmt.Sprintf("{\"level\":\"warn\",\"type\":\"rule\",\"rule\":\"rule_1\",\"error key\":\"RULE_1\",\"likelihood\":4,\"impact\":4,\"totalRisk\":4,\"message\":\"%s\"}\n", differ.ReportWithHighImpactMessage))
	assert.Contains(t, executionLog, "{\"level\":\"info\",\"cluster\":\"first_cluster\",\"number of events\":1,\"message\":\"Producing instant notification\"}", "processClusters should generate one notification for 'first_cluster' with given data")
	// Fixed copy-paste error: this assertion is about 'second_cluster'.
	assert.Contains(t, executionLog, "{\"level\":\"info\",\"cluster\":\"second_cluster\",\"number of events\":1,\"message\":\"Producing instant notification\"}", "processClusters should generate one notification for 'second_cluster' with given data")
	// Restore the global log level like the other tests in this file do;
	// previously the level was left at InfoLevel, leaking into later tests.
	zerolog.SetGlobalLevel(zerolog.WarnLevel)
}
// TestProcessClustersAllIssuesAlreadyNotifiedCooldownNotPassed tests that no
// notification is produced when every issue in a cluster's report was already
// notified (the identical report is present in PreviouslyReported) and the
// cooldown has not elapsed.
func TestProcessClustersAllIssuesAlreadyNotifiedCooldownNotPassed(t *testing.T) {
	buf := new(bytes.Buffer)
	log.Logger = zerolog.New(buf).Level(zerolog.DebugLevel)
	zerolog.SetGlobalLevel(zerolog.DebugLevel)
	errorKeys := map[string]utypes.RuleErrorKeyContent{
		"RULE_1": {
			Metadata: utypes.ErrorKeyMetadata{
				Description: "rule 1 error key description",
				Impact: utypes.Impact{
					Name:   "impact_1",
					Impact: 3,
				},
				Likelihood: 4,
			},
			Reason:    "rule 1 reason",
			HasReason: true,
		},
	}
	ruleContent := types.RulesMap{
		"rule_1": {
			Summary:    "rule 1 summary",
			Reason:     "rule 1 reason",
			Resolution: "rule 1 resolution",
			MoreInfo:   "rule 1 more info",
			ErrorKeys:  errorKeys,
			HasReason:  true,
		},
	}
	clusters := []types.ClusterEntry{
		{
			OrgID:         types.OrgID(1),
			AccountNumber: 1,
			ClusterName:   "first_cluster",
			KafkaOffset:   0,
			UpdatedAt:     types.Timestamp(testTimestamp),
		},
		{
			OrgID:         types.OrgID(2),
			AccountNumber: 2,
			ClusterName:   "second_cluster",
			KafkaOffset:   100,
			UpdatedAt:     types.Timestamp(testTimestamp),
		},
	}
	storage := mocks.Storage{}
	storage.On("ReadReportForClusterAtTime",
		mock.AnythingOfType("types.OrgID"),
		mock.AnythingOfType("types.ClusterName"),
		mock.AnythingOfType("types.Timestamp")).Return(
		func(orgID types.OrgID, clusterName types.ClusterName, updatedAt types.Timestamp) types.ClusterReport {
			return "{\"analysis_metadata\":{\"metadata\":\"some metadata\"},\"reports\":[{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_1\",\"details\":\"some details\"}]}"
		},
		func(orgID types.OrgID, clusterName types.ClusterName, updatedAt types.Timestamp) error {
			return nil
		},
	)
	storage.On("WriteNotificationRecordForCluster",
		mock.AnythingOfType("types.ClusterEntry"),
		mock.AnythingOfType("types.NotificationTypeID"),
		mock.AnythingOfType("types.StateID"),
		mock.AnythingOfType("types.ClusterReport"),
		mock.AnythingOfType("types.Timestamp"),
		mock.AnythingOfType("string"),
		mock.AnythingOfType("types.EventTarget")).Return(
		func(clusterEntry types.ClusterEntry, notificationTypeID types.NotificationTypeID, stateID types.StateID, report types.ClusterReport, notifiedAt types.Timestamp, errorLog string, eventTarget types.EventTarget) error {
			return nil
		},
	)
	d := differ.Differ{
		Storage:          &storage,
		NotificationType: types.InstantNotif,
		Target:           types.NotificationBackendTarget,
		Thresholds: differ.EventThresholds{
			TotalRisk: differ.DefaultTotalRiskThreshold,
		},
		Filter: differ.DefaultEventFilter,
	}
	// Pre-populate the previously-reported cache with the very same reports the
	// storage mock returns, so the differ finds nothing new to notify.
	d.PreviouslyReported = make(types.NotifiedRecordsPerCluster, 2)
	d.PreviouslyReported[types.ClusterOrgKey{OrgID: types.OrgID(1), ClusterName: "first_cluster"}] = types.NotificationRecord{
		OrgID:              1,
		AccountNumber:      4,
		ClusterName:        "first_cluster",
		UpdatedAt:          types.Timestamp(testTimestamp),
		NotificationTypeID: 0,
		StateID:            0,
		Report:             "{\"analysis_metadata\":{\"metadata\":\"some metadata\"},\"reports\":[{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_1\",\"details\":\"some details\"}]}",
		NotifiedAt:         types.Timestamp(testTimestamp.Add(-2)),
		ErrorLog:           "",
	}
	d.PreviouslyReported[types.ClusterOrgKey{OrgID: types.OrgID(2), ClusterName: "second_cluster"}] = types.NotificationRecord{
		OrgID:              2,
		AccountNumber:      4,
		ClusterName:        "second_cluster",
		UpdatedAt:          types.Timestamp(testTimestamp),
		NotificationTypeID: 0,
		StateID:            0,
		Report:             "{\"analysis_metadata\":{\"metadata\":\"some metadata\"},\"reports\":[{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_1\",\"details\":\"some details\"}]}",
		NotifiedAt:         types.Timestamp(testTimestamp.Add(-2)),
		ErrorLog:           "",
	}
	d.ProcessClusters(&conf.ConfigStruct{Kafka: conf.KafkaConfiguration{Enabled: true}}, ruleContent, clusters)
	executionLog := buf.String()
	assert.Contains(t, executionLog, "{\"level\":\"debug\",\"message\":\"No new issues to notify for cluster first_cluster\"}\n", "Notification already sent for first_cluster's report, but corresponding log not found.")
	assert.Contains(t, executionLog, "{\"level\":\"debug\",\"message\":\"No new issues to notify for cluster second_cluster\"}\n", "Notification already sent for second_cluster's report, but corresponding log not found.")
	// Restore the global log level like the other tests in this file do;
	// previously the level was left at DebugLevel, leaking into later tests.
	zerolog.SetGlobalLevel(zerolog.WarnLevel)
}
func TestProcessClustersNewIssuesNotPreviouslyNotified ( t * testing . T ) {
buf := new ( bytes . Buffer )
log . Logger = zerolog . New ( buf ) . Level ( zerolog . InfoLevel )
zerolog . SetGlobalLevel ( zerolog . InfoLevel )
mockBroker := sarama . NewMockBroker ( t , 0 )
defer mockBroker . Close ( )
mockBroker . SetHandlerByMap (
map [ string ] sarama . MockResponse {
"MetadataRequest" : sarama . NewMockMetadataResponse ( t ) .
SetBroker ( mockBroker . Addr ( ) , mockBroker . BrokerID ( ) ) .
SetLeader ( brokerCfg . Topic , 0 , mockBroker . BrokerID ( ) ) ,
"OffsetRequest" : sarama . NewMockOffsetResponse ( t ) .
SetOffset ( brokerCfg . Topic , 0 , - 1 , 0 ) .
SetOffset ( brokerCfg . Topic , 0 , - 2 , 0 ) ,
"FetchRequest" : sarama . NewMockFetchResponse ( t , 1 ) ,
"FindCoordinatorRequest" : sarama . NewMockFindCoordinatorResponse ( t ) .
SetCoordinator ( sarama . CoordinatorGroup , "" , mockBroker ) ,
"OffsetFetchRequest" : sarama . NewMockOffsetFetchResponse ( t ) .
SetOffset ( "" , brokerCfg . Topic , 0 , 0 , "" , sarama . ErrNoError ) ,
} )
errorKeys := map [ string ] utypes . RuleErrorKeyContent {
"RULE_1" : {
Metadata : utypes . ErrorKeyMetadata {
Description : "rule 1 error key description" ,
Impact : utypes . Impact {
Name : "impact_1" ,
Impact : 4 ,
} ,
Likelihood : 4 ,
} ,
Reason : "rule 1 reason" ,
HasReason : true ,
} ,
"RULE_2" : {
Metadata : utypes . ErrorKeyMetadata {
Description : "rule 2 error key description" ,
Impact : utypes . Impact {
Name : "impact_2" ,
Impact : 4 ,
} ,
Likelihood : 4 ,
} ,
HasReason : false ,
} ,
}
ruleContent := types . RulesMap {
"rule_1" : {
Summary : "rule 1 summary" ,
Reason : "rule 1 reason" ,
Resolution : "rule 1 resolution" ,
MoreInfo : "rule 1 more info" ,
ErrorKeys : errorKeys ,
HasReason : true ,
} ,
"rule_2" : {
Summary : "rule 2 summary" ,
Reason : "" ,
Resolution : "rule 2 resolution" ,
MoreInfo : "rule 2 more info" ,
ErrorKeys : errorKeys ,
HasReason : false ,
} ,
}
clusters := [ ] types . ClusterEntry {
{
OrgID : 1 ,
AccountNumber : 1 ,
ClusterName : "first_cluster" ,
KafkaOffset : 0 ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
} ,
{
OrgID : 2 ,
AccountNumber : 2 ,
ClusterName : "second_cluster" ,
KafkaOffset : 100 ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
} ,
}
storage := mocks . Storage { }
storage . On ( "ReadReportForClusterAtTime" ,
mock . AnythingOfType ( "types.OrgID" ) ,
mock . AnythingOfType ( "types.ClusterName" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ) . Return (
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) types . ClusterReport {
return "{\"analysis_metadata\":{\"metadata\":\"some metadata\"},\"reports\":[{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_1\",\"details\":\"some details\"}]}"
} ,
func ( orgID types . OrgID , clusterName types . ClusterName , updatedAt types . Timestamp ) error {
return nil
} ,
)
storage . On ( "WriteNotificationRecordForCluster" ,
mock . AnythingOfType ( "types.ClusterEntry" ) ,
mock . AnythingOfType ( "types.NotificationTypeID" ) ,
mock . AnythingOfType ( "types.StateID" ) ,
mock . AnythingOfType ( "types.ClusterReport" ) ,
mock . AnythingOfType ( "types.Timestamp" ) ,
mock . AnythingOfType ( "string" ) ,
mock . AnythingOfType ( "types.EventTarget" ) ) . Return (
func ( clusterEntry types . ClusterEntry , notificationTypeID types . NotificationTypeID , stateID types . StateID , report types . ClusterReport , notifiedAt types . Timestamp , errorLog string , eventTarget types . EventTarget ) error {
return nil
} ,
)
producerMock := mocks . Producer { }
producerMock . On ( "ProduceMessage" , mock . AnythingOfType ( "types.ProducerMessage" ) ) . Return (
func ( msg types . ProducerMessage ) int32 {
testPartitionID ++
return int32 ( testPartitionID )
} ,
func ( msg types . ProducerMessage ) int64 {
testOffset ++
return int64 ( testOffset )
} ,
func ( msg types . ProducerMessage ) error {
return nil
} ,
)
d := differ . Differ {
Storage : & storage ,
NotificationType : types . InstantNotif ,
Target : types . NotificationBackendTarget ,
Thresholds : differ . EventThresholds {
TotalRisk : differ . DefaultTotalRiskThreshold ,
} ,
Filter : differ . DefaultEventFilter ,
}
d . Notifier = & producerMock
d . PreviouslyReported = make ( types . NotifiedRecordsPerCluster , 1 )
d . PreviouslyReported [ types . ClusterOrgKey { OrgID : types . OrgID ( 3 ) , ClusterName : "a cluster" } ] = types . NotificationRecord {
OrgID : 3 ,
AccountNumber : 4 ,
ClusterName : "a cluster" ,
UpdatedAt : types . Timestamp ( testTimestamp ) ,
NotificationTypeID : 0 ,
StateID : 1 ,
Report : "{\"analysis_metadata\":{\"metadata\":\"some metadata\"},\"reports\":[{\"rule_id\":\"rule_1|RULE_1\",\"component\":\"ccx_rules_ocp.external.rules.rule_1.report\",\"type\":\"rule\",\"key\":\"RULE_4\",\"details\":\"some details\"}]}" ,
NotifiedAt : types . Timestamp ( testTimestamp . Add ( - 2 ) ) ,
ErrorLog : "" ,
}
d . ProcessClusters ( & conf . ConfigStruct { Kafka : conf . KafkaConfiguration { Enabled : true } } , ruleContent , clusters )
executionLog := buf . String ( )
assert . Contains ( t , executionLog , fmt . Sprintf ( "{\"level\":\"warn\",\"type\":\"rule\",\"rule\":\"rule_1\",\"error key\":\"RULE_1\",\"likelihood\":4,\"impact\":4,\"totalRisk\":4,\"message\":\"%s\"}\n" , differ . ReportWithHighImpactMessage ) )
assert . Contains ( t , executionLog , "{\"level\":\"info\",\"cluster\":\"first_cluster\",\"number of events\":1,\"message\":\"Producing instant notification\"}" , "processClusters should generate one notification for 'first_cluster' with given data" )
assert . Contains ( t , executionLog , "{\"level\":\"info\",\"cluster\":\"second_cluster\",\"number of events\":1,\"message\":\"Producing instant notification\"}" , "processClusters should generate one notification for 'second_cluster' with given data" )
}
// TestRetrievePreviouslyReportedForEventTarget checks that the differ reads
// previously reported issues (still within the cool-down interval) for the
// given cluster entries from storage, stores them without error, and logs
// how many records were retrieved.
func TestRetrievePreviouslyReportedForEventTarget(t *testing.T) {
	// capture log output so the final assertion can inspect it
	buf := new(bytes.Buffer)
	log.Logger = zerolog.New(buf).Level(zerolog.InfoLevel)
	zerolog.SetGlobalLevel(zerolog.InfoLevel)

	var (
		now      = time.Now()
		clusters = "'first cluster','second cluster'"
		orgs     = "'1','2'"
		// two cluster entries belonging to two different organizations;
		// the mocked DB rows below must stay consistent with this fixture
		clusterEntries = []types.ClusterEntry{
			{
				OrgID:         1,
				AccountNumber: 1,
				ClusterName:   "first cluster",
				KafkaOffset:   1,
				UpdatedAt:     types.Timestamp(now),
			},
			{
				OrgID:         2,
				AccountNumber: 2,
				ClusterName:   "second cluster",
				KafkaOffset:   1,
				UpdatedAt:     types.Timestamp(now),
			},
		}
		timeOffset = "1 day"
	)

	// producer mock is required to build a Differ but no message is
	// expected to be produced by this code path
	producerMock := mocks.Producer{}
	producerMock.On("ProduceMessage", mock.AnythingOfType("types.ProducerMessage")).Return(
		func(msg types.ProducerMessage) int32 {
			testPartitionID++
			return int32(testPartitionID)
		},
		func(msg types.ProducerMessage) int64 {
			testOffset++
			return int64(testOffset)
		},
		func(msg types.ProducerMessage) error {
			return nil
		},
	)

	// SQL-level storage mock backing the system under test
	db, mock := newMock(t)
	defer func() { _ = db.Close() }()
	sut := differ.NewFromConnection(db, types.DBDriverPostgres)

	d := differ.Differ{
		Storage:          sut,
		NotificationType: types.InstantNotif,
		Target:           types.NotificationBackendTarget,
		Thresholds: differ.EventThresholds{
			TotalRisk: differ.DefaultTotalRiskThreshold,
		},
		Filter:   differ.DefaultEventFilter,
		Notifier: &producerMock,
	}

	// exact query the storage layer is expected to issue; matched
	// literally via regexp.QuoteMeta below
	expectedQuery := fmt.Sprintf(`
SELECT org_id, cluster, report, notified_at
FROM (
SELECT DISTINCT ON (cluster) *
FROM reported
WHERE event_type_id = %v AND state = 1 AND org_id IN (%v) AND cluster IN (%v)
ORDER BY cluster, notified_at DESC) t
WHERE notified_at > NOW() - $1::INTERVAL ;
`, types.NotificationBackendTarget, orgs, clusters)

	// one row per cluster entry; "second cluster" belongs to org 2,
	// matching the clusterEntries fixture above
	rows := sqlmock.NewRows(
		[]string{"org_id", "cluster", "report", "notified_at"}).
		AddRow(1, "first cluster", "test", now).
		AddRow(2, "second cluster", "test", now)
	mock.ExpectQuery(regexp.QuoteMeta(expectedQuery)).
		WithArgs(timeOffset).
		WillReturnRows(rows)

	err := d.RetrievePreviouslyReportedForEventTarget(timeOffset, types.NotificationBackendTarget, clusterEntries)
	assert.Nil(t, err)

	// both mocked rows should have been read and reported in the log
	executionLog := buf.String()
	assert.Contains(t, executionLog, "{\"level\":\"info\",\"message\":\"Reading previously reported issues for given cluster list...\"}\n{\"level\":\"info\",\"target\":1,\"retrieved\":2,\"message\":\"Done reading previously reported issues still in cool down\"}\n")
}
func TestRetrievePreviouslyReportedForEventTargetEmptyClusterEntries ( t * testing . T ) {
clusterEntries := [ ] types . ClusterEntry { }
timeOffset := "1 day"
buf := new ( bytes . Buffer )
log . Logger = zerolog . New ( buf ) . Level ( zerolog . InfoLevel )
zerolog . SetGlobalLevel ( zerolog . InfoLevel )
producerMock := mocks . Producer { }
producerMock . On ( "ProduceMessage" , mock . AnythingOfType ( "types.ProducerMessage" ) ) . Return (
func ( msg types . ProducerMessage ) int32 {
testPartitionID ++
return int32 ( testPartitionID )
} ,
func ( msg types . ProducerMessage ) int64 {
testOffset ++
return int64 ( testOffset )
} ,
func ( msg types . ProducerMessage ) error {
return nil
} ,
)
|