TestProducerClose makes sure it's possible to close the connection
|
func TestProducerClose ( t * testing . T ) {
mockProducer := mocks . NewSyncProducer ( t , nil )
prod := main . Producer {
Configuration : & brokerCfg ,
Producer : mockProducer ,
}
err := prod . Close ( )
assert . NoError ( t , err , "failed to close Kafka producer" )
}
func TestProducerNew ( t * testing . T ) {
mockBroker := sarama . NewMockBroker ( t , 0 )
defer mockBroker . Close ( )
handlerMap := map [ string ] sarama . MockResponse {
"MetadataRequest" : sarama . NewMockMetadataResponse ( t ) .
SetBroker ( mockBroker . Addr ( ) , mockBroker . BrokerID ( ) ) .
SetLeader ( brokerCfg . Topic , 0 , mockBroker . BrokerID ( ) ) ,
"OffsetRequest" : sarama . NewMockOffsetResponse ( t ) .
SetOffset ( brokerCfg . Topic , 0 , - 1 , 0 ) .
SetOffset ( brokerCfg . Topic , 0 , - 2 , 0 ) ,
"FetchRequest" : sarama . NewMockFetchResponse ( t , 1 ) ,
"FindCoordinatorRequest" : sarama . NewMockFindCoordinatorResponse ( t ) .
SetCoordinator ( sarama . CoordinatorGroup , "" , mockBroker ) ,
"OffsetFetchRequest" : sarama . NewMockOffsetFetchResponse ( t ) .
SetOffset ( "" , brokerCfg . Topic , 0 , 0 , "" , sarama . ErrNoError ) ,
}
mockBroker . SetHandlerByMap ( handlerMap )
prod , err := main . NewProducer ( & main . BrokerConfiguration {
Addresses : mockBroker . Addr ( ) ,
Topic : brokerCfg . Topic ,
} )
helpers . FailOnError ( t , err )
helpers . FailOnError ( t , prod . Close ( ) )
}
func TestProducerSendEmptyMessage ( t * testing . T ) {
mockProducer := mocks . NewSyncProducer ( t , nil )
mockProducer . ExpectSendMessageAndSucceed ( )
kafkaProducer := main . Producer {
Configuration : & brokerCfg ,
Producer : mockProducer ,
}
msgBytes , err := json . Marshal ( "" )
helpers . FailOnError ( t , err )
_ , _ , err = kafkaProducer . ProduceMessage ( msgBytes )
assert . NoError ( t , err , "Couldn't produce message with given broker configuration" )
helpers . FailOnError ( t , kafkaProducer . Close ( ) )
}
func TestPayloadTrackerProducerNew ( t * testing . T ) {
mockBroker := sarama . NewMockBroker ( t , 0 )
defer mockBroker . Close ( )
handlerMap := map [ string ] sarama . MockResponse {
"MetadataRequest" : sarama . NewMockMetadataResponse ( t ) .
SetBroker ( mockBroker . Addr ( ) , mockBroker . BrokerID ( ) ) .
SetLeader ( brokerCfg . Topic , 0 , mockBroker . BrokerID ( ) ) ,
"OffsetRequest" : sarama . NewMockOffsetResponse ( t ) .
SetOffset ( brokerCfg . Topic , 0 , - 1 , 0 ) .
SetOffset ( brokerCfg . Topic , 0 , - 2 , 0 ) ,
"FetchRequest" : sarama . NewMockFetchResponse ( t , 1 ) ,
"FindCoordinatorRequest" : sarama . NewMockFindCoordinatorResponse ( t ) .
SetCoordinator ( sarama . CoordinatorGroup , "" , mockBroker ) ,
"OffsetFetchRequest" : sarama . NewMockOffsetFetchResponse ( t ) .
SetOffset ( "" , brokerCfg . Topic , 0 , 0 , "" , sarama . ErrNoError ) ,
}
mockBroker . SetHandlerByMap ( handlerMap )
prod , err := main . NewPayloadTrackerProducer ( & main . ConfigStruct {
Broker : main . BrokerConfiguration {
Addresses : mockBroker . Addr ( ) ,
Topic : brokerCfg . Topic ,
} ,
Tracker : main . TrackerConfiguration {
ServiceName : "test_service" ,
Topic : "test_tracker_topic" ,
} ,
} )
helpers . FailOnError ( t , err )
helpers . FailOnError ( t , prod . Close ( ) )
}
func TestPayloadTrackerEmptyRequestID ( t * testing . T ) {
tracker := main . PayloadTrackerProducer {
ServiceName : "test" ,
Producer : main . Producer { } ,
}
err := tracker . TrackPayload ( "" , time . Now ( ) , "any_status" )
assert . NoError ( t , err , "No error should be returned for empty request ID" )
}
func TestPayloadTrackerValidRequestID ( t * testing . T ) {
mockProducer := mocks . NewSyncProducer ( t , nil )
mockProducer . ExpectSendMessageAndSucceed ( )
p := main . Producer {
Configuration : & brokerCfg ,
Producer : mockProducer ,
}
tracker := main . PayloadTrackerProducer {
ServiceName : "test" ,
Producer : p ,
}
err := tracker . TrackPayload ( "anything" , time . Now ( ) , "any_status" )
assert . NoError ( t , err , "No error should be returned for empty request ID" )
helpers . FailOnError ( t , tracker . Close ( ) )
}
|