TestProducerClose makes sure it's possible to close the connection
|
func TestProducerClose ( t * testing . T ) {
mockProducer := mocks . NewSyncProducer ( t , nil )
prod := Producer {
Configuration : brokerCfg ,
Producer : mockProducer ,
}
err := prod . Close ( )
assert . NoError ( t , err , "failed to close Kafka producer" )
}
// TestProducerNew checks that New builds a producer from a configuration that
// points at a sarama mock broker, and that the producer can then be closed.
func TestProducerNew(t *testing.T) {
	broker := sarama.NewMockBroker(t, 0)
	defer broker.Close()

	topic := brokerCfg.Topic

	// Canned responses the mock broker returns, keyed below by request name.
	metadata := sarama.NewMockMetadataResponse(t).
		SetBroker(broker.Addr(), broker.BrokerID()).
		SetLeader(topic, 0, broker.BrokerID())
	offsets := sarama.NewMockOffsetResponse(t).
		SetOffset(topic, 0, -1, 0).
		SetOffset(topic, 0, -2, 0)
	fetch := sarama.NewMockFetchResponse(t, 1)
	coordinator := sarama.NewMockFindCoordinatorResponse(t).
		SetCoordinator(sarama.CoordinatorGroup, "", broker)
	offsetFetch := sarama.NewMockOffsetFetchResponse(t).
		SetOffset("", topic, 0, 0, "", sarama.ErrNoError)

	broker.SetHandlerByMap(map[string]sarama.MockResponse{
		"MetadataRequest":        metadata,
		"OffsetRequest":          offsets,
		"FetchRequest":           fetch,
		"FindCoordinatorRequest": coordinator,
		"OffsetFetchRequest":     offsetFetch,
	})

	// Only the Kafka section is filled in; the address is the mock broker's.
	cfg := conf.ConfigStruct{
		Kafka: conf.KafkaConfiguration{
			Addresses: broker.Addr(),
			Topic:     topic,
			Timeout:   brokerCfg.Timeout,
		},
	}

	kafkaProducer, err := New(&cfg)
	helpers.FailOnError(t, err)
	helpers.FailOnError(t, kafkaProducer.Close())
}
|
valid broker configuration for local Kafka instance
|
// Broker configuration under test: SASL credentials and the SCRAM-SHA-256
// mechanism, with the security protocol set to "SASL_".
var brokerConfiguration = conf . KafkaConfiguration {
Addresses : "localhost:9092" ,
Topic : "platform.notifications.ingress" ,
Enabled : true ,
SecurityProtocol : "SASL_" ,
SaslUsername : "sasl_user" ,
SaslPassword : "sasl_password" ,
SaslMechanism : sarama . SASLTypeSCRAMSHA256 ,
}
// Convert the broker configuration into a sarama configuration; no error is expected.
saramaConfig , err := SaramaConfigFromBrokerConfig ( & brokerConfiguration )
assert . Nil ( t , err )
// SASL must be enabled and carry the configured user name and password.
// NOTE(review): testify's assert.Equal is documented as (expected, actual);
// the arguments below appear to be in the opposite order - confirm and swap if so.
assert . True ( t , saramaConfig . Net . SASL . Enable )
assert . Equal ( t , saramaConfig . Net . SASL . User , brokerConfiguration . SaslUsername )
assert . Equal ( t , saramaConfig . Net . SASL . Password , brokerConfiguration . SaslPassword )
// For this configuration no SCRAM client generator function is expected to be set.
assert . Nil ( t , saramaConfig . Net . SASL . SCRAMClientGeneratorFunc , "SCRAM client generator function should not be created with given config" )
}
// TestProducerSendEmptyNotificationMessage checks that a zero-value
// notification message, encoded as JSON, is produced without error by a
// Producer backed by a sarama mock.
func TestProducerSendEmptyNotificationMessage(t *testing.T) {
	// Register a single successful-send expectation on the mock.
	syncProducer := mocks.NewSyncProducer(t, nil)
	syncProducer.ExpectSendMessageAndSucceed()

	sender := Producer{
		Producer:      syncProducer,
		Configuration: brokerCfg,
	}

	var emptyMsg types.NotificationMessage
	payload, err := json.Marshal(emptyMsg)
	helpers.FailOnError(t, err)

	// Only the error result is of interest; the other two results are dropped.
	_, _, err = sender.ProduceMessage(payload)
	assert.NoError(t, err, "Couldn't produce message with given broker configuration")

	helpers.FailOnError(t, sender.Close())
}
// TestProducerSendNotificationMessageNoEvents checks that a notification
// message whose Events and Context are nil is produced without error by a
// Producer backed by a sarama mock.
func TestProducerSendNotificationMessageNoEvents(t *testing.T) {
	// Register a single successful-send expectation on the mock.
	syncProducer := mocks.NewSyncProducer(t, nil)
	syncProducer.ExpectSendMessageAndSucceed()

	sender := Producer{
		Producer:      syncProducer,
		Configuration: brokerCfg,
	}

	// Header fields are filled in; Events and Context are explicitly nil.
	payload, err := json.Marshal(types.NotificationMessage{
		Bundle:      "openshift",
		Application: "advisor",
		EventType:   "critical",
		Timestamp:   time.Now().UTC().Format(time.RFC3339Nano),
		AccountID:   "000000",
		Events:      nil,
		Context:     nil,
	})
	helpers.FailOnError(t, err)

	_, _, err = sender.ProduceMessage(payload)
	assert.NoError(t, err, "Couldn't produce message with given broker configuration")

	helpers.FailOnError(t, sender.Close())
}
// TestProducerSendNotificationMessageSingleEvent checks that a notification
// message carrying exactly one event is produced without error by a Producer
// backed by a sarama mock.
func TestProducerSendNotificationMessageSingleEvent(t *testing.T) {
	// Register a single successful-send expectation on the mock.
	syncProducer := mocks.NewSyncProducer(t, nil)
	syncProducer.ExpectSendMessageAndSucceed()

	sender := Producer{
		Producer:      syncProducer,
		Configuration: brokerCfg,
	}

	// The only event: empty metadata and a three-key payload.
	event := types.Event{
		Metadata: types.EventMetadata{},
		Payload:  types.EventPayload{"rule_id": "a unique ID", "what happened": "something baaad happened", "error_code": "3"},
	}

	payload, err := json.Marshal(types.NotificationMessage{
		Bundle:      "openshift",
		Application: "advisor",
		EventType:   "critical",
		Timestamp:   time.Now().UTC().Format(time.RFC3339Nano),
		AccountID:   "000001",
		Events:      []types.Event{event},
		Context:     nil,
	})
	helpers.FailOnError(t, err)

	_, _, err = sender.ProduceMessage(payload)
	assert.NoError(t, err, "Couldn't produce message with given broker configuration")

	helpers.FailOnError(t, sender.Close())
}
// TestProducerSendNotificationMessageMultipleEvents checks that a notification
// message carrying two events is produced without error by a Producer backed
// by a sarama mock.
func TestProducerSendNotificationMessageMultipleEvents(t *testing.T) {
	// Register a single successful-send expectation on the mock.
	syncProducer := mocks.NewSyncProducer(t, nil)
	syncProducer.ExpectSendMessageAndSucceed()

	sender := Producer{
		Producer:      syncProducer,
		Configuration: brokerCfg,
	}

	// The second event has one more payload key than the first.
	first := types.Event{
		Metadata: types.EventMetadata{},
		Payload:  types.EventPayload{"rule_id": "a unique ID", "what happened": "something baaad happened", "error_code": "3"},
	}
	second := types.Event{
		Metadata: types.EventMetadata{},
		Payload:  types.EventPayload{"rule_id": "a unique ID", "what happened": "something baaad happened", "error_code": "3", "more_random_data": "why not..."},
	}

	payload, err := json.Marshal(types.NotificationMessage{
		Bundle:      "openshift",
		Application: "advisor",
		EventType:   "critical",
		Timestamp:   time.Now().UTC().Format(time.RFC3339Nano),
		AccountID:   "000001",
		Events:      []types.Event{first, second},
		Context:     nil,
	})
	helpers.FailOnError(t, err)

	_, _, err = sender.ProduceMessage(payload)
	assert.NoError(t, err, "Couldn't produce message with given broker configuration")

	helpers.FailOnError(t, sender.Close())
}
// TestProducerSend sends a notification message carrying two events through a
// sarama sync producer connected to brokerCfg.Address, i.e. it talks to a
// broker over the network rather than to a mock.
//
// NOTE(review): KafkaProducer, brokerCfg.Address and ProduceMessage(msg)
// differ from the mock-based tests in this file, which use Producer and pass
// JSON-encoded bytes; confirm against the current API before enabling.
func TestProducerSend(t *testing.T) {
	producer, err := sarama.NewSyncProducer([]string{brokerCfg.Address}, nil)
	// Check the connection error before using the producer; it used to be
	// overwritten, unchecked, by the result of ProduceMessage below.
	helpers.FailOnError(t, err)

	kafkaProducer := KafkaProducer{
		Configuration: brokerCfg,
		Producer:      producer,
	}

	// Two events with identical payloads and no metadata.
	events := []types.Event{
		{
			Metadata: nil,
			Payload:  types.EventPayload{"rule_id": "a unique ID", "what happened": "something baaad happened", "error_code": "3"},
		},
		{
			Metadata: nil,
			Payload:  types.EventPayload{"rule_id": "a unique ID", "what happened": "something baaad happened", "error_code": "3"},
		},
	}

	msg := types.NotificationMessage{
		Bundle:      "openshift",
		Application: "advisor",
		EventType:   "critical",
		Timestamp:   time.Now().UTC().Format(time.RFC3339Nano),
		AccountID:   "000001",
		Events:      events,
		Context:     "no_context",
	}

	_, _, err = kafkaProducer.ProduceMessage(msg)
	assert.NoError(t, err, "Couldn't produce message with given broker configuration")
	helpers.FailOnError(t, kafkaProducer.Close())
}
* /
|
This test sends a message to a real Kafka broker, just to make sure =)
|
// TestProducerSend sends a notification message carrying two events through a
// sarama sync producer connected to brokerCfg.Address, i.e. it talks to a
// broker over the network rather than to a mock.
//
// NOTE(review): KafkaProducer, brokerCfg.Address and ProduceMessage(msg)
// differ from the mock-based tests in this file, which use Producer and pass
// JSON-encoded bytes; confirm against the current API before enabling.
func TestProducerSend(t *testing.T) {
	producer, err := sarama.NewSyncProducer([]string{brokerCfg.Address}, nil)
	// Check the connection error before using the producer; it used to be
	// overwritten, unchecked, by the result of ProduceMessage below.
	helpers.FailOnError(t, err)

	kafkaProducer := KafkaProducer{
		Configuration: brokerCfg,
		Producer:      producer,
	}

	// Two events with identical payloads and no metadata.
	events := []types.Event{
		{
			Metadata: nil,
			Payload:  types.EventPayload{"rule_id": "a unique ID", "what happened": "something baaad happened", "error_code": "3"},
		},
		{
			Metadata: nil,
			Payload:  types.EventPayload{"rule_id": "a unique ID", "what happened": "something baaad happened", "error_code": "3"},
		},
	}

	msg := types.NotificationMessage{
		Bundle:      "openshift",
		Application: "advisor",
		EventType:   "critical",
		Timestamp:   time.Now().UTC().Format(time.RFC3339Nano),
		AccountID:   "000001",
		Events:      events,
		Context:     "no_context",
	}

	_, _, err = kafkaProducer.ProduceMessage(msg)
	assert.NoError(t, err, "Couldn't produce message with given broker configuration")
	helpers.FailOnError(t, kafkaProducer.Close())
}
*/
|