package main_test
Unit test definitions for functions and methods defined in source file
Documentation in literate-programming-style is available at:
import (
main "github.com/RedHatInsights/ccx-notification-writer"
types "github.com/RedHatInsights/insights-results-types"
Constants used by unit tests
const (
ConsumerReport = `{
"fingerprints": [],
"info": [],
"reports": [],
"skips": [],
"system": {}
Variables used by unit tests
var (
ExpectedOrgID = types . OrgID ( 1 )
ExpectedAccountNumber = types . AccountNumber ( 1234 )
ExpectedClusterName = types . ClusterName ( "84f7eedc-0dd8-49cd-9d4d-f6646df3a5bc" )
LastCheckedAt = time . Unix ( 25 , 0 ) . UTC ( )
TestNewConsumerBadBroker function checks the consumer creation by
using a non-accessible Kafka broker.
func TestNewConsumerBadBrokerNonAccessibleBroker ( t * testing . T ) {
expected error message
const expectedErr = "kafka: client has run out of available brokers to talk to: dial tcp: missing address"
invalid broker configuration
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "" ,
Topic : "whatever" ,
Group : "whatever" ,
Enabled : true ,
dummy storage not really usable as the driver is not specified
dummyStorage := main . NewFromConnection ( nil , 1 )
try to construct new consumer
mockConsumer , err := main . NewConsumer ( & brokerConfiguration , dummyStorage )
check that error is really reported
assert . Contains ( t , err . Error ( ) , expectedErr )
test the return value
assert . Equal (
t ,
( * main . KafkaConsumer ) ( nil ) ,
mockConsumer ,
"consumer.New should return nil instead of Consumer implementation" ,
TestNewConsumerLocalBroker function checks the consumer creation by using a
non-accessible Kafka broker. This test assumes there is no local Kafka
instance currently running
func TestNewConsumerLocalBrokerNonAccessibleBroker ( t * testing . T ) {
expected error message
const expectedErr = "kafka: client has run out of available brokers to talk to"
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "" ,
Enabled : true ,
dummy storage not really usable as the driver is not specified
dummyStorage := main . NewFromConnection ( nil , 1 )
try to construct new consumer
mockConsumer , err := main . NewConsumer ( & brokerConfiguration , dummyStorage )
check that error is really reported
assert . Contains ( t , err . Error ( ) , expectedErr )
test the return value
assert . Equal (
t ,
( * main . KafkaConsumer ) ( nil ) ,
mockConsumer ,
"consumer.New should return nil instead of Consumer implementation" ,
TestNewConsumerConsumerGroup function checks the consumer creation by using a
non-accesible Kafka broker. This test assumes there is no local Kafka
instance currently running. Consumer group is enabled and setup for this test.
func TestNewConsumerSaramaConfigNonAccessibleBroker ( t * testing . T ) {
expected error message
const expectedErr = "kafka: client has run out of available brokers to talk to"
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "test-group" ,
Enabled : true ,
dummy storage not really usable as the driver is not specified
dummyStorage := main . NewFromConnection ( nil , 1 )
try to construct new consumer
mockConsumer , err := main . NewConsumer ( & brokerConfiguration , dummyStorage )
check that error is really reported
assert . Contains ( t , err . Error ( ) , expectedErr )
test the return value
assert . Equal (
t ,
( * main . KafkaConsumer ) ( nil ) ,
mockConsumer ,
"consumer.New should return nil instead of Consumer implementation" ,
TestNewConsumerTLSEnabled function checks the consumer creation by using a
non-accesible Kafka broker. This test assumes there is no local Kafka
instance currently running. TSL is enabled in broker configuration.
func TestNewConsumerTLSEnabledNonAccessibleBroker ( t * testing . T ) {
expected error message
const expectedErr = "kafka: client has run out of available brokers to talk to"
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "" ,
Enabled : true ,
SecurityProtocol : "SSL" ,
dummy storage not really usable as the driver is not specified
dummyStorage := main . NewFromConnection ( nil , 1 )
try to construct new consumer
mockConsumer , err := main . NewConsumer ( & brokerConfiguration , dummyStorage )
check that error is really reported
assert . Contains ( t , err . Error ( ) , expectedErr )
test the return value
assert . Equal (
t ,
( * main . KafkaConsumer ) ( nil ) ,
mockConsumer ,
"consumer.New should return nil instead of Consumer implementation" ,
TestNewConsumerSASLEnabled function checks the consumer creation by using a
non-accesible Kafka broker. This test assumes there is no local Kafka
instance currently running. SASL is enabled in broker configuration.
func TestNewConsumerSASLEnabledNonAccessibleBroker ( t * testing . T ) {
expected error message
const expectedErr = "kafka: client has run out of available brokers to talk to"
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "" ,
Enabled : true ,
SecurityProtocol : "SASL_" ,
SaslUsername : "sasl_user" ,
SaslPassword : "sasl_password" ,
SaslMechanism : "" ,
dummy storage not really usable as the driver is not specified
dummyStorage := main . NewFromConnection ( nil , 1 )
try to construct new consumer
mockConsumer , err := main . NewConsumer ( & brokerConfiguration , dummyStorage )
check that error is really reported
assert . Contains ( t , err . Error ( ) , expectedErr )
test the return value
assert . Equal (
t ,
( * main . KafkaConsumer ) ( nil ) ,
mockConsumer ,
"consumer.New should return nil instead of Consumer implementation" ,
TestSaramaConfigFromBrokerWithSASLEnabled function checks that the Sarama config returned
for a broker configuration with SASL enabled contains the expected fields
func TestSaramaConfigFromBrokerWithSASLEnabledNoSASLMechanism ( t * testing . T ) {
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "" ,
Enabled : true ,
SecurityProtocol : "SASL_" ,
SaslUsername : "sasl_user" ,
SaslPassword : "sasl_password" ,
SaslMechanism : "" ,
saramaConfig , err := main . SaramaConfigFromBrokerConfig ( & brokerConfiguration )
assert . Nil ( t , err )
assert . True ( t , saramaConfig . Net . SASL . Enable )
assert . Equal ( t , saramaConfig . Net . SASL . User , brokerConfiguration . SaslUsername )
assert . Equal ( t , saramaConfig . Net . SASL . Password , brokerConfiguration . SaslPassword )
assert . Nil ( t , saramaConfig . Net . SASL . SCRAMClientGeneratorFunc , "SCRAM client generator function should not be created with given config" )
TestSaramaConfigFromBrokerWithSASLEnabled function checks that the Sarama config returned
for a broker configuration with SASL enabled using SCRAM authentication mechanism
contains expected fields
func TestSaramaConfigFromBrokerWithSASLEnabledSCRAMAuth ( t * testing . T ) {
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "" ,
Enabled : true ,
SecurityProtocol : "SASL_" ,
SaslUsername : "sasl_user" ,
SaslPassword : "sasl_password" ,
SaslMechanism : sarama . SASLTypeSCRAMSHA512 ,
saramaConfig , err := main . SaramaConfigFromBrokerConfig ( & brokerConfiguration )
assert . Nil ( t , err )
assert . True ( t , saramaConfig . Net . SASL . Enable )
assert . Equal ( t , saramaConfig . Net . SASL . User , brokerConfiguration . SaslUsername )
assert . Equal ( t , saramaConfig . Net . SASL . Password , brokerConfiguration . SaslPassword )
assert . NotNil ( t , saramaConfig . Net . SASL . SCRAMClientGeneratorFunc , "SCRAM client generator function should have been created with given config" )
TestSaramaConfigFromBrokerWithSASLEnabled function checks that the Sarama config returned
for a broker configuration with SASL enabled using unhandled authentication mechanism
contains expected fields
func TestSaramaConfigFromBrokerWithSASLEnabledUnexpectedAuthMechanism ( t * testing . T ) {
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "" ,
Enabled : true ,
SecurityProtocol : "SASL_" ,
SaslUsername : "sasl_user" ,
SaslPassword : "sasl_password" ,
SaslMechanism : sarama . SASLTypeSCRAMSHA256 ,
saramaConfig , err := main . SaramaConfigFromBrokerConfig ( & brokerConfiguration )
assert . Nil ( t , err )
assert . True ( t , saramaConfig . Net . SASL . Enable )
assert . Equal ( t , saramaConfig . Net . SASL . User , brokerConfiguration . SaslUsername )
assert . Equal ( t , saramaConfig . Net . SASL . Password , brokerConfiguration . SaslPassword )
assert . Nil ( t , saramaConfig . Net . SASL . SCRAMClientGeneratorFunc , "SCRAM client generator function should not be created with given config" )
TestNewConsumerSASLEnabled function checks the consumer creation by using a
non-accesible Kafka broker. This test assumes there is no local Kafka
instance currently running. SASL is enabled in broker configuration.
func TestNewConsumerSASLEnabled ( t * testing . T ) {
expected error message
const expectedErr = "kafka: client has run out of available brokers to talk to"
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "" ,
Enabled : true ,
SecurityProtocol : "SASL_" ,
SaslUsername : "sasl_user" ,
SaslPassword : "sasl_password" ,
SaslMechanism : "" ,
dummy storage not really usable as the driver is not specified
dummyStorage := main . NewFromConnection ( nil , 1 )
try to construct new consumer
mockConsumer , err := main . NewConsumer ( & brokerConfiguration , dummyStorage )
check that error is really reported
assert . Contains ( t , err . Error ( ) , expectedErr )
test the return value
assert . Equal (
t ,
( * main . KafkaConsumer ) ( nil ) ,
mockConsumer ,
"consumer.New should return nil instead of Consumer implementation" ,
TestNewConsumerCertPath function checks the consumer creation by using a
non-accesible Kafka broker. This test assumes there is no local Kafka
instance currently running. Valid cert. path is provided by tests.
func TestNewConsumerCertPathNonAccessibleBroker ( t * testing . T ) {
expected error message
const expectedErr = "kafka: client has run out of available brokers to talk to"
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "" ,
Enabled : true ,
CertPath : "testdata/cert.pem" ,
dummy storage not really usable as the driver is not specified
dummyStorage := main . NewFromConnection ( nil , 1 )
try to construct new consumer
mockConsumer , err := main . NewConsumer ( & brokerConfiguration , dummyStorage )
check that error is really reported
assert . Contains ( t , err . Error ( ) , expectedErr )
test the return value
assert . Equal (
t ,
( * main . KafkaConsumer ) ( nil ) ,
mockConsumer ,
"invalid cert path" ,
TestNewConsumerInvalidCertPath function checks the consumer creation by using a
non-accesible Kafka broker. This test assumes there is no local Kafka
instance currently running. Invalid cert. path is provided by tests.
func TestNewConsumerInvalidCertPathNonAccessibleBroker ( t * testing . T ) {
expected error message
const expectedErr = "open /foo/bar/baz: no such file or directory"
valid broker configuration for local Kafka instance
var brokerConfiguration = main . BrokerConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Group : "" ,
Enabled : true ,
SecurityProtocol : "SSL" ,
CertPath : "/foo/bar/baz" ,
dummy storage not really usable as the driver is not specified
dummyStorage := main . NewFromConnection ( nil , 1 )
try to construct new consumer
mockConsumer , err := main . NewConsumer ( & brokerConfiguration , dummyStorage )
check that error is really reported
assert . Contains ( t , err . Error ( ) , expectedErr )
test the return value
assert . Equal (
t ,
( * main . KafkaConsumer ) ( nil ) ,
mockConsumer ,
"invalid cert path" ,
TestParseEmptyMessage checks how empty message is handled by
func TestParseEmptyMessage ( t * testing . T ) {
empty message to be parsed
const emptyMessage = ""
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( emptyMessage ) )
check for error - it should be reported
assert . EqualError ( t , err , "unexpected end of JSON input" )
TestParseMessageWithWrongContent checks how message with wrong
(unexpected) content is handled by consumer.
func TestParseMessageWithWrongContent ( t * testing . T ) {
JSON-encoded message with unexpected content
const message = `{"this":"is", "not":"expected content"}`
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( message ) )
check for error - it should be reported
assert . Error ( t , err )
assert . Contains ( t , err . Error ( ) , "missing required attribute" )
TestParseMessageWithImproperJSON checks how message with wrong
content which is not parseable as JSON is processed.
func TestParseMessageWithImproperJSON ( t * testing . T ) {
message not in JSON format
const message = `"this_is_not_json_dude"`
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( message ) )
check for error - it should be reported
assert . EqualError (
t ,
err ,
"json: cannot unmarshal string into Go value of type main.IncomingMessage" ,
TestParseProperMessage checks the parsing of properly declared
func TestParseProperMessage ( t * testing . T ) {
message in JSON format with all required attributes
ConsumerMessage := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report":` + ConsumerReport + `,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
message , err := main . ParseMessage ( [ ] byte ( ConsumerMessage ) )
check for error - it should not be reported
assert . Nil ( t , err )
check returned values
assert . Equal ( t , types . OrgID ( 1 ) , * message . Organization )
assert . Equal ( t , ExpectedClusterName , * message . ClusterName )
assert . Equal ( t , ExpectedAccountNumber , * message . AccountNumber )
TestParseProperMessageWrongOrgID checks the parsing of message
with wrong organization ID.
func TestParseProperMessageWrongOrgID ( t * testing . T ) {
message with wrong organization ID attribute.
message := `{
"OrgID": "foobar",
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report":` + ConsumerReport + `,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( message ) )
check for error - it should be reported
assert . EqualError (
t ,
err ,
"json: cannot unmarshal string into Go struct field IncomingMessage.OrgID of type types.OrgID" ,
TestParseProperMessageWrongAccountNumber checks the parsing of message
with wrong organization ID.
func TestParseProperMessageWrongAccountNumber ( t * testing . T ) {
message with wrong account number attribute.
message := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": "foobar",
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report":` + ConsumerReport + `,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( message ) )
check for error - it should be reported
assert . EqualError (
t ,
err ,
"json: cannot unmarshal string into Go struct field IncomingMessage.AccountNumber of type types.AccountNumber" ,
TestParseProperMessageWrongClusterName checks the parsing of message
with wrong cluster name.
func TestParseProperMessageWrongClusterName ( t * testing . T ) {
message with wrong cluster name
message := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "this is not an UUID",
"Report":` + ConsumerReport + `,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( message ) )
check for error - it should be reported
assert . EqualError ( t , err , "cluster name is not a UUID" )
TestParseMessageWithoutOrgID checks the parsing of improperly
declared message - OrgID attribute is missing.
func TestParseMessageWithoutOrgID ( t * testing . T ) {
message without OrgID attribute
ConsumerMessage := `{
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report":` + ConsumerReport + `,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( ConsumerMessage ) )
check for error - it should be reported
assert . EqualError ( t , err , "missing required attribute 'OrgID'" )
TestParseMessageWithoutAccountNumber checks the parsing of improperly
declared message - AccountNumber attribute is missing.
func TestParseMessageWithoutAccountNumber ( t * testing . T ) {
message without AccountNumber attribute
ConsumerMessage := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report":` + ConsumerReport + `,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( ConsumerMessage ) )
check for error - it should be reported
assert . EqualError ( t , err , "missing required attribute 'AccountNumber'" )
TestParseMessageWithoutClusterName checks the parsing of improperly
declared message - ClusterName attribute is missing.
func TestParseMessageWithoutClusterName ( t * testing . T ) {
message without ClusterName attribute
ConsumerMessage := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"Report":` + ConsumerReport + `,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( ConsumerMessage ) )
check for error - it should be reported
assert . EqualError ( t , err , "missing required attribute 'ClusterName'" )
TestParseMessageWithoutReport checks the parsing of improperly
declared message - Report attribute is missing.
func TestParseMessageWithoutReport ( t * testing . T ) {
message without Report attribute
ConsumerMessage := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( ConsumerMessage ) )
check for error - it should be reported
assert . EqualError ( t , err , "missing required attribute 'Report'" )
TestParseMessageEmptyReport checks the parsing of improperly
declared message - report attribute is empty.
func TestParseMessageEmptyReport ( t * testing . T ) {
message with empty Report attribute
ConsumerMessage := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report": {},
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( ConsumerMessage ) )
check for error - it should be reported
assert . EqualError ( t , err , "Improper report structure, missing key fingerprints" )
TestParseMessageNullReport checks the parsing of improperly
declared message - Report attribute is null.
func TestParseMessageNullReport ( t * testing . T ) {
message with empty Report attribute
ConsumerMessage := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report": null,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
try to parse the message
_ , err := main . ParseMessage ( [ ] byte ( ConsumerMessage ) )
check for error - it should be reported
assert . EqualError ( t , err , "missing required attribute 'Report'" )
NewDummyConsumer function constructs new instance of (not running)
func NewDummyConsumer ( s main . Storage ) * main . KafkaConsumer {
brokerCfg := main . BrokerConfiguration {
Addresses : "localhost:1234" ,
Topic : "topic" ,
Group : "group" ,
return & main . KafkaConsumer {
Configuration : brokerCfg ,
Storage : s ,
Ready : make ( chan bool ) ,
TestProcessEmptyMessage check the behaviour of function ProcessMessage with
empty message on input.
func TestProcessEmptyMessage ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
prepare an empty message
message := sarama . ConsumerMessage { }
try to process the message
_ , err := dummyConsumer . ProcessMessage ( & message )
check for errors - it should be reported
assert . EqualError ( t , err , "unexpected end of JSON input" )
nothing should be written into storage
assert . Equal ( t , 0 , mockStorage . writeReportCalled )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
TestProcessMessageWithWrongDateFormat check the behaviour of function
ProcessMessage with message with wrong date.
func TestProcessMessageWithWrongDateFormat ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
prepare a message
message := sarama . ConsumerMessage { }
fill in a message payload
ConsumerMessage := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report":` + ConsumerReport + `,
"LastChecked": "2020.01.23 16:15:59"
message . Value = [ ] byte ( ConsumerMessage )
try to process the message
_ , err := dummyConsumer . ProcessMessage ( & message )
check for errors - it should be reported
assert . EqualError ( t , err , "parsing time \"2020.01.23 16:15:59\" as \"2006-01-02T15:04:05.999999999Z07:00\": cannot parse \".01.23 16:15:59\" as \"-\"" )
nothing should be written into storage
assert . Equal ( t , 0 , mockStorage . writeReportCalled )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
TestProcessMessageFromFuture check the behaviour of function ProcessMessage
with message with wrong date.
func TestProcessMessageFromFuture ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
prepare a message
message := sarama . ConsumerMessage { }
fill in a message payload
ConsumerMessage := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report":` + ConsumerReport + `,
"LastChecked": "2099-01-01T23:59:59.999999999Z"
message . Value = [ ] byte ( ConsumerMessage )
try to process the message
_ , err := dummyConsumer . ProcessMessage ( & message )
check for errors - it should be reported
assert . EqualError ( t , err , "got a message from the future" )
nothing should be written into storage
assert . Equal ( t , 0 , mockStorage . writeReportCalled )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
TestProcessCorrectMessage check the behaviour of function ProcessMessage for
correct message.
func TestProcessCorrectMessage ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
prepare a message
message := sarama . ConsumerMessage { }
fill in a message payload
ConsumerMessage := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report":` + ConsumerReport + `,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
message . Value = [ ] byte ( ConsumerMessage )
message is correct -> one record should be written into the database
_ , err := dummyConsumer . ProcessMessage ( & message )
check for error - it should not be reported
assert . Nil ( t , err )
one record should be written into the storage
assert . Equal ( t , 1 , mockStorage . writeReportCalled )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
TestConsumerSetup function checks the method KafkaConsumer.Setup().
func TestConsumerSetup ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
channel that needs to be closed in Setup
dummyConsumer . Ready = make ( chan bool )
try to setup the consumer (without consumer group)
err := dummyConsumer . Setup ( nil )
and check for any error
assert . Nil ( t , err )
TestConsumerCleanup function checks the method KafkaConsumer.Cleanup().
func TestConsumerCleanup ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
try to cleanup the consumer (without consumer group)
err := dummyConsumer . Cleanup ( nil )
and check for any error
assert . Nil ( t , err )
TestConsumerClose function checks the method KafkaConsumer.Close().
func TestConsumerClose ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
try to close the consumer (without consumer group)
err := dummyConsumer . Close ( )
and check for any error
assert . Nil ( t , err )
TestConsumerCloseCancel function checks the method KafkaConsumer.Close().
func TestConsumerCloseCancel ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
setup cancel hook
_ , cancel := context . WithCancel ( context . Background ( ) )
dummyConsumer . Cancel = cancel
try to close the consumer (without consumer group)
err := dummyConsumer . Close ( )
and check for any error
assert . Nil ( t , err )
TestHandleNilMessage function checks the method
KafkaConsumer.HandleMessage() for nil input.
func TestHandleNilMessage ( _ * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
nil message
dummyConsumer . HandleMessage ( nil )
TestHandleEmptyMessage function checks the method
KafkaConsumer.HandleMessage() for empty message value.
func TestHandleEmptyMessage ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
message := sarama . ConsumerMessage { }
message . Value = [ ] byte ( "" )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
empty message
dummyConsumer . HandleMessage ( & message )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 1 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
TestHandleCorrectMessage function checks the method
KafkaConsumer.HandleMessage() for correct input.
func TestHandleCorrectMessage ( t * testing . T ) {
construct mock storage
mockStorage := NewMockStorage ( )
construct dummy consumer
dummyConsumer := NewDummyConsumer ( & mockStorage )
message := sarama . ConsumerMessage { }
value := `{
"OrgID": ` + fmt . Sprint ( ExpectedOrgID ) + `,
"AccountNumber": ` + fmt . Sprint ( ExpectedAccountNumber ) + `,
"ClusterName": "` + string ( ExpectedClusterName ) + `",
"Report":` + ConsumerReport + `,
"LastChecked": "` + LastCheckedAt . UTC ( ) . Format ( time . RFC3339 ) + `"
message . Value = [ ] byte ( value )
counter checks
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
correct message
dummyConsumer . HandleMessage ( & message )
counter checks
assert . Equal ( t , uint64 ( 1 ) , dummyConsumer . GetNumberOfSuccessfullyConsumedMessages ( ) )
assert . Equal ( t , uint64 ( 0 ) , dummyConsumer . GetNumberOfErrorsConsumingMessages ( ) )
func testShrinkedMessage ( t * testing . T , inputMessage main . Report ) {
assert . NotContains ( t , inputMessage , "system" )
assert . NotContains ( t , inputMessage , "fingerprints" )
assert . NotContains ( t , inputMessage , "skips" )
assert . NotContains ( t , inputMessage , "info" )
assert . NotContains ( t , inputMessage , "pass" )
assert . NotContains ( t , inputMessage , "analysis_metadata" )
TestShrinkEmptyMessage tests the function to remove unneeded attributes from incoming message
func TestShrinkEmptyMessage ( t * testing . T ) {
var inputMessage main . Report = make ( map [ string ] * json . RawMessage )
main . ShrinkMessage ( & inputMessage )
testShrinkedMessage ( t , inputMessage )
TestShrinkMessageWithAnalysisMetadata tests the function to remove unneeded attributes from incoming message
func TestShrinkMessageWithAnalysisMetadata ( t * testing . T ) {
var inputMessage main . Report = make ( map [ string ] * json . RawMessage )
analysisMetadata := json . RawMessage ( "{}" )
inputMessage [ "analysis_metadata" ] = & analysisMetadata
main . ShrinkMessage ( & inputMessage )
testShrinkedMessage ( t , inputMessage )