|
package main
|
Documentation in literate-programming-style is available at:
https://redhatinsights.github.io/insights-results-aggregator-cleaner/packages/database.html
|
|
This source file contains an implementation of the interface between Go code and
(almost any) SQL database, such as PostgreSQL, SQLite, or MariaDB.
The connection to the selected database can be configured using the
StorageConfiguration structure. Currently that structure contains two
configurable parameters:
Driver - a SQL driver, like "sqlite3", "pq", etc.
DataSource - specification of the data source. The content of this parameter depends on the database used.
|
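The structure itself is defined elsewhere in this repository; based solely on how it
is used in this file, it can be pictured as the following sketch (the field names are
taken from the code below, the field types are assumptions):
|
// StorageConfiguration - a sketch of the configuration structure as used by
// this file; the authoritative definition lives in the configuration module.
type StorageConfiguration struct {
	Driver           string // SQL driver name, e.g. "sqlite3" or "postgres"
	SQLiteDataSource string // data source used when Driver == "sqlite3"
	PGUsername       string // the PG* fields are used to build the PostgreSQL data source
	PGPassword       string
	PGHost           string
	PGPort           int
	PGDBName         string
	PGParams         string
}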
|
Generated documentation is available at:
https://pkg.go.dev/github.com/RedHatInsights/insights-results-aggregator-cleaner
|
import (
	"bufio"
	"database/sql"
	"errors"
	"fmt"
	"math"
	"os"
	"time"

	_ "github.com/lib/pq"
	_ "github.com/mattn/go-sqlite3"
	"github.com/rs/zerolog/log"
)
|
Error messages
|
const (
	canNotConnectToDataStorageMessage = "Can not connect to data storage"
	unableToCloseDBRowsHandle         = "Unable to close the DB rows handle"
	connectionNotEstablished          = "Connection to database was not established"
)
|
Other messages
|
const (
	tableName      = "table"
	clusterNameMsg = "cluster"
	fileOpenMsg    = "File open"
	fileCloseMsg   = "File close"
	flushWriterMsg = "Flush writer"
	writeToFileMsg = "Write to file"
)
|
SQL commands
|
const (
	selectOldReports = `
		SELECT cluster, reported_at, last_checked_at
		FROM report
		WHERE reported_at < NOW() - $1::INTERVAL
		ORDER BY reported_at`
	selectOldAdvisorRatings = `
		SELECT org_id, rule_fqdn, error_key, rule_id, rating, last_updated_at
		FROM advisor_ratings
		WHERE last_updated_at < NOW() - $1::INTERVAL
		ORDER BY last_updated_at`
	selectOldConsumerErrors = `
		SELECT topic, partition, topic_offset, key, consumed_at, message
		FROM consumer_error
		WHERE consumed_at < NOW() - $1::INTERVAL
		ORDER BY consumed_at`
)
|
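All three queries take a single parameter: an age threshold that the database casts
to an INTERVAL. A minimal sketch of how they are used, assuming an already
established PostgreSQL connection and an illustrative threshold of "90 days" (the
listing functions below show the full iteration over the result set):
|
// "90 days" is bound to $1 and cast to INTERVAL by the query itself, so only
// records reported more than 90 days ago are returned
rows, err := connection.Query(selectOldReports, "90 days")
if err != nil {
	log.Err(err).Msg(canNotConnectToDataStorageMessage)
} else if closeErr := rows.Close(); closeErr != nil {
	// the real code iterates over the rows before closing the result set
	log.Error().Err(closeErr).Msg(unableToCloseDBRowsHandle)
}
|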
initDatabaseConnection initializes the selected database driver, checks whether it
is supported, and opens the connection to the storage.
|
func initDatabaseConnection(configuration *StorageConfiguration) (*sql.DB, error) {
|
check if storage configuration structure has been initialized and
passed to this function properly
|
	if configuration == nil {
		const message = "StorageConfiguration structure should be provided"
		err := errors.New(message)
		log.Error().Msg(message)
		return nil, err
	}
	driverName := configuration.Driver
	dataSource := ""
	log.Info().Str("driverName", configuration.Driver).Msg("DB connection configuration")
|
initialize the connection to the selected database using the right driver
|
	switch driverName {
	case "sqlite3":
		dataSource = configuration.SQLiteDataSource
	case "postgres":
		dataSource = fmt.Sprintf(
			"postgresql://%v:%v@%v:%v/%v?%v",
			configuration.PGUsername,
			configuration.PGPassword,
			configuration.PGHost,
			configuration.PGPort,
			configuration.PGDBName,
			configuration.PGParams,
		)
	default:
		err := fmt.Errorf("driver %v is not supported", driverName)
		log.Err(err).Msg(canNotConnectToDataStorageMessage)
		return nil, err
	}
|
try to initialize connection to the storage
|
	connection, err := sql.Open(driverName, dataSource)
|
check if establishing a connection was successful
|
	if err != nil {
		log.Err(err).Msg(canNotConnectToDataStorageMessage)
		return nil, err
	}
	return connection, nil
}
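|
A minimal usage sketch follows, wrapped in a hypothetical helper for illustration;
all configuration values are made up and the field names follow the sketch of
StorageConfiguration shown at the top of this file. Note that sql.Open (called
inside) does not really connect yet; the first connection is established lazily
by the first query.
|
// openStorage - a hypothetical helper, not part of this tool, shown only to
// illustrate how initDatabaseConnection is expected to be called
func openStorage() (*sql.DB, error) {
	// illustrative configuration values only
	storageCfg := StorageConfiguration{
		Driver:     "postgres",
		PGUsername: "postgres",
		PGPassword: "postgres",
		PGHost:     "localhost",
		PGPort:     5432,
		PGDBName:   "aggregator",
		PGParams:   "sslmode=disable",
	}
	return initDatabaseConnection(&storageCfg)
}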
|
displayMultipleRuleDisable function reads and displays clusters where
multiple users have disabled some rules.
|
func displayMultipleRuleDisable(connection *sql.DB, output string) error {
	var fout *os.File
	var writer *bufio.Writer
	if output != "" {
|
create output file
disable G304 (CWE-22): Potential file inclusion via variable (Confidence: HIGH, Severity: MEDIUM)
|
		var err error
		fout, err = os.Create(output) // #nosec G304
		if err != nil {
			log.Error().Err(err).Msg(fileOpenMsg)
		}
|
an object used to write to file
|
		writer = bufio.NewWriter(fout)
	}
	defer func() {
|
the file needs to be closed at the very end; deferred calls run in LIFO order,
so this deferred function is registered first and therefore executed last
|
		if fout != nil {
			err := fout.Close()
			if err != nil {
				log.Error().Err(err).Msg(fileCloseMsg)
			}
		}
	}()
	defer func() {
|
the output needs to be flushed before the file is closed
|
		if writer != nil {
			err := writer.Flush()
			if err != nil {
				log.Error().Err(err).Msg(flushWriterMsg)
			}
		}
	}()
|
first query to be performed
|
	query1 := `
		select cluster_id, rule_id, count(*) as cnt
		from cluster_rule_toggle
		group by cluster_id, rule_id
		having count(*) > 1
		order by cnt desc;
	`
|
second query to be performed
|
	query2 := `
		select cluster_id, rule_id, count(*) as cnt
		from cluster_user_rule_disable_feedback
		group by cluster_id, rule_id
		having count(*) > 1
		order by cnt desc;
	`
|
perform the first query and display results
|
	err := performDisplayMultipleRuleDisable(connection, writer, query1,
		"cluster_rule_toggle")
|
the first query+display function might return an error
|
	if err != nil {
		return err
	}
|
perform the second query and display results
|
	err = performDisplayMultipleRuleDisable(connection, writer, query2,
		"cluster_user_rule_disable_feedback")
|
the second query+display function might return an error as well
|
	return err
}
|
performDisplayMultipleRuleDisable function displays cluster names and org
IDs where multiple users have disabled any rule
|
func performDisplayMultipleRuleDisable(connection *sql.DB,
	writer *bufio.Writer, query string, tableName string) error {
|
perform given query to database
|
	rows, err := connection.Query(query)
	if err != nil {
		return err
	}
|
iterate over all records that have been found
|
	for rows.Next() {
		var (
			clusterName string
			ruleID      string
			count       int
		)
|
read one report
|
		if err := rows.Scan(&clusterName, &ruleID, &count); err != nil {
|
close the result set in case of any error
|
			if closeErr := rows.Close(); closeErr != nil {
				log.Error().Err(closeErr).Msg(unableToCloseDBRowsHandle)
			}
			return err
		}
|
try to read organization ID for given cluster name
|
		orgID, err := readOrgID(connection, clusterName)
		if err != nil {
			log.Error().Err(err).Msg("readOrgID")
			return err
		}
|
just print the report, including organization ID
|
		log.Info().Str("table", tableName).
			Int("org ID", orgID).
			Str(clusterNameMsg, clusterName).
			Str("rule ID", ruleID).
			Int("count", count).
			Msg("Multiple rule disable")
|
export to file (if enabled)
|
		if writer != nil {
			_, err := fmt.Fprintf(writer, "%d,%s,%s,%d\n", orgID, clusterName, ruleID, count)
			if err != nil {
				log.Error().Err(err).Msg(writeToFileMsg)
			}
		}
	}
	return nil
}
|
readOrgID function tries to read the organization ID for the given cluster name
|
func readOrgID(connection *sql.DB, clusterName string) (int, error) {
	query := "select org_id from report where cluster = $1"
|
perform the query
|
	rows, err := connection.Query(query, clusterName)
	if err != nil {
		log.Debug().Msg("query")
		return -1, err
	}
|
and check the result (if any)
|
	if rows.Next() {
		var orgID int
|
read one organization ID returned in query result
|
		if err := rows.Scan(&orgID); err != nil {
|
proper error logging will be performed elsewhere
|
			log.Debug().Str(clusterNameMsg, clusterName).Msg("scan")
|
close the result set in case of any error
|
			if closeErr := rows.Close(); closeErr != nil {
				log.Error().Err(closeErr).Msg(unableToCloseDBRowsHandle)
			}
			return -1, err
		}
		return orgID, nil
	}
|
no result?
|
	log.Debug().Str(clusterNameMsg, clusterName).Msg("no org_id for cluster")
	return -1, nil
}
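|
createOutputFile is a helper function that creates the output file (when a
non-empty output name is provided) and wraps it in a buffered writer. Both
returned values are nil when no output file is requested.
|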
func createOutputFile(output string) (*os.File, *bufio.Writer) {
	var fout *os.File
	var writer *bufio.Writer
	if output != "" {
|
create output file
disable G304 (CWE-22): Potential file inclusion via variable (Confidence: HIGH, Severity: MEDIUM)
|
		var err error
		fout, err = os.Create(output) // #nosec G304
		if err != nil {
			log.Error().Err(err).Msg(fileOpenMsg)
		}
|
an object used to write to file
|
		writer = bufio.NewWriter(fout)
	}
	return fout, writer
}
|
displayAllOldRecords function reads all old records, i.e. records that are
older than the specified time duration. Those records are simply displayed.
|
func displayAllOldRecords(connection *sql.DB, maxAge, output string) error {
|
check if connection has been initialized
|
	if connection == nil {
		log.Error().Msg(connectionNotEstablished)
		return errors.New(connectionNotEstablished)
	}
	fout, writer := createOutputFile(output)
	defer func() {
|
the file needs to be closed at the very end; deferred calls run in LIFO order,
so this deferred function is registered first and therefore executed last
|
		if fout != nil {
			err := fout.Close()
			if err != nil {
				log.Error().Err(err).Msg(fileCloseMsg)
			}
		}
	}()
	defer func() {
|
the output needs to be flushed before the file is closed
|
		if writer != nil {
			err := writer.Flush()
			if err != nil {
				log.Error().Err(err).Msg(flushWriterMsg)
			}
		}
	}()
|
the main purpose of this tool is the ability to delete old reports, so list those first
|
	err := performListOfOldReports(connection, maxAge, writer)
|
skip next operation on first error
|
	if err != nil {
		return err
	}
|
but we might be interested in other tables as well, especially advisor ratings
|
	err = performListOfOldRatings(connection, maxAge)
|
skip next operation on first error
|
	if err != nil {
		return err
	}
|
we might also be interested in old consumer errors
|
	err = performListOfOldConsumerErrors(connection, maxAge)
|
skip next operation on first error
|
	if err != nil {
		return err
	}
	return nil
}
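|
listOldDatabaseRecords is a generic helper: it runs the given query with the
maxAge parameter, hands the resulting rows over to the supplied callback, and
logs how many records the callback processed. The three listing functions below
are thin wrappers around it.
|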
func listOldDatabaseRecords(connection *sql.DB, maxAge string,
	writer *bufio.Writer, query string,
	logEntry string, countLogEntry string,
	callback func(rows *sql.Rows, writer *bufio.Writer) (int, error)) error {
	log.Info().Msg(logEntry + " begin")
	rows, err := connection.Query(query, maxAge)
	if err != nil {
		return err
	}
	count, err := callback(rows, writer)
	if err != nil {
		log.Error().Err(err).Msg("Query error")
		return err
	}
	log.Info().Int(countLogEntry, count).Msg(logEntry + " end")
	return nil
}
|
performListOfOldReports function reads and displays old records from the
report table
|
func performListOfOldReports(connection *sql.DB, maxAge string, writer *bufio.Writer) error {
	return listOldDatabaseRecords(connection, maxAge, writer, selectOldReports,
		"List of old reports", "reports count",
		func(rows *sql.Rows, writer *bufio.Writer) (int, error) {
|
used to compute a real record age
|
			now := time.Now()
|
reports count
|
			count := 0
|
iterate over all old records
|
			for rows.Next() {
				var (
					clusterName string
					reported    time.Time
					lastChecked time.Time
				)
|
read one old record from the report table
|
				if err := rows.Scan(&clusterName, &reported, &lastChecked); err != nil {
|
close the result set in case of any error
|
					if closeErr := rows.Close(); closeErr != nil {
						log.Error().Err(closeErr).Msg(unableToCloseDBRowsHandle)
					}
					return count, err
				}
|
compute the real record age
|
				age := int(math.Ceil(now.Sub(reported).Hours() / 24))
|
prepare for the report
|
				reportedF := reported.Format(time.RFC3339)
				lastCheckedF := lastChecked.Format(time.RFC3339)
|
just print the report
|
				log.Info().Str(clusterNameMsg, clusterName).
					Str("reported", reportedF).
					Str("lastChecked", lastCheckedF).
					Int("age", age).
					Msg("Old report")
				if writer != nil {
					_, err := fmt.Fprintf(writer, "%s,%s,%s,%d\n", clusterName, reportedF, lastCheckedF, age)
					if err != nil {
						log.Error().Err(err).Msg(writeToFileMsg)
					}
				}
				count++
			}
			return count, nil
		})
}
|
performListOfOldRatings function reads and displays old Advisor ratings from
the advisor_ratings table
|
func performListOfOldRatings(connection *sql.DB, maxAge string) error {
	return listOldDatabaseRecords(connection, maxAge, nil, selectOldAdvisorRatings,
		"List of old Advisor ratings", "ratings count",
		func(rows *sql.Rows, writer *bufio.Writer) (int, error) {
|
used to compute a real record age
|
			now := time.Now()
|
ratings count
|
			count := 0
|
iterate over all old records
|
			for rows.Next() {
				var (
					orgID         string
					ruleFQDN      string
					errorKey      string
					ruleID        string
					rating        int
					lastUpdatedAt time.Time
				)
|
read one old record from the advisor_ratings table
|
				if err := rows.Scan(&orgID, &ruleFQDN, &errorKey, &ruleID, &rating, &lastUpdatedAt); err != nil {
|
close the result set in case of any error
|
					if closeErr := rows.Close(); closeErr != nil {
						log.Error().Err(closeErr).Msg(unableToCloseDBRowsHandle)
					}
					return count, err
				}
|
compute the real rating age
|
				age := int(math.Ceil(now.Sub(lastUpdatedAt).Hours() / 24))
|
prepare for the report
|
				lastUpdatedAtF := lastUpdatedAt.Format(time.RFC3339)
|
just print the report
|
				log.Info().
					Str("organization", orgID).
					Str("rule FQDN", ruleFQDN).
					Str("error key", errorKey).
					Int("rating", rating).
					Str("updated at", lastUpdatedAtF).
					Int("rating age", age).
					Msg("Old Advisor rating")
				count++
			}
			return count, nil
		})
}
|
performListOfOldConsumerErrors function reads and displays old consumer errors
stored in the consumer_error table
|
func performListOfOldConsumerErrors(connection *sql.DB, maxAge string) error {
	return listOldDatabaseRecords(connection, maxAge, nil, selectOldConsumerErrors,
		"List of old consumer errors", "errors count",
		func(rows *sql.Rows, writer *bufio.Writer) (int, error) {
|
used to compute a real record age
|
			now := time.Now()
|
errors count
|
			count := 0
|
iterate over all old records
|
			for rows.Next() {
				var (
					topic      string
					partition  int
					offset     int
					key        string
					consumedAt time.Time
					message    string
				)
|
read one old record from the consumer_error table
|
				if err := rows.Scan(&topic, &partition, &offset, &key, &consumedAt, &message); err != nil {
|
close the result set in case of any error
|
					if closeErr := rows.Close(); closeErr != nil {
						log.Error().Err(closeErr).Msg(unableToCloseDBRowsHandle)
					}
					return count, err
				}
|
compute the real error age
|
				age := int(math.Ceil(now.Sub(consumedAt).Hours() / 24))
|
prepare for the report
|
				consumedF := consumedAt.Format(time.RFC3339)
|
just print the report
|
				log.Info().
					Str("topic", topic).
					Int("partition", partition).
					Int("offset", offset).
					Str("key", key).
					Str("message", message).
					Str("consumed", consumedF).
					Int("error age", age).
					Msg("Old consumer error")
				count++
			}
			return count, nil
		})
}
|
deleteRecordFromTable function deletes selected records (identified by
cluster name) from the specified database table
|
func deleteRecordFromTable(connection *sql.DB, table, key string, clusterName ClusterName) (int, error) {
|
it is not possible to use a query parameter for a table name or a key, so the
statement has to be built by string concatenation
disable "G202 (CWE-89): SQL string concatenation (Confidence: HIGH, Severity: MEDIUM)"
nosec G202
|
sqlStatement := "DELETE FROM " + table + " WHERE " + key + " = $1;"
|
perform the SQL statement
|
	result, err := connection.Exec(sqlStatement, clusterName)
	if err != nil {
		return 0, err
	}
|
read number of affected (deleted) rows
|
	affected, err := result.RowsAffected()
	if err != nil {
		return 0, err
	}
	return int(affected), nil
}
|
tablesAndKeys contains a list of all tables together with the keys used to
select records to be deleted
|
var tablesAndKeys = [...]TableAndKey{
	{
		TableName: "cluster_rule_toggle",
		KeyName:   "cluster_id",
	},
	{
		TableName: "cluster_rule_user_feedback",
		KeyName:   "cluster_id",
	},
	{
		TableName: "cluster_user_rule_disable_feedback",
		KeyName:   "cluster_id",
	},
	{
		TableName: "rule_hit",
		KeyName:   "cluster_id",
	},
	{
		TableName: "recommendation",
		KeyName:   "cluster_id",
	},
	{
		TableName: "report_info",
		KeyName:   "cluster_id",
	},
|
must be at the end due to constraints
|
	{
		TableName: "report",
		KeyName:   "cluster",
	},
}
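|
The TableAndKey type used above is defined elsewhere in this repository; judging
by its usage here it is a simple pair of table name and key column name, roughly
like this sketch:
|
// TableAndKey - a sketch of the helper type assumed by this file; it pairs a
// table name with the column used to identify records belonging to a cluster.
type TableAndKey struct {
	TableName string
	KeyName   string
}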
|
performVacuumDB vacuums the whole database
|
func performVacuumDB(connection *sql.DB) error {
	log.Info().Msg("Vacuuming started")
	sqlStatement := "VACUUM VERBOSE;"
|
perform the SQL statement
|
	_, err := connection.Exec(sqlStatement)
	if err != nil {
		return err
	}
	log.Info().Msg("Vacuuming finished")
	return nil
}
|
performCleanupInDB function cleans up all data for selected cluster names
|
func performCleanupInDB(connection *sql.DB,
	clusterList ClusterList) (map[string]int, error) {
|
return value
|
	deletionsForTable := make(map[string]int)
|
check if connection has been initialized
|
	if connection == nil {
		log.Error().Msg(connectionNotEstablished)
		return deletionsForTable, errors.New(connectionNotEstablished)
	}
|
initialize counters
|
	for _, tableAndKey := range tablesAndKeys {
		deletionsForTable[tableAndKey.TableName] = 0
	}
|
perform cleanup for selected cluster names
|
	log.Info().Msg("Cleanup started")
	for _, clusterName := range clusterList {
		for _, tableAndKey := range tablesAndKeys {
|
try to delete record from selected table
|
			affected, err := deleteRecordFromTable(connection,
				tableAndKey.TableName,
				tableAndKey.KeyName,
				clusterName)
			if err != nil {
				log.Error().
					Err(err).
					Str(tableName, tableAndKey.TableName).
					Msg("Unable to delete record")
			} else {
				log.Info().
					Int("Affected", affected).
					Str(tableName, tableAndKey.TableName).
					Str(clusterNameMsg, string(clusterName)).
					Msg("Delete record")
				deletionsForTable[tableAndKey.TableName] += affected
			}
		}
	}
	log.Info().Msg("Cleanup finished")
	return deletionsForTable, nil
}
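|
A minimal usage sketch, assuming that ClusterList is a slice of ClusterName values
and that ClusterName is a string-based type (both are defined elsewhere in this
repository), with an already established connection:
|
clusters := ClusterList{
	ClusterName("5d5892d4-1f74-4ccf-91af-548dfc9767aa"), // illustrative cluster name
}
deletions, err := performCleanupInDB(connection, clusters)
if err != nil {
	log.Error().Err(err).Msg("Cleanup failed")
}
for table, deleted := range deletions {
	log.Info().Str(tableName, table).Int("deleted", deleted).Msg("Cleanup summary")
}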
|
fillInDatabaseByTestData function fills in the database with test data (not to
be used against a production database)
|
func fillInDatabaseByTestData(connection *sql.DB) error {
	log.Info().Msg("Fill-in database started")
	var lastError error
	clusterNames := [...]string{
		"00000000-0000-0000-0000-000000000000",
		"11111111-1111-1111-1111-111111111111",
		"5d5892d4-1f74-4ccf-91af-548dfc9767aa"}
	sqlStatements := [...]string{
		"INSERT INTO report (org_id, cluster, report, reported_at, last_checked_at, kafka_offset) values(1, $1, '', '2021-01-01', '2021-01-01', 10)",
		"INSERT INTO cluster_rule_toggle (cluster_id, rule_id, user_id, disabled, disabled_at, enabled_at, updated_at) values($1, 1, 1, 0, '2021-01-01', '2021-01-01', '2021-01-01')",
		"INSERT INTO cluster_rule_user_feedback (cluster_id, rule_id, user_id, message, user_vote, added_at, updated_at) values($1, 1, 1, 'foobar', 1, '2021-01-01', '2021-01-01')",
		"INSERT INTO cluster_user_rule_disable_feedback (cluster_id, user_id, rule_id, message, added_at, updated_at) values($1, 1, 1, 'foobar', '2021-01-01', '2021-01-01')",
		"INSERT INTO rule_hit (org_id, cluster_id, rule_fqdn, error_key, template_data) values(1, $1, 'foo', 'bar', '')",
	}
	for _, clusterName := range clusterNames {
		log.Info().
			Str("cluster name", clusterName).
			Msg("data for new cluster")
		for _, sqlStatement := range sqlStatements {
			log.Info().
				Str("SQL statement", sqlStatement).
				Msg("inserting")
|
perform the SQL statement
|
			_, err := connection.Exec(sqlStatement, clusterName)
			if err != nil {
|
failure is usually ok - it might mean that
the record with given cluster name already
exists
|
				log.Err(err).Msg("Insert error")
				lastError = err
			}
		}
	}
	log.Info().Msg("Fill-in database finished")
	return lastError
}
|