package main
Documentation in literate-programming-style is available at:
This source file contains an implementation of interface between Go code and
(almost any) SQL database like PostgreSQL, SQLite, or MariaDB.
It is possible to configure connection to selected database by using
StorageConfiguration structure. Currently that structure contains two
configurable parameter:
Driver - a SQL driver, like "sqlite3", "pq" etc.
DataSource - specification of data source. The content of this parameter depends on the database used.
Generated documentation is available at:
import (
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
Error messages
const (
canNotConnectToDataStorageMessage = "Can not connect to data storage"
unableToCloseDBRowsHandle = "Unable to close the DB rows handle"
connectionNotEstablished = "Connection to database was not established"
Other messages
const (
tableName = "table"
clusterNameMsg = "cluster"
fileOpenMsg = "File open"
fileCloseMsg = "File close"
flushWriterMsg = "Flush writer"
writeToFileMsg = "Write to file"
SQL commands
const (
selectOldReports = `
SELECT cluster, reported_at, last_checked_at
FROM report
WHERE reported_at < NOW() - $1::INTERVAL
ORDER BY reported_at`
selectOldAdvisorRatings = `
SELECT org_id, rule_fqdn, error_key, rule_id, rating, last_updated_at
FROM advisor_ratings
WHERE last_updated_at < NOW() - $1::INTERVAL
ORDER BY last_updated_at`
selectOldConsumerErrors = `
SELECT topic, partition, topic_offset, key, consumed_at, message
FROM consumer_error
WHERE consumed_at < NOW() - $1::INTERVAL
ORDER BY consumed_at`
initDatabaseConnection initializes driver, checks if it's supported and
initializes connection to the storage.
func initDatabaseConnection ( configuration * StorageConfiguration ) ( * sql . DB , error ) {
check if storage configuration structure has been initialized and
passed to this function properly
if configuration == nil {
const message = "StorageConfiguration structure should be provided"
err := errors . New ( message )
log . Error ( ) . Msg ( message )
return nil , err
driverName := configuration . Driver
dataSource := ""
log . Info ( ) . Str ( "driverName" , configuration . Driver ) . Msg ( "DB connection configuration" )
initialize connection into selected database using the right driver
switch driverName {
case "sqlite3" :
dataSource = configuration . SQLiteDataSource
case "postgres" :
dataSource = fmt . Sprintf (
"postgresql://%v:%v@%v:%v/%v?%v" ,
configuration . PGUsername ,
configuration . PGPassword ,
configuration . PGHost ,
configuration . PGPort ,
configuration . PGDBName ,
configuration . PGParams ,
default :
err := fmt . Errorf ( "driver %v is not supported" , driverName )
log . Err ( err ) . Msg ( canNotConnectToDataStorageMessage )
return nil , err
try to initialize connection to the storage
connection , err := sql . Open ( driverName , dataSource )
check if establishing a connection was successful
if err != nil {
log . Err ( err ) . Msg ( canNotConnectToDataStorageMessage )
return nil , err
return connection , nil
displayMultipleRuleDisable function read and displays clusters where
multiple users have disabled some rules.
func displayMultipleRuleDisable ( connection * sql . DB , output string ) error {
var fout * os . File = nil
var writer * bufio . Writer = nil
if output != "" {
create output file
disable G304 (CWE-22): Potential file inclusion via variable (Confidence: HIGH, Severity: MEDIUM)
fout , err := os . Create ( output )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( fileOpenMsg )
an object used to write to file
writer = bufio . NewWriter ( fout )
defer func ( ) {
output needs to be flushed at the end
if writer != nil {
err := writer . Flush ( )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( flushWriterMsg )
} ( )
defer func ( ) {
file needs to be closed at the end
if fout != nil {
err := fout . Close ( )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( fileCloseMsg )
} ( )
first query to be performed
query1 := `
select cluster_id, rule_id, count(*) as cnt
from cluster_rule_toggle
group by cluster_id, rule_id
having count(*)>1
order by cnt desc;
second query to be performed
query2 := `
select cluster_id, rule_id, count(*) as cnt
from cluster_user_rule_disable_feedback
group by cluster_id, rule_id
having count(*)>1
order by cnt desc;
perform the first query and display results
err := performDisplayMultipleRuleDisable ( connection , writer , query1 ,
"cluster_rule_toggle" )
the first query+display function might throw some error
if err != nil {
return err
perform second query and display results
err = performDisplayMultipleRuleDisable ( connection , writer , query2 ,
"cluster_user_rule_disable_feedback" )
second query+display function might throw some error
return err
performDisplayMultipleRuleDisable function displays cluster names and org
ids where multiple users disabled any rule
func performDisplayMultipleRuleDisable ( connection * sql . DB ,
writer * bufio . Writer , query string , tableName string ) error {
perform given query to database
rows , err := connection . Query ( query )
if err != nil {
return err
iterate over all records that has been found
for rows . Next ( ) {
var (
clusterName string
ruleID string
count int
read one report
if err := rows . Scan ( & clusterName , & ruleID , & count ) ; err != nil {
close the result set in case of any error
if closeErr := rows . Close ( ) ; closeErr != nil {
log . Error ( ) . Err ( closeErr ) . Msg ( unableToCloseDBRowsHandle )
return err
try to read organization ID for given cluster name
orgID , err := readOrgID ( connection , clusterName )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "readOrgID" )
return err
just print the report, including organization ID
log . Info ( ) . Str ( "table" , tableName ) .
Int ( "org ID" , orgID ) .
Str ( clusterNameMsg , clusterName ) .
Str ( "rule ID" , ruleID ) .
Int ( "count" , count ) .
Msg ( "Multiple rule disable" )
export to file (if enabled)
if writer != nil {
_ , err := fmt . Fprintf ( writer , "%d,%s,%s,%d\n" , orgID , clusterName , ruleID , count )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( writeToFileMsg )
return nil
readOrgID function tries to read organization ID for given cluster name
func readOrgID ( connection * sql . DB , clusterName string ) ( int , error ) {
query := "select org_id from report where cluster = $1"
perform the query
rows , err := connection . Query ( query , clusterName )
if err != nil {
log . Debug ( ) . Msg ( "query" )
return - 1 , err
and check the result (if any)
if rows . Next ( ) {
var orgID int
read one organization ID returned in query result
if err := rows . Scan ( & orgID ) ; err != nil {
proper error logging will be performed elsewhere
log . Debug ( ) . Str ( clusterNameMsg , clusterName ) . Msg ( "scan" )
close the result set in case of any error
if closeErr := rows . Close ( ) ; closeErr != nil {
log . Error ( ) . Err ( closeErr ) . Msg ( unableToCloseDBRowsHandle )
return - 1 , err
return orgID , nil
no result?
log . Debug ( ) . Str ( clusterNameMsg , clusterName ) . Msg ( "no org_id for cluster" )
return - 1 , nil
func createOutputFile ( output string ) ( * os . File , * bufio . Writer ) {
var fout * os . File = nil
var writer * bufio . Writer = nil
if output != "" {
create output file
disable G304 (CWE-22): Potential file inclusion via variable (Confidence: HIGH, Severity: MEDIUM)
fout , err := os . Create ( output )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( fileOpenMsg )
an object used to write to file
writer = bufio . NewWriter ( fout )
return fout , writer
displayAllOldRecords function read all old records, ie. records that are
older than the specified time duration. Those records are simply displayed.
func displayAllOldRecords ( connection * sql . DB , maxAge , output string ) error {
check if connection has been initialized
if connection == nil {
log . Error ( ) . Msg ( connectionNotEstablished )
return errors . New ( connectionNotEstablished )
fout , writer := createOutputFile ( output )
defer func ( ) {
output needs to be flushed at the end
if writer != nil {
err := writer . Flush ( )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( flushWriterMsg )
} ( )
defer func ( ) {
file needs to be closed at the end
if fout != nil {
err := fout . Close ( )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( fileCloseMsg )
} ( )
main function of this tool is ability to delete old reports
err := performListOfOldReports ( connection , maxAge , writer )
skip next operation on first error
if err != nil {
return err
but we might be interested in other tables as well, especially advisor ratings
err = performListOfOldRatings ( connection , maxAge )
skip next operation on first error
if err != nil {
return err
also but we might be interested in other consumer errors
err = performListOfOldConsumerErrors ( connection , maxAge )
skip next operation on first error
if err != nil {
return err
return nil
func listOldDatabaseRecords ( connection * sql . DB , maxAge string ,
writer * bufio . Writer , query string ,
logEntry string , countLogEntry string ,
callback func ( rows * sql . Rows , writer * bufio . Writer ) ( int , error ) ) error {
log . Info ( ) . Msg ( logEntry + " begin" )
rows , err := connection . Query ( query , maxAge )
if err != nil {
return err
count , err := callback ( rows , writer )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Query error" )
return err
log . Info ( ) . Int ( countLogEntry , count ) . Msg ( logEntry + " end" )
return nil
performListOfOldReports read and displays old records read from reported_at
func performListOfOldReports ( connection * sql . DB , maxAge string , writer * bufio . Writer ) error {
return listOldDatabaseRecords ( connection , maxAge , writer , selectOldReports , "List of old reports" , "reports count" ,
func ( rows * sql . Rows , writer * bufio . Writer ) ( int , error ) {
used to compute a real record age
now := time . Now ( )
reports count
count := 0
iterate over all old records
for rows . Next ( ) {
var (
clusterName string
reported time . Time
lastChecked time . Time
read one old record from the report table
if err := rows . Scan ( & clusterName , & reported , & lastChecked ) ; err != nil {
close the result set in case of any error
if closeErr := rows . Close ( ) ; closeErr != nil {
log . Error ( ) . Err ( closeErr ) . Msg ( unableToCloseDBRowsHandle )
return count , err
compute the real record age
age := int ( math . Ceil ( now . Sub ( reported ) . Hours ( ) / 24 ) )
prepare for the report
reportedF := reported . Format ( time . RFC3339 )
lastCheckedF := lastChecked . Format ( time . RFC3339 )
just print the report
log . Info ( ) . Str ( clusterNameMsg , clusterName ) .
Str ( "reported" , reportedF ) .
Str ( "lastChecked" , lastCheckedF ) .
Int ( "age" , age ) .
Msg ( "Old report" )
if writer != nil {
_ , err := fmt . Fprintf ( writer , "%s,%s,%s,%d\n" , clusterName , reportedF , lastCheckedF , age )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( writeToFileMsg )
count ++
return count , nil
} )
performListOfOldRatings read and displays old Advisor ratings read from
advisor_ratings table
func performListOfOldRatings ( connection * sql . DB , maxAge string ) error {
return listOldDatabaseRecords ( connection , maxAge , nil , selectOldAdvisorRatings , "List of old Advisor ratings" , "ratings count" ,
func ( rows * sql . Rows , writer * bufio . Writer ) ( int , error ) {
used to compute a real record age
now := time . Now ( )
reports count
count := 0
iterate over all old records
for rows . Next ( ) {
var (
orgID string
ruleFQDN string
errorKey string
ruleID string
rating int
lastUpdatedAt time . Time
read one old record from the report table
if err := rows . Scan ( & orgID , & ruleFQDN , & errorKey , & ruleID , & rating , & lastUpdatedAt ) ; err != nil {
close the result set in case of any error
if closeErr := rows . Close ( ) ; closeErr != nil {
log . Error ( ) . Err ( closeErr ) . Msg ( unableToCloseDBRowsHandle )
return count , err
compute the real error age
age := int ( math . Ceil ( now . Sub ( lastUpdatedAt ) . Hours ( ) / 24 ) )
prepare for the report
lastUpdatedAtF := lastUpdatedAt . Format ( time . RFC3339 )
just print the report
log . Info ( ) .
Str ( "organization" , orgID ) .
Str ( "rule FQDN" , ruleFQDN ) .
Str ( "error key" , errorKey ) .
Int ( "rating" , rating ) .
Str ( "updated at" , lastUpdatedAtF ) .
Int ( "rating age" , age ) .
Msg ( "Old Advisor rating" )
count ++
return count , nil
} )
performListOfOldConsumerErrors read and displays consumer errors stored in
consumer_errors table
func performListOfOldConsumerErrors ( connection * sql . DB , maxAge string ) error {
return listOldDatabaseRecords ( connection , maxAge , nil , selectOldConsumerErrors , "List of old consumer errors" , "errors count" ,
func ( rows * sql . Rows , writer * bufio . Writer ) ( int , error ) {
used to compute a real record age
now := time . Now ( )
reports count
count := 0
iterate over all old records
for rows . Next ( ) {
var (
topic string
partition int
offset int
key string
consumedAt time . Time
message string
read one old record from the report table
if err := rows . Scan ( & topic , & partition , & offset , & key , & consumedAt , & message ) ; err != nil {
close the result set in case of any error
if closeErr := rows . Close ( ) ; closeErr != nil {
log . Error ( ) . Err ( closeErr ) . Msg ( unableToCloseDBRowsHandle )
return count , err
compute the real error age
age := int ( math . Ceil ( now . Sub ( consumedAt ) . Hours ( ) / 24 ) )
prepare for the report
consumedF := consumedAt . Format ( time . RFC3339 )
just print the report
log . Info ( ) .
Str ( "topic" , topic ) .
Int ( "partition" , partition ) .
Int ( "offset" , offset ) .
Str ( "key" , key ) .
Str ( "message" , message ) .
Str ( "consumed" , consumedF ) .
Int ( "error age" , age ) .
Msg ( "Old consumer error" )
count ++
return count , nil
} )
deleteRecordFromTable function deletes selected records (identified by
cluster name) from database
func deleteRecordFromTable ( connection * sql . DB , table , key string , clusterName ClusterName ) ( int , error ) {
it is not possible to use parameter for table name or a key
disable "G202 (CWE-89): SQL string concatenation (Confidence: HIGH, Severity: MEDIUM)"
nosec G202
sqlStatement := "DELETE FROM " + table + " WHERE " + key + " = $1;"
perform the SQL statement
result , err := connection . Exec ( sqlStatement , clusterName )
if err != nil {
return 0 , err
read number of affected (deleted) rows
affected , err := result . RowsAffected ( )
if err != nil {
return 0 , err
return int ( affected ) , nil
tablesAndKeys contains list of all tables together with keys used to select
records to be deleted
var tablesAndKeys = [ ... ] TableAndKey {
TableName : "cluster_rule_toggle" ,
KeyName : "cluster_id" ,
} ,
TableName : "cluster_rule_user_feedback" ,
KeyName : "cluster_id" ,
} ,
TableName : "cluster_user_rule_disable_feedback" ,
KeyName : "cluster_id" ,
} ,
TableName : "rule_hit" ,
KeyName : "cluster_id" ,
} ,
TableName : "recommendation" ,
KeyName : "cluster_id" ,
} ,
TableName : "report_info" ,
KeyName : "cluster_id" ,
} ,
must be at the end due to constraints
TableName : "report" ,
KeyName : "cluster" ,
} ,
performVacuumDB vacuums the whole database
func performVacuumDB ( connection * sql . DB ) error {
log . Info ( ) . Msg ( "Vacuuming started" )
sqlStatement := "VACUUM VERBOSE;"
perform the SQL statement
_ , err := connection . Exec ( sqlStatement )
if err != nil {
return err
log . Info ( ) . Msg ( "Vacuuming finished" )
return nil
performCleanupInDB function cleans up all data for selected cluster names
func performCleanupInDB ( connection * sql . DB ,
clusterList ClusterList ) ( map [ string ] int , error ) {
return value
deletionsForTable := make ( map [ string ] int )
check if connection has been initialized
if connection == nil {
log . Error ( ) . Msg ( connectionNotEstablished )
return deletionsForTable , errors . New ( connectionNotEstablished )
initialize counters
for _ , tableAndKey := range tablesAndKeys {
deletionsForTable [ tableAndKey . TableName ] = 0
perform cleanup for selected cluster names
log . Info ( ) . Msg ( "Cleanup started" )
for _ , clusterName := range clusterList {
for _ , tableAndKey := range tablesAndKeys {
try to delete record from selected table
affected , err := deleteRecordFromTable ( connection ,
tableAndKey . TableName ,
tableAndKey . KeyName ,
clusterName )
if err != nil {
log . Error ( ) .
Err ( err ) .
Str ( tableName , tableAndKey . TableName ) .
Msg ( "Unable to delete record" )
} else {
log . Info ( ) .
Int ( "Affected" , affected ) .
Str ( tableName , tableAndKey . TableName ) .
Str ( clusterNameMsg , string ( clusterName ) ) .
Msg ( "Delete record" )
deletionsForTable [ tableAndKey . TableName ] += affected
log . Info ( ) . Msg ( "Cleanup finished" )
return deletionsForTable , nil
fillInDatabaseByTestData function fill-in database by test data (not to be
used against production database)
func fillInDatabaseByTestData ( connection * sql . DB ) error {
log . Info ( ) . Msg ( "Fill-in database started" )
var lastError error = nil
clusterNames := [ ... ] string {
"00000000-0000-0000-0000-000000000000" ,
"11111111-1111-1111-1111-111111111111" ,
"5d5892d4-1f74-4ccf-91af-548dfc9767aa" }
sqlStatements := [ ... ] string {
"INSERT INTO report (org_id, cluster, report, reported_at, last_checked_at, kafka_offset) values(1, $1, '', '2021-01-01', '2021-01-01', 10)" ,
"INSERT INTO cluster_rule_toggle (cluster_id, rule_id, user_id, disabled, disabled_at, enabled_at, updated_at) values($1, 1, 1, 0, '2021-01-01', '2021-01-01', '2021-01-01')" ,
"INSERT INTO cluster_rule_user_feedback (cluster_id, rule_id, user_id, message, user_vote, added_at, updated_at) values($1, 1, 1, 'foobar', 1, '2021-01-01', '2021-01-01')" ,
"INSERT INTO cluster_user_rule_disable_feedback (cluster_id, user_id, rule_id, message, added_at, updated_at) values($1, 1, 1, 'foobar', '2021-01-01', '2021-01-01')" ,
"INSERT INTO rule_hit (org_id, cluster_id, rule_fqdn, error_key, template_data) values(1, $1, 'foo', 'bar', '')" ,
for _ , clusterName := range clusterNames {
log . Info ( ) .
Str ( "cluster name" , clusterName ) .
Msg ( "data for new cluster" )
for _ , sqlStatement := range sqlStatements {
log . Info ( ) .
Str ( "SQL statement" , sqlStatement ) .
Msg ( "inserting" )
perform the SQL statement
_ , err := connection . Exec ( sqlStatement , clusterName )
if err != nil {
failure is usually ok - it might mean that
the record with given cluster name already
log . Err ( err ) . Msg ( "Insert error" )
lastError = err
log . Info ( ) . Msg ( "Fill-in database finished" )
return lastError