|
|
Entry point to the notification writer service.
The service contains consumer (usually Kafka consumer) that consumes
messages from given source, processes those messages and stores them
in configured data store.
The main task for this service is to listen to configured Kafka topic,
consume all messages from such topic, and write OCP results (in JSON format)
with additional information (like organization ID, cluster name, Kafka
offset etc.) into a database table named newreports. Multiple reports can
be consumed and written into the database for the same cluster, because the
primary (compound) key for newreports table is set to the combination
(orgid, cluster, updatedat).
When some message does not conform to expected schema (for example if org_id
is missing for any reason), such message is dropped and the error message
with all relevant information about the issue is stored into the log.
Messages are expected to contain report body represented as JSON. This body
is shrunk before it's stored into database so the database remains
relatively small.
|
package main
|
Entry point to the CCX Notification writer service
|
|
Generated documentation is available at:
https://pkg.go.dev/github.com/RedHatInsights/ccx-notification-writer/
Documentation in literate-programming-style is available at:
https://redhatinsights.github.io/ccx-notification-writer/packages/ccxnotificationwriter.html
|
import (
"flag"
"fmt"
"math"
"net/http"
"os"
"strconv"
"strings"
"github.com/RedHatInsights/insights-operator-utils/logger"
utils "github.com/RedHatInsights/insights-operator-utils/migrations"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/Shopify/sarama"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
|
Messages to be displayed on terminal or written into logs
|
const (
versionMessage = "CCX Notification Writer version 1.0"
authorsMessage = "Pavel Tisnovsky, Red Hat Inc."
connectionToBrokerMessage = "Connection to broker"
allBrokerConnectionAttemptsMessage = "Couldn't connect to any of the provided brokers"
operationFailedMessage = "Operation failed"
notConnectedToBrokerMessage = "Not connected to broker"
brokerConnectionSuccessMessage = "Broker connection OK"
databaseCleanupOperationFailedMessage = "Database cleanup operation failed"
databaseDropTablesOperationFailedMessage = "Database drop tables operation failed"
databasePrintNewReportsForCleanupOperationFailedMessage = "Print records to cleanup from `new_reports` failed"
databasePrintOldReportsForCleanupOperationFailedMessage = "Print records to cleanup from `reported` table failed"
databasePrintReadErrorsForCleanupOperationFailedMessage = "Print records to cleanup from `read_errors` table failed"
databaseCleanupNewReportsOperationFailedMessage = "Cleanup records from `new_reports` table failed"
databaseCleanupOldReportsOperationFailedMessage = "Cleanup records from `reported` table failed"
databaseCleanupReadErrorsOperationFailedMessage = "Cleanup records from `read_errors` table failed"
rowsInsertedMessage = "Rows inserted"
rowsDeletedMessage = "Rows deleted"
rowsAffectedMessage = "Rows affected"
brokerAddresses = "Broker addresses"
StorageHandleErr = "unable to get storage handle"
)
|
Configuration-related constants
|
const (
|
environment variable that might contain name of configuration file
(it does not have to exist - in this case defaultConfigFileName
value is used instead)
|
configFileEnvVariableName = "CCX_NOTIFICATION_WRITER_CONFIG_FILE"
|
default configuration file name without implicit extension (.toml)
|
defaultConfigFileName = "config"
)
|
Exit codes
|
const (
|
ExitStatusOK means that the tool finished with success
|
ExitStatusOK = iota
|
ExitStatusConfigurationError is returned in case of any configuration-related error
|
ExitStatusConfigurationError
|
ExitStatusConsumerError is returned in case of any consumer-related error
|
ExitStatusConsumerError
|
ExitStatusProducerError is returned in case of any producer-related error
|
ExitStatusProducerError
|
ExitStatusKafkaError is returned in case of any Kafka-related error
|
ExitStatusKafkaError
|
ExitStatusStorageError is returned in case of any consumer-related error
|
ExitStatusStorageError
|
ExitStatusHTTPServerError is returned in case the HTTP server can not be started
|
ExitStatusHTTPServerError
|
ExitStatusMigrationError is returned in case of an error while attempting to perform DB migrations
|
ExitStatusMigrationError
)
|
showVersion function displays version information to standard output.
|
func showVersion ( ) {
fmt . Println ( versionMessage )
}
|
showAuthors function displays information about authors to standard output.
|
func showAuthors ( ) {
fmt . Println ( authorsMessage )
}
|
showConfiguration function displays actual configuration.
|
func showConfiguration ( configuration * ConfigStruct ) {
|
retrieve and then display broker configuration
|
brokerConfig := GetBrokerConfiguration ( configuration )
log . Info ( ) .
Str ( brokerAddresses , brokerConfig . Addresses ) .
Str ( "Security protocol" , brokerConfig . SecurityProtocol ) .
Str ( "Cert path" , brokerConfig . CertPath ) .
Str ( "Sasl mechanism" , brokerConfig . SaslMechanism ) .
Str ( "Sasl username" , brokerConfig . SaslUsername ) .
Str ( "Topic" , brokerConfig . Topic ) .
Str ( "Group" , brokerConfig . Group ) .
Bool ( "Enabled" , brokerConfig . Enabled ) .
Msg ( "Broker configuration" )
|
retrieve and then display storage configuration
|
storageConfig := GetStorageConfiguration ( configuration )
log . Info ( ) .
Str ( "Driver" , storageConfig . Driver ) .
Str ( "DB Name" , storageConfig . PGDBName ) .
Str ( "Username" , storageConfig . PGUsername ) .
Str ( "Host" , storageConfig . PGHost ) .
Int ( "Port" , storageConfig . PGPort ) .
Bool ( "LogSQLQueries" , storageConfig . LogSQLQueries ) .
Msg ( "Storage configuration" )
|
retrieve and then display logging configuration
|
loggingConfig := GetLoggingConfiguration ( configuration )
log . Info ( ) .
Str ( "Level" , loggingConfig . LogLevel ) .
Bool ( "Pretty colored debug logging" , loggingConfig . Debug ) .
Msg ( "Logging configuration" )
|
retrieve and then display metrics configuration
|
metricsConfig := GetMetricsConfiguration ( configuration )
log . Info ( ) .
Str ( "Namespace" , metricsConfig . Namespace ) .
Str ( "Address" , metricsConfig . Address ) .
Msg ( "Metrics configuration" )
}
|
tryToConnectToKafka function just tries to establish connection to Kafka
broker
|
func tryToConnectToKafka ( configuration * ConfigStruct ) ( int , error ) {
log . Info ( ) . Msg ( "Checking connection to Kafka" )
|
prepare broker configuration
|
brokerConfiguration := GetBrokerConfiguration ( configuration )
|
display basic info about broker that will be used
|
log . Info ( ) .
Str ( brokerAddresses , brokerConfiguration . Addresses )
var err error
for _ , addr := range strings . Split ( brokerConfiguration . Addresses , "," ) {
|
create new broker instance (w/o any checks)
|
broker := sarama . NewBroker ( addr )
|
check broker connection
|
err = broker . Open ( nil )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( connectionToBrokerMessage )
continue
}
|
check if connection remain
|
connected , err := broker . Connected ( )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( connectionToBrokerMessage )
continue
}
if ! connected {
log . Error ( ) . Err ( err ) . Msg ( notConnectedToBrokerMessage )
continue
}
|
connection was established
|
log . Info ( ) . Msg ( brokerConnectionSuccessMessage )
|
everything seems to be ok
|
return ExitStatusOK , nil
}
log . Error ( ) . Msg ( allBrokerConnectionAttemptsMessage )
return ExitStatusKafkaError , err
}
|
performDatabaseInitialization function performs database initialization -
creates all tables and indexes in database, also insert constant record into
database.
|
func performDatabaseInitialization ( configuration * ConfigStruct ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
|
try to perform database initialization
|
err = storage . DatabaseInitialization ( )
if err != nil {
log . Err ( err ) . Msg ( "Database initialization operation failed" )
return ExitStatusStorageError , err
}
return ExitStatusOK , nil
}
|
performDatabaseInitMigration function initialize migration table.
|
func performDatabaseInitMigration ( configuration * ConfigStruct ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
|
try to initialize database migration
|
err = storage . DatabaseInitMigration ( )
if err != nil {
log . Err ( err ) . Msg ( "Database migration initialization operation failed" )
return ExitStatusStorageError , err
}
return ExitStatusOK , nil
}
|
performDatabaseCleanup function performs database cleanup - deletes content
of all tables in database.
|
func performDatabaseCleanup ( configuration * ConfigStruct ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
|
try to cleanup the whole database
|
err = storage . DatabaseCleanup ( )
if err != nil {
log . Err ( err ) . Msg ( databaseCleanupOperationFailedMessage )
return ExitStatusStorageError , err
}
return ExitStatusOK , nil
}
|
performDatabaseDropTables function performs drop of all databases tables.
|
func performDatabaseDropTables ( configuration * ConfigStruct ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
|
try to drop all tables from database
|
err = storage . DatabaseDropTables ( )
if err != nil {
log . Err ( err ) . Msg ( databaseDropTablesOperationFailedMessage )
return ExitStatusStorageError , err
}
return ExitStatusOK , nil
}
|
printNewReportsForCleanup function print all reports stored in new_reports
table that are older than specified maximum age.
See also: performNewReportsCleanup
|
func printNewReportsForCleanup ( configuration * ConfigStruct , cliFlags CliFlags ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
err = storage . PrintNewReportsForCleanup ( cliFlags . MaxAge )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( databasePrintNewReportsForCleanupOperationFailedMessage )
return ExitStatusStorageError , err
}
return ExitStatusOK , nil
}
|
performNewReportsCleanup function deletes all reports from new_reports
table that are older than specified max age.
See also: printNewReportsForCleanup
|
func performNewReportsCleanup ( configuration * ConfigStruct , cliFlags CliFlags ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
affected , err := storage . CleanupNewReports ( cliFlags . MaxAge )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( databaseCleanupNewReportsOperationFailedMessage )
return ExitStatusStorageError , err
}
log . Info ( ) . Int ( rowsDeletedMessage , affected ) . Msg ( "Cleanup `new_reports` finished" )
return ExitStatusOK , nil
}
|
printOldReportsForCleanup function print all reports stored in reported
table that are older than specified max age.
See also: performOldReportsCleanup
|
func printOldReportsForCleanup ( configuration * ConfigStruct , cliFlags CliFlags ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
err = storage . PrintOldReportsForCleanup ( cliFlags . MaxAge )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( databasePrintOldReportsForCleanupOperationFailedMessage )
return ExitStatusStorageError , err
}
return ExitStatusOK , nil
}
|
performOldReportsCleanup function deletes all reports from reported table
that are older than specified max age.
See also: printOldReportsForCleanup
|
func performOldReportsCleanup ( configuration * ConfigStruct , cliFlags CliFlags ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
affected , err := storage . CleanupOldReports ( cliFlags . MaxAge )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( databaseCleanupOldReportsOperationFailedMessage )
return ExitStatusStorageError , err
}
log . Info ( ) . Int ( rowsDeletedMessage , affected ) . Msg ( "Cleanup `reported` finished" )
return ExitStatusOK , nil
}
|
printReadErrorsForCleanup function print all reports stored in read_errors
table that are older than specified max age.
See also: performReadErrorsForCleanup
|
func printReadErrorsForCleanup ( configuration * ConfigStruct , cliFlags CliFlags ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
err = storage . PrintReadErrorsForCleanup ( cliFlags . MaxAge )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( databasePrintReadErrorsForCleanupOperationFailedMessage )
return ExitStatusStorageError , err
}
return ExitStatusOK , nil
}
|
performReadErrorsCleanup function deletes all reports from read_errors table
that are older than specified max age.
|
func performReadErrorsCleanup ( configuration * ConfigStruct , cliFlags CliFlags ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
affected , err := storage . CleanupReadErrors ( cliFlags . MaxAge )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( databaseCleanupReadErrorsOperationFailedMessage )
return ExitStatusStorageError , err
}
log . Info ( ) . Int ( rowsDeletedMessage , affected ) . Msg ( "Cleanup `read_errors` finished" )
return ExitStatusOK , nil
}
|
startService function tries to start the notification writer service,
connect to storage and initialize connection to message broker.
|
func startService ( configuration * ConfigStruct ) ( int , error ) {
|
show configuration at startup
|
showConfiguration ( configuration )
|
configure metrics
|
metricsConfig := GetMetricsConfiguration ( configuration )
if metricsConfig . Namespace != "" {
log . Info ( ) . Str ( "namespace" , metricsConfig . Namespace ) . Msg ( "Setting metrics namespace" )
AddMetricsWithNamespace ( metricsConfig . Namespace )
}
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
|
prepare HTTP server with metrics exposed
|
err = startHTTPServer ( metricsConfig . Address )
if err != nil {
log . Error ( ) . Err ( err ) . Send ( )
return ExitStatusHTTPServerError , err
}
|
prepare broker
|
brokerConfiguration := GetBrokerConfiguration ( configuration )
|
if broker is disabled, simply don't start it
|
if brokerConfiguration . Enabled {
err := startConsumer ( configuration , storage )
if err != nil {
log . Error ( ) . Err ( err ) . Send ( )
return ExitStatusConsumerError , err
}
} else {
log . Info ( ) . Msg ( "Broker is disabled, not starting it" )
}
return ExitStatusOK , nil
}
|
startConsumer function starts the Kafka consumer.
|
func startConsumer ( config * ConfigStruct , storage Storage ) error {
consumer , err := NewConsumer ( & config . Broker , storage )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Construct broker failed" )
return err
}
pt , err := NewPayloadTrackerProducer ( config )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Construct payload tracker producer failed" )
return err
}
consumer . Tracker = pt
consumer . Serve ( )
return nil
}
|
startHTTP server starts HTTP or HTTPS server with exposed metrics.
|
func startHTTPServer ( address string ) error {
|
setup handlers
|
http . Handle ( "/metrics" , promhttp . Handler ( ) )
|
start the server
|
go func ( ) {
log . Info ( ) . Str ( "HTTP server address" , address ) . Msg ( "Starting HTTP server" )
err := http . ListenAndServe ( address , nil )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Listen and serve" )
}
} ( )
return nil
}
|
doSelectedOperation function perform operation selected on command line.
When no operation is specified, the Notification writer service is started
instead.
gocyclo:ignore
|
func doSelectedOperation ( configuration * ConfigStruct , cliFlags CliFlags ) ( int , error ) {
switch {
case cliFlags . ShowVersion :
showVersion ( )
return ExitStatusOK , nil
case cliFlags . ShowAuthors :
showAuthors ( )
return ExitStatusOK , nil
case cliFlags . ShowConfiguration :
showConfiguration ( configuration )
return ExitStatusOK , nil
case cliFlags . CheckConnectionToKafka :
return tryToConnectToKafka ( configuration )
case cliFlags . PerformDatabaseInitialization :
return performDatabaseInitialization ( configuration )
case cliFlags . PerformDatabaseCleanup :
return performDatabaseCleanup ( configuration )
case cliFlags . PerformDatabaseDropTables :
return performDatabaseDropTables ( configuration )
case cliFlags . PerformDatabaseInitMigration :
return performDatabaseInitMigration ( configuration )
case cliFlags . PrintNewReportsForCleanup :
return printNewReportsForCleanup ( configuration , cliFlags )
case cliFlags . PerformNewReportsCleanup :
return performNewReportsCleanup ( configuration , cliFlags )
case cliFlags . PrintOldReportsForCleanup :
return printOldReportsForCleanup ( configuration , cliFlags )
case cliFlags . PerformOldReportsCleanup :
return performOldReportsCleanup ( configuration , cliFlags )
case cliFlags . PrintReadErrorsForCleanup :
return printReadErrorsForCleanup ( configuration , cliFlags )
case cliFlags . PerformReadErrorsCleanup :
return performReadErrorsCleanup ( configuration , cliFlags )
case cliFlags . MigrationInfo :
return PrintMigrationInfo ( configuration )
case cliFlags . PerformMigrations != "" :
return PerformMigrations ( configuration , cliFlags . PerformMigrations )
case cliFlags . TruncateOldReports :
return performTruncateOldReports ( configuration )
default :
exitCode , err := startService ( configuration )
return exitCode , err
}
|
this can not happen: return ExitStatusOK, nil
|
}
|
convertLogLevel function converts log level specified in configuration file
into proper zerolog constant.
TODO: refactor utils/logger appropriately
|
func convertLogLevel ( level string ) zerolog . Level {
level = strings . ToLower ( strings . TrimSpace ( level ) )
switch level {
case "debug" :
return zerolog . DebugLevel
case "info" :
return zerolog . InfoLevel
case "warn" , "warning" :
return zerolog . WarnLevel
case "error" :
return zerolog . ErrorLevel
case "fatal" :
return zerolog . FatalLevel
}
return zerolog . DebugLevel
}
|
main function is entry point to the Notification writer service.
|
func main ( ) {
var cliFlags CliFlags
|
define and then parse all command line options
|
flag . BoolVar ( & cliFlags . PerformDatabaseInitialization , "db-init" , false , "perform database initialization" )
flag . BoolVar ( & cliFlags . PerformDatabaseCleanup , "db-cleanup" , false , "perform database cleanup" )
flag . BoolVar ( & cliFlags . PerformDatabaseDropTables , "db-drop-tables" , false , "drop all tables from database" )
flag . BoolVar ( & cliFlags . PerformDatabaseInitMigration , "db-init-migration" , false , "initialize migration" )
flag . BoolVar ( & cliFlags . CheckConnectionToKafka , "check-kafka" , false , "check connection to Kafka" )
flag . BoolVar ( & cliFlags . ShowVersion , "version" , false , "show version" )
flag . BoolVar ( & cliFlags . ShowAuthors , "authors" , false , "show authors" )
flag . BoolVar ( & cliFlags . ShowConfiguration , "show-configuration" , false , "show configuration" )
flag . BoolVar ( & cliFlags . PrintNewReportsForCleanup , "print-new-reports-for-cleanup" , false , "print new reports to be cleaned up" )
flag . BoolVar ( & cliFlags . PerformNewReportsCleanup , "new-reports-cleanup" , false , "perform new reports clean up" )
flag . BoolVar ( & cliFlags . PrintOldReportsForCleanup , "print-old-reports-for-cleanup" , false , "print old reports to be cleaned up" )
flag . BoolVar ( & cliFlags . PerformOldReportsCleanup , "old-reports-cleanup" , false , "perform old reports clean up" )
flag . BoolVar ( & cliFlags . PrintReadErrorsForCleanup , "print-read-errors-for-cleanup" , false , "print records from read_errors table to be cleaned up" )
flag . BoolVar ( & cliFlags . PerformReadErrorsCleanup , "read-errors-cleanup" , false , "perform clean up of read_errors table" )
flag . BoolVar ( & cliFlags . MigrationInfo , "migration-info" , false , "prints migration info" )
flag . BoolVar ( & cliFlags . TruncateOldReports , "truncate-old-reports" , false , "truncate the reported table" )
flag . StringVar ( & cliFlags . MaxAge , "max-age" , "" , "max age for displaying/cleaning old records" )
flag . StringVar ( & cliFlags . PerformMigrations , "migrate" , "" , "set database version" )
flag . Parse ( )
|
service configuration has exactly the same structure as *.toml file
|
configuration , err := LoadConfiguration ( configFileEnvVariableName , defaultConfigFileName )
if err != nil {
log . Err ( err ) . Msg ( "Load configuration" )
os . Exit ( ExitStatusConfigurationError )
}
err = logger . InitZerolog (
GetLoggingConfiguration ( & configuration ) ,
GetCloudWatchConfiguration ( & configuration ) ,
GetSentryConfiguration ( & configuration ) ,
logger . KafkaZerologConfiguration { } ,
)
if err != nil {
log . Err ( err ) . Msg ( "Logging configuration error" )
os . Exit ( ExitStatusConfigurationError )
}
log . Debug ( ) . Msg ( "Started" )
|
override default value read from configuration file
|
if cliFlags . MaxAge == "" {
cliFlags . MaxAge = "7 days"
}
|
perform selected operation
|
exitStatus , err := doSelectedOperation ( & configuration , cliFlags )
if err != nil {
log . Err ( err ) . Msg ( "Do selected operation" )
os . Exit ( exitStatus )
return
}
log . Debug ( ) . Msg ( "Finished" )
}
|
PrintMigrationInfo function prints information about current DB migration
version without making any modifications in database.
|
func PrintMigrationInfo ( configuration * ConfigStruct ) ( int , error ) {
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( StorageHandleErr )
return ExitStatusMigrationError , err
}
currMigVer , err := utils . GetDBVersion ( storage . connection )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Unable to get current DB version" )
return ExitStatusMigrationError , err
}
log . Info ( ) . Msgf ( "Current DB version: %d" , currMigVer )
log . Info ( ) . Msgf ( "Maximum available version: %d" , utils . GetMaxVersion ( ) )
return ExitStatusOK , nil
}
|
PerformMigrations migrates the database to the version
specified in params
|
func PerformMigrations ( configuration * ConfigStruct , migParam string ) ( exitStatus int , err error ) {
|
init migration utils
|
utils . Set ( All ( ) )
|
get db handle
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( StorageHandleErr )
exitStatus = ExitStatusMigrationError
return
}
|
parse migration params
|
var desiredVersion utils . Version
if migParam == "latest" {
desiredVersion = utils . GetMaxVersion ( )
} else {
vers , convErr := strconv . Atoi ( migParam )
if err != nil {
log . Error ( ) . Err ( err ) . Msgf ( "Unable to parse migration version: %v" , migParam )
exitStatus = ExitStatusMigrationError
err = convErr
return
}
if vers < 0 || vers > math . MaxUint32 {
log . Error ( ) . Err ( err ) . Msgf ( "Unable to parse migration version: %v version out of range" , migParam )
exitStatus = ExitStatusMigrationError
err = fmt . Errorf ( "version out of range: %v" , vers )
return
}
desiredVersion = utils . Version ( uint32 ( vers ) )
}
|
perform database migration
|
err = Migrate ( storage . Connection ( ) , desiredVersion )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "migration failure" )
exitStatus = ExitStatusMigrationError
return
}
exitStatus = ExitStatusOK
return
}
|
performTruncateOldReports function truncates the reported table.
|
func performTruncateOldReports ( configuration * ConfigStruct ) ( int , error ) {
|
prepare the storage
|
storageConfiguration := GetStorageConfiguration ( configuration )
storage , err := NewStorage ( & storageConfiguration )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( operationFailedMessage )
return ExitStatusStorageError , err
}
|
try to truncate the reported table
|
err = storage . TruncateOldReports ( )
if err != nil {
log . Err ( err ) . Msg ( databaseDropTablesOperationFailedMessage )
return ExitStatusStorageError , err
}
log . Info ( ) . Msg ( "Truncated `reported` table successfully" )
return ExitStatusOK , nil
}
|