// message to be checked

organizationIDNotInAllowList = "organization ID is not in allow list"
testReport = `{"fingerprints": [], "info": [], "skips": [], "system": {}, "analysis_metadata":{"metadata":"some metadata"},"reports":[{"rule_id":"rule_4|RULE_4","component":"ccx_rules_ocp.external.rules.rule_1.report","type":"rule","key":"RULE_4","details":"some details"},{"rule_id":"rule_4|RULE_4","component":"ccx_rules_ocp.external.rules.rule_2.report","type":"rule","key":"RULE_2","details":"some details"},{"rule_id":"rule_5|RULE_5","component":"ccx_rules_ocp.external.rules.rule_5.report","type":"rule","key":"RULE_3","details":"some details"}]}`
testMetrics = `{"system":{"metadata":{},"hostname":null},"fingerprints":[],"version":1,"analysis_metadata":{},"workload_recommendations":[{"response_id":"an_issue|DVO_AN_ISSUE","component":"ccx_rules_ocp.external.dvo.an_issue_pod.recommendation","key":"DVO_AN_ISSUE","details":{"check_name":"","check_url":"","samples":[{"namespace_uid":"NAMESPACE-UID-A","kind":"DaemonSet","uid":"193a2099-1234-5678-916a-d570c9aac158"}]},"tags":[],"links":{"jira":["https://issues.redhat.com/browse/AN_ISSUE"],"product_documentation":[]},"workloads":[{"namespace":"namespace-name-A","namespace_uid":"NAMESPACE-UID-A","kind":"DaemonSet","name":"test-name-0099","uid":"193a2099-1234-5678-916a-d570c9aac158"}]}]}`
)
var (
	// allow list containing a single test organization ID
	testOrgAllowlist = mapset.NewSetWith(types.OrgID(1))

	// broker configuration pointing at an address nothing listens on
	wrongBrokerCfg = broker.Configuration{
		Addresses: "localhost:1234",
		Topic:     "topic",
		Group:     "group",
	}

	// incoming message whose Report payload contains rule hits (see testReport)
	messageReportWithRuleHits = `{
"OrgID": ` + fmt.Sprint(testdata.OrgID) + `,
"ClusterName": "` + string(testdata.ClusterName) + `",
"Report":` + testReport + `,
"LastChecked": "` + testdata.LastCheckedAt.UTC().Format(time.RFC3339) + `"
}`

	// incoming message whose Metrics payload contains DVO workload
	// recommendations (see testMetrics)
	messageReportWithDVOHits = `{
"OrgID": ` + fmt.Sprint(testdata.OrgID) + `,
"ClusterName": "` + string(testdata.ClusterName) + `",
"Metrics":` + testMetrics + `,
"LastChecked": "` + testdata.LastCheckedAt.UTC().Format(time.RFC3339) + `"
}`

	// incoming message whose Report has neither "reports" nor "info" attribute
	// (note: LastChecked is intentionally formatted without .UTC() here)
	messageNoReportsNoInfo = `{
"OrgID": ` + fmt.Sprint(testdata.OrgID) + `,
"ClusterName": "` + string(testdata.ClusterName) + `",
"LastChecked": "` + testdata.LastCheckedAt.Format(time.RFC3339) + `",
"Report": {
"system": {
"metadata": {},
"hostname": null
},
"fingerprints": [],
"skips": []
}
}`

	// incoming message carrying an empty Metrics object
	messageReportNoDVOMetrics = `{
"OrgID": ` + fmt.Sprint(testdata.OrgID) + `,
"ClusterName": "` + string(testdata.ClusterName) + `",
"Metrics": {},
"LastChecked": "` + testdata.LastCheckedAt.UTC().Format(time.RFC3339) + `"
}`
)
// init quiets logging for the whole test package: only warnings and
// more severe messages are emitted while the tests run.
func init() {
	zerolog.SetGlobalLevel(zerolog.WarnLevel)
}
// consumerProcessMessage wraps the given string payload in a Sarama consumer
// message and feeds it through the consumer, returning HandleMessage's error.
func consumerProcessMessage(mockConsumer consumer.Consumer, message string) error {
	msg := sarama.ConsumerMessage{
		Value: []byte(message),
	}
	return mockConsumer.HandleMessage(&msg)
}
// mustConsumerProcessMessage is like consumerProcessMessage but fails the
// test immediately when the message cannot be processed.
func mustConsumerProcessMessage(t testing.TB, mockConsumer consumer.Consumer, message string) {
	err := consumerProcessMessage(mockConsumer, message)
	helpers.FailOnError(t, err)
}
// createConsumerMessage builds the standard JSON envelope consumed by the
// aggregator (OrgID, ClusterName, LastChecked) embedding the given report
// payload verbatim under the "Report" key.
func createConsumerMessage(report string) string {
	// string(...) conversion is used for ClusterName for consistency with the
	// other message builders in this file (ClusterName is a string type, so
	// the produced bytes are identical to fmt.Sprint's output).
	consumerMessage := `{
"OrgID": ` + fmt.Sprint(testdata.OrgID) + `,
"ClusterName": "` + string(testdata.ClusterName) + `",
"LastChecked": "` + testdata.LastCheckedAt.UTC().Format(time.RFC3339) + `",
"Report": ` + report + `
}
`
	return consumerMessage
}
// unmarshall parses s as JSON and returns the parsed document as a
// *json.RawMessage; it returns nil when s is not valid JSON.
func unmarshall(s string) *json.RawMessage {
	var raw json.RawMessage
	if err := json.Unmarshal([]byte(s), &raw); err != nil {
		return nil
	}
	return &raw
}
// TestConsumerConstructorNoKafka verifies that constructing a Kafka consumer
// against an unreachable broker address fails and yields a nil consumer.
func TestConsumerConstructorNoKafka(t *testing.T) {
	mockStorage, closer := ira_helpers.MustGetPostgresStorage(t, false)
	defer closer()

	mockConsumer, err := consumer.NewKafkaConsumer(wrongBrokerCfg, mockStorage, nil)
	assert.Error(t, err)
	assert.Contains(
		t, err.Error(), "kafka: client has run out of available brokers to talk to",
	)
	assert.Equal(
		t,
		(*consumer.KafkaConsumer)(nil),
		mockConsumer,
		"consumer.New should return nil instead of Consumer implementation",
	)
}
// TestKafkaConsumer_New verifies that a consumer can be created against a
// Sarama mock broker and closed again without error, within the test's
// time limit.
func TestKafkaConsumer_New(t *testing.T) {
	helpers.RunTestWithTimeout(t, func(t testing.TB) {
		sarama.Logger = log.New(os.Stdout, saramaLogPrefix, log.LstdFlags)

		mockStorage, closer := ira_helpers.MustGetPostgresStorage(t, true)
		defer closer()

		mockBroker := sarama.NewMockBroker(t, 0)
		defer mockBroker.Close()

		mockBroker.SetHandlerByMap(ira_helpers.GetHandlersMapForMockConsumer(t, mockBroker, testTopicName))

		mockConsumer, err := consumer.NewKafkaConsumer(broker.Configuration{
			Addresses: mockBroker.Addr(),
			Topic:     testTopicName,
			Enabled:   true,
		}, mockStorage, nil)
		helpers.FailOnError(t, err)

		err = mockConsumer.Close()
		helpers.FailOnError(t, err)
	}, testCaseTimeLimit)
}
func TestKafkaConsumer_SetupCleanup ( t * testing . T ) {
mockStorage , closer := ira_helpers . MustGetPostgresStorage ( t , false )
defer closer ( )
mockBroker := sarama . NewMockBroker ( t , 0 )
defer mockBroker . Close ( )
mockBroker . SetHandlerByMap ( ira_helpers . GetHandlersMapForMockConsumer ( t , mockBroker , testTopicName ) )
mockConsumer , err := consumer . NewKafkaConsumer ( broker . Configuration {
Addresses : mockBroker . Addr ( ) ,
Topic : testTopicName ,
Enabled : true ,
} , mockStorage , nil )
helpers . FailOnError ( t , err )
defer func ( ) {
helpers . FailOnError ( t , mockConsumer . Close ( ) )
} ( )
|
Override functions for testing
|
producer . NewDeadLetterProducer = func ( _ broker . Configuration ) ( * producer . DeadLetterProducer , error ) {
return nil , errors . New ( "error happened" )
}
producer . NewPayloadTrackerProducer = func ( _ broker . Configuration ) ( * producer . PayloadTrackerProducer , error ) {
return nil , nil
}
mockStorage , closer := ira_helpers . MustGetPostgresStorage ( t , true )
defer closer ( )
mockBroker := sarama . NewMockBroker ( t , 0 )
defer mockBroker . Close ( )
mockBroker . SetHandlerByMap ( ira_helpers . GetHandlersMapForMockConsumer ( t , mockBroker , testTopicName ) )
_ , err := consumer . NewKafkaConsumer ( broker . Configuration {
Addresses : mockBroker . Addr ( ) ,
Topic : testTopicName ,
Enabled : true ,
} , mockStorage , nil )
assert . EqualError ( t , err , "error happened" )
}
func TestKafkaConsumer_NewPayloadTrackerProducer_Error ( t * testing . T ) {
|
Override functions for testing
|
producer . NewDeadLetterProducer = func ( _ broker . Configuration ) ( * producer . DeadLetterProducer , error ) {
return nil , nil
}
producer . NewPayloadTrackerProducer = func ( _ broker . Configuration ) ( * producer . PayloadTrackerProducer , error ) {
return nil , errors . New ( "error happened" )
}
mockStorage , closer := ira_helpers . MustGetPostgresStorage ( t , true )
defer closer ( )
mockBroker := sarama . NewMockBroker ( t , 0 )
defer mockBroker . Close ( )
mockBroker . SetHandlerByMap ( ira_helpers . GetHandlersMapForMockConsumer ( t , mockBroker , testTopicName ) )
_ , err := consumer . NewKafkaConsumer ( broker . Configuration {
Addresses : mockBroker . Addr ( ) ,
Topic : testTopicName ,
Enabled : true ,
} , mockStorage , nil )
assert . EqualError ( t , err , "error happened" )
}