|
|
Package logger contains the configuration structures needed to configure
the access to CloudWatch server to sending the log messages there.
|
package logger
|
Documentation in literate-programming-style is available at:
https://redhatinsights.github.io/insights-operator-utils/packages/logger/logger.html
|
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
"github.com/IBM/sarama"
zlogsentry "github.com/archdx/zerolog-sentry"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
sentry "github.com/getsentry/sentry-go"
cww "github.com/lzap/cloudwatchwriter2"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
var needClose [ ] io . Closer = [ ] io . Closer { }
|
AWSCloudWatchEndpoint allows you to mock cloudwatch client by redirecting requests to a local proxy
|
var AWSCloudWatchEndpoint string
const cloudwatchBatchInterval = 500 * time . Millisecond
|
WorkaroundForRHIOPS729 keeps only those fields that are currently getting their way to Kibana
TODO: delete when RHIOPS-729 is fixed
|
type WorkaroundForRHIOPS729 struct {
io . Writer
}
func ( writer WorkaroundForRHIOPS729 ) Write ( bytes [ ] byte ) ( int , error ) {
var obj map [ string ] interface { }
err := json . Unmarshal ( bytes , & obj )
if err != nil {
|
it's not JSON object, so we don't do anything
|
return writer . Writer . Write ( bytes )
}
|
lowercase the keys
|
for key := range obj {
val := obj [ key ]
delete ( obj , key )
obj [ strings . ToUpper ( key ) ] = val
}
resultBytes , err := json . Marshal ( obj )
if err != nil {
return 0 , err
}
written , err := writer . Writer . Write ( resultBytes )
if err != nil {
return written , err
}
if written < len ( resultBytes ) {
return written , fmt . Errorf ( "too few bytes were written" )
}
return len ( bytes ) , nil
}
|
InitZerolog initializes zerolog with provided configs to use proper stdout and/or CloudWatch logging
|
func InitZerolog (
loggingConf LoggingConfiguration , cloudWatchConf CloudWatchConfiguration , sentryConf SentryLoggingConfiguration ,
additionalWriters ... io . Writer ,
) error {
setGlobalLogLevel ( loggingConf )
var writers [ ] io . Writer
selectedOutput := os . Stdout
writers = append ( writers , additionalWriters ... )
if loggingConf . UseStderr {
selectedOutput = os . Stderr
}
var consoleWriter io . Writer
consoleWriter = selectedOutput
if loggingConf . Debug {
|
nice colored output
|
consoleWriter = zerolog . ConsoleWriter { Out : selectedOutput }
}
writers = append ( writers , consoleWriter )
if loggingConf . LoggingToCloudWatchEnabled {
cloudWatchWriter , err := setupCloudwatchLogging ( & cloudWatchConf )
if err != nil {
err = fmt . Errorf ( "Error initializing Cloudwatch logging: %s" , err . Error ( ) )
return err
}
writers = append ( writers , & WorkaroundForRHIOPS729 { Writer : cloudWatchWriter } )
}
if loggingConf . LoggingToSentryEnabled {
sentryWriter , err := setupSentryLogging ( sentryConf )
if err != nil {
err = fmt . Errorf ( "Error initializing Sentry logging: %s" , err . Error ( ) )
return err
}
writers = append ( writers , sentryWriter )
needClose = append ( needClose , sentryWriter )
}
logsWriter := zerolog . MultiLevelWriter ( writers ... )
log . Logger = zerolog . New ( logsWriter ) . With ( ) . Timestamp ( ) . Logger ( )
|
zerolog doesn't implement Println required by sarama
|
sarama . Logger = & SaramaZerologger { zerologger : log . Logger }
return nil
}
|
CloseZerolog closes properly the zerolog, if needed
|
func CloseZerolog ( ) {
for _ , toClose := range needClose {
if err := toClose . Close ( ) ; err != nil {
log . Debug ( ) . Err ( err ) . Msg ( "Error when closing" )
}
}
}
func setGlobalLogLevel ( configuration LoggingConfiguration ) {
logLevel := convertLogLevel ( configuration . LogLevel )
zerolog . SetGlobalLevel ( logLevel )
}
func convertLogLevel ( level string ) zerolog . Level {
level = strings . ToLower ( strings . TrimSpace ( level ) )
switch level {
case "debug" :
return zerolog . DebugLevel
case "info" :
return zerolog . InfoLevel
case "warn" , "warning" :
return zerolog . WarnLevel
case "error" :
return zerolog . ErrorLevel
case "fatal" :
return zerolog . FatalLevel
}
return zerolog . DebugLevel
}
func fillInMissingConfiguration ( conf * CloudWatchConfiguration ) error {
|
os.Hostname is preferred to os.Getenv("HOSTNAME") because the env var might
not be populated yet (e.g. GitHub runners)
|
hostname , err := os . Hostname ( )
if err != nil {
return err
}
|
if no log stream name is explicitly provided, HOSTNAME is used
|
if conf . StreamName == "" {
conf . StreamName = hostname
} else {
|
take provided log stream name and replace any $HOSTNAME placeholders with real hostname
|
conf . StreamName = strings . ReplaceAll ( conf . StreamName , "$HOSTNAME" , hostname )
}
return nil
}
func setupCloudwatchLogging ( conf * CloudWatchConfiguration ) ( io . Writer , error ) {
if err := fillInMissingConfiguration ( conf ) ; err != nil {
return nil , err
}
if conf . LogGroup == "" {
return nil , fmt . Errorf ( "log group name cannot be empty" )
}
|
Build configuration options for AWS SDK v2
|
var configOptions [ ] func ( * config . LoadOptions ) error
|
Set region
|
if conf . AWSRegion != "" {
configOptions = append ( configOptions , config . WithRegion ( conf . AWSRegion ) )
}
|
Set credentials if provided
|
if conf . AWSAccessID != "" && conf . AWSSecretKey != "" {
configOptions = append ( configOptions , config . WithCredentialsProvider (
credentials . NewStaticCredentialsProvider ( conf . AWSAccessID , conf . AWSSecretKey , conf . AWSSessionToken ) ,
) )
}
|
Load the configuration
|
cfg , err := config . LoadDefaultConfig ( context . TODO ( ) , configOptions ... )
if err != nil {
return nil , fmt . Errorf ( "failed to load AWS config: %w" , err )
}
|
Create CloudWatch Logs client with service-specific endpoint if provided
|
var cloudWatchClient * cloudwatchlogs . Client
if len ( AWSCloudWatchEndpoint ) > 0 {
cloudWatchClient = cloudwatchlogs . NewFromConfig ( cfg , func ( o * cloudwatchlogs . Options ) {
o . BaseEndpoint = aws . String ( AWSCloudWatchEndpoint )
|
Use the default HTTP client to ensure gock can intercept requests
|
o . HTTPClient = http . DefaultClient
} )
} else {
cloudWatchClient = cloudwatchlogs . NewFromConfig ( cfg )
}
return cww . NewWithClient (
cloudWatchClient , cloudwatchBatchInterval , conf . LogGroup , conf . StreamName )
}
func sentryBeforeSend ( event * sentry . Event , _ * sentry . EventHint ) * sentry . Event {
event . Fingerprint = [ ] string { event . Message }
return event
}
func setupSentryLogging ( conf SentryLoggingConfiguration ) ( io . WriteCloser , error ) {
sentryWriter , err := zlogsentry . New (
conf . SentryDSN ,
zlogsentry . WithEnvironment ( conf . SentryEnvironment ) ,
zlogsentry . WithBeforeSend ( sentryBeforeSend ) ,
)
if err != nil {
return nil , err
}
return sentryWriter , nil
}
const kafkaErrorPrefix = "kafka: error"
|
SaramaZerologger is a wrapper to make sarama log to zerolog
those logs can be filtered by key "package" with value "sarama"
|
type SaramaZerologger struct { zerologger zerolog . Logger }
|
Print wraps print method
|
func ( logger * SaramaZerologger ) Print ( params ... interface { } ) {
var messages [ ] string
for _ , item := range params {
messages = append ( messages , fmt . Sprint ( item ) )
}
logger . logMessage ( "%v" , strings . Join ( messages , " " ) )
}
|
Printf wraps printf method
|
func ( logger * SaramaZerologger ) Printf ( format string , params ... interface { } ) {
logger . logMessage ( format , params ... )
}
|
Println wraps println method
|
func ( logger * SaramaZerologger ) Println ( v ... interface { } ) {
logger . Print ( v ... )
}
func ( logger * SaramaZerologger ) logMessage ( format string , params ... interface { } ) {
var event * zerolog . Event
messageStr := fmt . Sprintf ( format , params ... )
if strings . HasPrefix ( messageStr , kafkaErrorPrefix ) {
event = logger . zerologger . Error ( )
} else {
event = logger . zerologger . Info ( )
}
event = event . Str ( "package" , "sarama" )
event . Msg ( messageStr )
}
|