Package producer contains functions that can be used to produce (that is
send) messages to properly configured Kafka broker.
|
package producer
import (
"encoding/json"
"fmt"
"time"
"github.com/rs/zerolog/log"
"github.com/RedHatInsights/insights-results-aggregator/broker"
"github.com/RedHatInsights/insights-results-aggregator/types"
)
|
NewPayloadTrackerProducer constructs producer for payload tracker topic.
It is implemented as variable in order to allow monkey patching in unit tests.
|
var NewPayloadTrackerProducer = func ( brokerCfg broker . Configuration ) ( * PayloadTrackerProducer , error ) {
if brokerCfg . PayloadTrackerTopic == "" {
return nil , nil
}
p , err := New ( brokerCfg )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "unable to create a new payload tracker producer" )
return nil , err
}
return & PayloadTrackerProducer {
KafkaProducer : * p ,
Configuration : brokerCfg ,
} , nil
}
|
PayloadTrackerMessage represents content of messages
sent to the Payload Tracker topic in Kafka.
|
type PayloadTrackerMessage struct {
Service string `json:"service"`
RequestID string `json:"request_id"`
Status string `json:"status"`
Date string `json:"date"`
OrgID string `json:"org_id,omitempty"`
Account string `json:"account,omitempty"`
}
|
produceMessage produces message to selected topic. That function returns
partition ID and offset of new message or an error value in case of any
problem on broker side.
|
func ( producer * PayloadTrackerProducer ) produceMessage ( trackerMsg PayloadTrackerMessage ) ( int32 , int64 , error ) {
jsonBytes , err := json . Marshal ( trackerMsg )
if err != nil {
return 0 , 0 , err
}
return producer . KafkaProducer . produceMessage ( jsonBytes , producer . Configuration . PayloadTrackerTopic )
}
|
TrackPayload publishes the status of a payload with the given request ID to
the payload tracker Kafka topic. Please keep in mind that if the request ID
is empty, the payload will not be tracked and no error will be raised because
this can happen in some scenarios and it is not considered an error.
Instead, only a warning is logged and no error is returned.
|
func ( producer * PayloadTrackerProducer ) TrackPayload (
reqID types . RequestID ,
timestamp time . Time ,
orgID * types . OrgID ,
account * types . Account ,
status string ,
) error {
if len ( reqID ) == 0 {
log . Warn ( ) . Str ( "Operation" , "TrackPayload" ) . Msg ( "request ID is missing, null or empty" )
return nil
}
statusUpdate := PayloadTrackerMessage {
Service : producer . Configuration . ServiceName ,
RequestID : string ( reqID ) ,
Status : status ,
Date : timestamp . UTC ( ) . Format ( time . RFC3339Nano ) ,
}
if orgID != nil {
statusUpdate . OrgID = fmt . Sprintf ( "%d" , * orgID )
}
if account != nil {
statusUpdate . Account = fmt . Sprintf ( "%d" , * account )
}
_ , _ , err := producer . produceMessage ( statusUpdate )
if err != nil {
log . Error ( ) . Err ( err ) .
Str ( "request ID" , string ( reqID ) ) .
Time ( "timestamp" , timestamp ) .
Str ( "status" , status ) .
Msg ( "unable to produce payload tracker message" )
return err
}
return nil
}
|