|
|
Package storage contains an implementation of interface between Go code and
(almost any) SQL database like PostgreSQL or MariaDB. An implementation
named DBStorage is constructed using the function 'New' and it is mandatory to
call 'Close' for any opened connection to the database. The storage might be
initialized by 'Init' method if database schema is empty.
It is possible to configure connection to selected database by using Configuration
structure. Currently, that structure contains two configurable parameter:
Driver - a SQL driver, like "pq", "pgx", etc.
DataSource - specification of data source. The content of this parameter depends on the database used.
|
package storage
import (
"database/sql"
sql_driver "database/sql/driver"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/lib/pq"
"github.com/rs/zerolog/log"
"github.com/RedHatInsights/insights-operator-utils/redis"
ctypes "github.com/RedHatInsights/insights-results-types"
"github.com/RedHatInsights/insights-results-aggregator/metrics"
"github.com/RedHatInsights/insights-results-aggregator/migration"
"github.com/RedHatInsights/insights-results-aggregator/migration/ocpmigrations"
"github.com/RedHatInsights/insights-results-aggregator/types"
)
|
OCPRecommendationsStorage represents an interface to almost any database or storage system
|
type OCPRecommendationsStorage interface {
Storage
ListOfOrgs ( ) ( [ ] types . OrgID , error )
ListOfClustersForOrg (
orgID types . OrgID , timeLimit time . Time ) ( [ ] types . ClusterName , error ,
)
ListOfClustersForOrgSpecificRule (
orgID types . OrgID , ruleID types . RuleSelector , activeClusters [ ] string ,
) ( [ ] ctypes . HittingClustersData , error )
ReadReportForCluster (
orgID types . OrgID , clusterName types . ClusterName ) (
[ ] types . RuleOnReport , types . Timestamp , types . Timestamp , types . Timestamp , error ,
)
ReadReportInfoForCluster (
types . OrgID , types . ClusterName ) (
types . Version , error ,
)
ReadClusterVersionsForClusterList (
types . OrgID , [ ] string ,
) ( map [ types . ClusterName ] types . Version , error )
ReadReportsForClusters (
clusterNames [ ] types . ClusterName ) ( map [ types . ClusterName ] types . ClusterReport , error )
ReadOrgIDsForClusters (
clusterNames [ ] types . ClusterName ) ( [ ] types . OrgID , error )
ReadSingleRuleTemplateData (
orgID types . OrgID , clusterName types . ClusterName , ruleID types . RuleID , errorKey types . ErrorKey ,
) ( interface { } , error )
ReadReportForClusterByClusterName ( clusterName types . ClusterName ) ( [ ] types . RuleOnReport , types . Timestamp , error )
WriteReportForCluster (
orgID types . OrgID ,
clusterName types . ClusterName ,
report types . ClusterReport ,
rules [ ] types . ReportItem ,
collectedAtTime time . Time ,
gatheredAtTime time . Time ,
storedAtTime time . Time ,
requestID types . RequestID ,
) error
WriteReportInfoForCluster (
types . OrgID ,
types . ClusterName ,
[ ] types . InfoItem ,
time . Time ,
) error
WriteRecommendationsForCluster (
orgID types . OrgID ,
clusterName types . ClusterName ,
report types . ClusterReport ,
creationTime types . Timestamp ,
) error
ReportsCount ( ) ( int , error )
VoteOnRule (
clusterID types . ClusterName ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
orgID types . OrgID ,
userID types . UserID ,
userVote types . UserVote ,
voteMessage string ,
) error
AddOrUpdateFeedbackOnRule (
clusterID types . ClusterName ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
orgID types . OrgID ,
userID types . UserID ,
message string ,
) error
AddFeedbackOnRuleDisable (
clusterID types . ClusterName ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
orgID types . OrgID ,
userID types . UserID ,
message string ,
) error
GetUserFeedbackOnRule (
clusterID types . ClusterName ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
userID types . UserID ,
) ( * UserFeedbackOnRule , error )
GetUserFeedbackOnRuleDisable (
clusterID types . ClusterName ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
userID types . UserID ,
) ( * UserFeedbackOnRule , error )
DeleteReportsForOrg ( orgID types . OrgID ) error
DeleteReportsForCluster ( clusterName types . ClusterName ) error
ToggleRuleForCluster (
clusterID types . ClusterName ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
orgID types . OrgID ,
ruleToggle RuleToggle ,
) error
GetFromClusterRuleToggle (
types . ClusterName ,
types . RuleID ,
) ( * ClusterRuleToggle , error )
GetTogglesForRules (
clusterID types . ClusterName ,
rulesReport [ ] types . RuleOnReport ,
orgID types . OrgID ,
) ( map [ types . RuleID ] bool , error )
DeleteFromRuleClusterToggle (
clusterID types . ClusterName ,
ruleID types . RuleID ,
) error
GetOrgIDByClusterID ( cluster types . ClusterName ) ( types . OrgID , error )
WriteConsumerError ( msg * sarama . ConsumerMessage , consumerErr error ) error
GetUserFeedbackOnRules (
clusterID types . ClusterName ,
rulesReport [ ] types . RuleOnReport ,
userID types . UserID ,
) ( map [ types . RuleID ] types . UserVote , error )
GetUserDisableFeedbackOnRules (
clusterID types . ClusterName ,
rulesReport [ ] types . RuleOnReport ,
userID types . UserID ,
) ( map [ types . RuleID ] UserFeedbackOnRule , error )
DoesClusterExist ( clusterID types . ClusterName ) ( bool , error )
ListOfDisabledRules ( orgID types . OrgID ) ( [ ] ctypes . DisabledRule , error )
ListOfReasons ( userID types . UserID ) ( [ ] DisabledRuleReason , error )
ListOfDisabledRulesForClusters (
clusterList [ ] string ,
orgID types . OrgID ,
) ( [ ] ctypes . DisabledRule , error )
ListOfDisabledClusters (
orgID types . OrgID ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
) ( [ ] ctypes . DisabledClusterInfo , error )
RateOnRule (
types . OrgID ,
types . RuleID ,
types . ErrorKey ,
types . UserVote ,
) error
GetRuleRating (
types . OrgID ,
types . RuleSelector ,
) ( types . RuleRating , error )
DisableRuleSystemWide (
orgID types . OrgID , ruleID types . RuleID ,
errorKey types . ErrorKey , justification string ,
) error
EnableRuleSystemWide (
orgID types . OrgID ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
) error
UpdateDisabledRuleJustification (
orgID types . OrgID ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
justification string ,
) error
ReadDisabledRule (
orgID types . OrgID , ruleID types . RuleID , errorKey types . ErrorKey ,
) ( ctypes . SystemWideRuleDisable , bool , error )
ListOfSystemWideDisabledRules (
orgID types . OrgID ,
) ( [ ] ctypes . SystemWideRuleDisable , error )
ReadRecommendationsForClusters ( [ ] string , types . OrgID ) ( ctypes . RecommendationImpactedClusters , error )
ReadClusterListRecommendations ( clusterList [ ] string , orgID types . OrgID ) (
ctypes . ClusterRecommendationMap , error ,
)
PrintRuleDisableDebugInfo ( )
}
const (
|
ReportSuffix is used to strip away .report suffix from rule module names
|
ReportSuffix = ".report"
|
ocpDBSchema uses the default public schema in all queries/migrations and all environments
|
ocpDBSchema = "public"
|
Query for getting creation timestamp in rule_hit table for given Org + Cluster
|
selectRuleCreatedAtQuery = `SELECT rule_fqdn, error_key, created_at FROM rule_hit WHERE org_id = $1 AND cluster_id = $2;`
|
Query for getting creation timestamp in recommendation table for impacted_since Org + Cluster
|
selectRuleImpactedSinceQuery = `SELECT rule_fqdn, error_key, impacted_since FROM recommendation WHERE org_id = $1 AND cluster_id = $2;`
)
|
OCPRecommendationsDBStorage is an implementation of Storage interface that use selected SQL like database
like PostgreSQL, MariaDB, RDS etc. That implementation is based on the standard
sql package. It is possible to configure connection via Configuration structure.
SQLQueriesLog is log for sql queries, default is nil which means nothing is logged
|
type OCPRecommendationsDBStorage struct {
connection * sql . DB
dbDriverType types . DBDriver
|
clusterLastCheckedDict is a dictionary of timestamps when the clusters were last checked.
|
clustersLastChecked map [ types . ClusterName ] time . Time
}
|
NewOCPRecommendationsStorage function creates and initializes a new instance of Storage interface
|
func NewOCPRecommendationsStorage ( configuration Configuration ) ( OCPRecommendationsStorage , error ) {
switch configuration . Type {
case types . SQLStorage :
log . Info ( ) . Str ( "OCP storage type" , configuration . Type ) . Send ( )
return newSQLStorage ( configuration )
case types . RedisStorage :
log . Info ( ) . Str ( "Redis storage type" , configuration . Type ) . Send ( )
return newRedisStorage ( configuration )
case types . NoopStorage :
return newNoopOCPStorage ( configuration )
default :
|
error to be thrown
|
err := fmt . Errorf ( "Unknown storage type '%s'" , configuration . Type )
log . Error ( ) . Err ( err ) . Msg ( "Init failure" )
return nil , err
}
}
|
newNoopOCPStorage function creates and initializes a new instance of Noop storage
|
func newNoopOCPStorage ( _ Configuration ) ( OCPRecommendationsStorage , error ) {
return & NoopOCPStorage { } , nil
}
|
newRedisStorage function creates and initializes a new instance of Redis storage
|
func newRedisStorage ( configuration Configuration ) ( OCPRecommendationsStorage , error ) {
redisCfg := configuration . RedisConfiguration
log . Info ( ) .
Str ( "Endpoint" , redisCfg . RedisEndpoint ) .
Int ( "Database index" , redisCfg . RedisDatabase ) .
Msg ( "Making connection to Redis storage" )
|
pass for unit tests
|
if redisCfg . RedisEndpoint == "" {
return & RedisStorage { } , nil
}
client , err := redis . CreateRedisClient (
redisCfg . RedisEndpoint ,
redisCfg . RedisDatabase ,
redisCfg . RedisPassword ,
redisCfg . RedisTimeoutSeconds ,
)
|
check for init error
|
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Error constructing Redis client" )
return nil , err
}
log . Info ( ) . Msg ( "Redis client has been initialized" )
redisStorage := & RedisStorage {
Client : redis . Client { Connection : client } ,
}
err = redisStorage . Init ( )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Error initializing Redis client" )
return nil , err
}
return redisStorage , nil
}
|
newSQLStorage function creates and initializes a new instance of DB storage
|
func newSQLStorage ( configuration Configuration ) ( OCPRecommendationsStorage , error ) {
driverType , driverName , dataSource , err := initAndGetDriver ( configuration )
if err != nil {
return nil , err
}
log . Info ( ) . Msgf (
"Making connection to data storage, driver=%s" ,
driverName ,
)
connection , err := sql . Open ( driverName , dataSource )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Cannot connect to data storage" )
return nil , err
}
return NewOCPRecommendationsFromConnection ( connection , driverType ) , nil
}
|
NewOCPRecommendationsFromConnection function creates and initializes a new instance of Storage interface from prepared connection
|
func NewOCPRecommendationsFromConnection ( connection * sql . DB , dbDriverType types . DBDriver ) * OCPRecommendationsDBStorage {
return & OCPRecommendationsDBStorage {
connection : connection ,
dbDriverType : dbDriverType ,
clustersLastChecked : map [ types . ClusterName ] time . Time { } ,
}
}
|
initAndGetDriver initializes driver(with logs if logSQLQueries is true),
checks if it's supported and returns driver type, driver name, dataSource and error
|
func initAndGetDriver ( configuration Configuration ) ( driverType types . DBDriver , driverName , dataSource string , err error ) {
var driver sql_driver . Driver
driverName = configuration . Driver
switch driverName {
case "postgres" :
driverType = types . DBDriverPostgres
driver = & pq . Driver { }
dataSource = fmt . Sprintf (
"postgresql://%v:%v@%v:%v/%v?%v" ,
configuration . PGUsername ,
configuration . PGPassword ,
configuration . PGHost ,
configuration . PGPort ,
configuration . PGDBName ,
configuration . PGParams ,
)
default :
err = fmt . Errorf ( "driver %v is not supported" , driverName )
return
}
if configuration . LogSQLQueries {
driverName = InitSQLDriverWithLogs ( driver , driverName )
}
return
}
|
GetMigrations returns a list of database migrations related to OCP recommendation tables
|
func ( storage OCPRecommendationsDBStorage ) GetMigrations ( ) [ ] migration . Migration {
return ocpmigrations . UsableOCPMigrations
}
|
GetDBSchema returns the schema name to be used in queries
|
func ( storage OCPRecommendationsDBStorage ) GetDBSchema ( ) migration . Schema {
return migration . Schema ( ocpDBSchema )
}
|
GetMaxVersion returns the highest available migration version.
The DB version cannot be set to a value higher than this.
This value is equivalent to the length of the list of available migrations.
|
func ( storage OCPRecommendationsDBStorage ) GetMaxVersion ( ) migration . Version {
return migration . Version ( len ( storage . GetMigrations ( ) ) )
}
|
MigrateToLatest migrates the database to the latest available
migration version. This must be done before an Init() call.
|
func ( storage OCPRecommendationsDBStorage ) MigrateToLatest ( ) error {
dbConn , dbSchema := storage . GetConnection ( ) , storage . GetDBSchema ( )
if err := migration . InitInfoTable ( dbConn , dbSchema ) ; err != nil {
return err
}
return migration . SetDBVersion (
dbConn ,
storage . dbDriverType ,
dbSchema ,
storage . GetMaxVersion ( ) ,
storage . GetMigrations ( ) ,
)
}
|
Init performs all database initialization
tasks necessary for further service operation.
|
func ( storage OCPRecommendationsDBStorage ) Init ( ) error {
|
Read clusterName:LastChecked dictionary from DB.
|
rows , err := storage . connection . Query ( "SELECT cluster, last_checked_at FROM report;" )
if err != nil {
return err
}
log . Debug ( ) . Msg ( "executing last_checked_at query" )
for rows . Next ( ) {
var (
clusterName types . ClusterName
lastChecked sql . NullTime
)
if err := rows . Scan ( & clusterName , & lastChecked ) ; err != nil {
if closeErr := rows . Close ( ) ; closeErr != nil {
log . Error ( ) . Err ( closeErr ) . Msg ( "Unable to close the DB rows handle" )
}
return err
}
storage . clustersLastChecked [ clusterName ] = lastChecked . Time
}
|
Not using defer to close the rows here to:
- make errcheck happy (it doesn't like ignoring returned errors),
- return a possible error returned by the Close method.
|
return rows . Close ( )
}
|
Close method closes the connection to database. Needs to be called at the end of application lifecycle.
|
func ( storage OCPRecommendationsDBStorage ) Close ( ) error {
log . Info ( ) . Msg ( "Closing connection to data storage" )
if storage . connection != nil {
err := storage . connection . Close ( )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Can not close connection to data storage" )
return err
}
}
return nil
}
|
Report represents one (latest) cluster report.
Org: organization ID
Name: cluster GUID in the following format:
c8590f31-e97e-4b85-b506-c45ce1911a12
|
type Report struct {
Org types . OrgID `json:"org"`
Name types . ClusterName `json:"cluster"`
Report types . ClusterReport `json:"report"`
ReportedAt types . Timestamp `json:"reported_at"`
}
func closeRows ( rows * sql . Rows ) {
_ = rows . Close ( )
}
|
ListOfOrgs reads list of all organizations that have at least one cluster report
|
func ( storage OCPRecommendationsDBStorage ) ListOfOrgs ( ) ( [ ] types . OrgID , error ) {
orgs := make ( [ ] types . OrgID , 0 )
rows , err := storage . connection . Query ( "SELECT DISTINCT org_id FROM report ORDER BY org_id;" )
err = types . ConvertDBError ( err , nil )
if err != nil {
return orgs , err
}
defer closeRows ( rows )
for rows . Next ( ) {
var orgID types . OrgID
err = rows . Scan ( & orgID )
if err == nil {
orgs = append ( orgs , orgID )
} else {
log . Error ( ) . Err ( err ) . Msg ( "ListOfOrgID" )
}
}
return orgs , nil
}
|
ListOfClustersForOrg reads list of all clusters fro given organization
|
func ( storage OCPRecommendationsDBStorage ) ListOfClustersForOrg ( orgID types . OrgID , timeLimit time . Time ) ( [ ] types . ClusterName , error ) {
clusters := make ( [ ] types . ClusterName , 0 )
q := `
SELECT cluster
FROM report
WHERE org_id = $1
AND reported_at >= $2
ORDER BY cluster;
`
rows , err := storage . connection . Query ( q , orgID , timeLimit )
err = types . ConvertDBError ( err , orgID )
if err != nil {
return clusters , err
}
defer closeRows ( rows )
for rows . Next ( ) {
var clusterName string
err = rows . Scan ( & clusterName )
if err == nil {
clusters = append ( clusters , types . ClusterName ( clusterName ) )
} else {
log . Error ( ) . Err ( err ) . Msg ( "ListOfClustersForOrg" )
}
}
return clusters , nil
}
|
ListOfClustersForOrgSpecificRule returns list of all clusters for given organization that are affect by given rule
|
func ( storage OCPRecommendationsDBStorage ) ListOfClustersForOrgSpecificRule (
orgID types . OrgID ,
ruleID types . RuleSelector ,
activeClusters [ ] string ) (
[ ] ctypes . HittingClustersData , error ) {
results := make ( [ ] ctypes . HittingClustersData , 0 )
var whereClause string
if len ( activeClusters ) > 0 {
|
nosec G201
|
whereClause = fmt . Sprintf ( `WHERE org_id = $1 AND rule_id = $2 AND cluster_id IN (%v)` ,
inClauseFromSlice ( activeClusters ) )
} else {
whereClause = `WHERE org_id = $1 AND rule_id = $2`
}
|
nosec G202
|
query := `SELECT cluster_id, created_at, impacted_since FROM recommendation ` + whereClause + ` ORDER BY cluster_id;`
|
nosec G202
|
rows , err := storage . connection . Query ( query , orgID , ruleID )
err = types . ConvertDBError ( err , orgID )
if err != nil {
return results , err
}
defer closeRows ( rows )
var (
clusterName types . ClusterName
lastSeen string
impactedSince string
)
for rows . Next ( ) {
err = rows . Scan ( & clusterName , & lastSeen , & impactedSince )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "ListOfClustersForOrgSpecificRule" )
}
results = append ( results , ctypes . HittingClustersData {
Cluster : clusterName ,
LastSeen : lastSeen ,
ImpactedSince : impactedSince ,
} )
}
|
This is to ensure 404 when no recommendation is found for the given orgId + selector.
We can, alternatively, return something like this with a 204 (no content):
{"data":[],"meta":{"count":0,"component":"test.rule","errorkey":"ek"},"status":"notfound"}
|
if len ( results ) == 0 {
return results , & types . ItemNotFoundError { ItemID : ruleID }
}
return results , nil
}
|
GetOrgIDByClusterID reads OrgID for specified cluster
|
func ( storage OCPRecommendationsDBStorage ) GetOrgIDByClusterID ( cluster types . ClusterName ) ( types . OrgID , error ) {
row := storage . connection . QueryRow ( "SELECT org_id FROM report WHERE cluster = $1 ORDER BY org_id;" , cluster )
var orgID uint32
err := row . Scan ( & orgID )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "GetOrgIDByClusterID" )
return 0 , err
}
return types . OrgID ( orgID ) , nil
}
|
parseTemplateData parses template data and returns a json raw message if it's a json or a string otherwise
|
func parseTemplateData ( templateData [ ] byte ) interface { } {
var templateDataJSON json . RawMessage
err := json . Unmarshal ( templateData , & templateDataJSON )
if err != nil {
log . Warn ( ) . Err ( err ) . Msgf ( "unable to parse template data as json" )
return templateData
}
return templateDataJSON
}
func parseRuleRows ( rows * sql . Rows ) ( [ ] types . RuleOnReport , error ) {
report := make ( [ ] types . RuleOnReport , 0 )
for rows . Next ( ) {
var (
templateDataBytes [ ] byte
ruleFQDN types . RuleID
errorKey types . ErrorKey
createdAt sql . NullTime
)
err := rows . Scan ( & templateDataBytes , & ruleFQDN , & errorKey , & createdAt )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "ReportListForCluster" )
return report , err
}
templateData := parseTemplateData ( templateDataBytes )
var createdAtConverted time . Time
if createdAt . Valid {
createdAtConverted = createdAt . Time
}
rule := types . RuleOnReport {
Module : ruleFQDN ,
ErrorKey : errorKey ,
TemplateData : templateData ,
CreatedAt : types . Timestamp ( createdAtConverted . UTC ( ) . Format ( time . RFC3339 ) ) ,
}
report = append ( report , rule )
}
return report , nil
}
|
constructInClausule is a helper function to construct in clause for SQL
statement.
|
func constructInClausule ( howMany int ) ( string , error ) {
|
construct the in clause in SQL query statement
|
if howMany < 1 {
return "" , fmt . Errorf ( "at least one value needed" )
}
inClausule := "$1"
for i := 2 ; i <= howMany ; i ++ {
inClausule += fmt . Sprintf ( ",$%d" , i )
}
return inClausule , nil
}
|
argsWithClusterNames is a helper function to construct arguments for SQL
statement.
|
func argsWithClusterNames ( clusterNames [ ] types . ClusterName ) [ ] interface { } {
|
prepare arguments
|
args := make ( [ ] interface { } , len ( clusterNames ) )
for i , clusterName := range clusterNames {
args [ i ] = clusterName
}
return args
}
|
inClauseFromSlice is a helper function to construct in clause for SQL
statement from a given slice of items. The received slice must be []string
or any other type that can be asserted to []string, or else '1=1' will be
returned, making the IN clause act like a wildcard.
|
func inClauseFromSlice ( slice interface { } ) string {
if slice , ok := slice . ( [ ] string ) ; ok {
return "'" + strings . Join ( slice , `','` ) + `'`
}
return "1=1"
}
|
ReadOrgIDsForClusters read organization IDs for given list of cluster names.
|
func ( storage OCPRecommendationsDBStorage ) ReadOrgIDsForClusters ( clusterNames [ ] types . ClusterName ) ( [ ] types . OrgID , error ) {
|
stub for return value
|
ids := make ( [ ] types . OrgID , 0 )
if len ( clusterNames ) < 1 {
return ids , nil
}
|
prepare arguments
|
args := argsWithClusterNames ( clusterNames )
|
construct the in clause in SQL query statement
|
inClausule , err := constructInClausule ( len ( clusterNames ) )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( inClauseError )
return ids , err
}
|
disable "G202 (CWE-89): SQL string concatenation"
nosec G202
|
query := "SELECT DISTINCT org_id FROM report WHERE cluster in (" + inClausule + ");"
|
select results from the database
nosec G202
|
rows , err := storage . connection . Query ( query , args ... )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "query to get org ids" )
return ids , err
}
|
process results returned from database
|
for rows . Next ( ) {
var orgID types . OrgID
err := rows . Scan ( & orgID )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "read one org id" )
return ids , err
}
ids = append ( ids , orgID )
}
|
everything seems ok -> return ids
|
return ids , nil
}
|
ReadReportsForClusters function reads reports for given list of cluster
names.
|
func ( storage OCPRecommendationsDBStorage ) ReadReportsForClusters ( clusterNames [ ] types . ClusterName ) ( map [ types . ClusterName ] types . ClusterReport , error ) {
|
stub for return value
|
reports := make ( map [ types . ClusterName ] types . ClusterReport )
if len ( clusterNames ) < 1 {
return reports , nil
}
|
prepare arguments
|
args := argsWithClusterNames ( clusterNames )
|
construct the in clause in SQL query statement
|
inClausule , err := constructInClausule ( len ( clusterNames ) )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( inClauseError )
return reports , err
}
|
disable "G202 (CWE-89): SQL string concatenation"
nosec G202
|
query := "SELECT cluster, report FROM report WHERE cluster in (" + inClausule + ");"
|
select results from the database
nosec G202
|
rows , err := storage . connection . Query ( query , args ... )
if err != nil {
return reports , err
}
|
process results returned from database
|
for rows . Next ( ) {
|
convert into requested type
|
var (
clusterName types . ClusterName
clusterReport types . ClusterReport
)
err := rows . Scan ( & clusterName , & clusterReport )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "ReadReportsForClusters" )
return reports , err
}
reports [ clusterName ] = clusterReport
}
|
everything seems ok -> return reports
|
return reports , nil
}
|
ReadReportForCluster reads result (health status) for selected cluster
|
func ( storage OCPRecommendationsDBStorage ) ReadReportForCluster (
orgID types . OrgID , clusterName types . ClusterName ,
) ( [ ] types . RuleOnReport , types . Timestamp , types . Timestamp , types . Timestamp , error ) {
var lastChecked time . Time
var reportedAt time . Time
var gatheredAtInDB sql . NullTime
report := make ( [ ] types . RuleOnReport , 0 )
err := storage . connection . QueryRow (
"SELECT last_checked_at, reported_at, gathered_at FROM report WHERE org_id = $1 AND cluster = $2;" ,
orgID , clusterName ,
) . Scan ( & lastChecked , & reportedAt , & gatheredAtInDB )
|
convert timestamps to string
|
var lastCheckedStr = types . Timestamp ( lastChecked . UTC ( ) . Format ( time . RFC3339 ) )
var reportedAtStr = types . Timestamp ( reportedAt . UTC ( ) . Format ( time . RFC3339 ) )
var gatheredAtStr types . Timestamp
if gatheredAtInDB . Valid {
gatheredAtStr = types . Timestamp ( gatheredAtInDB . Time . UTC ( ) . Format ( time . RFC3339 ) )
} else {
gatheredAtStr = ""
}
err = types . ConvertDBError ( err , [ ] interface { } { orgID , clusterName } )
if err != nil {
log . Warn ( ) . Err ( err ) . Str ( clusterKey , string ( clusterName ) ) . Msg (
"ReadReportForCluster query from report table error" ,
)
return report , lastCheckedStr , reportedAtStr , gatheredAtStr , err
}
rows , err := storage . connection . Query (
"SELECT template_data, rule_fqdn, error_key, created_at FROM rule_hit WHERE org_id = $1 AND cluster_id = $2;" , orgID , clusterName ,
)
err = types . ConvertDBError ( err , [ ] interface { } { orgID , clusterName } )
if err != nil {
log . Error ( ) . Err ( err ) . Str ( clusterKey , string ( clusterName ) ) . Msg (
"ReadReportForCluster query from rule_hit table error" ,
)
return report , lastCheckedStr , reportedAtStr , gatheredAtStr , err
}
report , err = parseRuleRows ( rows )
return report , lastCheckedStr , reportedAtStr , gatheredAtStr , err
}
|
ReadSingleRuleTemplateData reads template data for a single rule
|
func ( storage OCPRecommendationsDBStorage ) ReadSingleRuleTemplateData (
orgID types . OrgID , clusterName types . ClusterName , ruleID types . RuleID , errorKey types . ErrorKey ,
) ( interface { } , error ) {
var templateDataBytes [ ] byte
err := storage . connection . QueryRow ( `
SELECT template_data FROM rule_hit
WHERE org_id = $1 AND cluster_id = $2 AND rule_fqdn = $3 AND error_key = $4;
` ,
orgID ,
clusterName ,
ruleID ,
errorKey ,
) . Scan ( & templateDataBytes )
err = types . ConvertDBError ( err , [ ] interface { } { orgID , clusterName , ruleID , errorKey } )
return parseTemplateData ( templateDataBytes ) , err
}
|
ReadReportForClusterByClusterName reads result (health status) for selected cluster for given organization
|
func ( storage OCPRecommendationsDBStorage ) ReadReportForClusterByClusterName (
clusterName types . ClusterName ,
) ( [ ] types . RuleOnReport , types . Timestamp , error ) {
report := make ( [ ] types . RuleOnReport , 0 )
var lastChecked time . Time
err := storage . connection . QueryRow (
"SELECT last_checked_at FROM report WHERE cluster = $1;" , clusterName ,
) . Scan ( & lastChecked )
switch {
case err == sql . ErrNoRows :
return report , "" , & types . ItemNotFoundError {
ItemID : fmt . Sprintf ( "%v" , clusterName ) ,
}
case err != nil :
return report , "" , err
}
rows , err := storage . connection . Query (
"SELECT template_data, rule_fqdn, error_key, created_at FROM rule_hit WHERE cluster_id = $1;" , clusterName ,
)
if err != nil {
return report , types . Timestamp ( lastChecked . UTC ( ) . Format ( time . RFC3339 ) ) , err
}
report , err = parseRuleRows ( rows )
return report , types . Timestamp ( lastChecked . UTC ( ) . Format ( time . RFC3339 ) ) , err
}
|
GetRuleHitInsertStatement method prepares DB statement to be used to write
rule FQDN + rule error key into rulehit table for given clusterid
|
func ( storage OCPRecommendationsDBStorage ) GetRuleHitInsertStatement ( rules [ ] types . ReportItem ) string {
const ruleInsertStatement = "INSERT INTO rule_hit(org_id, cluster_id, rule_fqdn, error_key, template_data, created_at) VALUES %s"
|
pre-allocate array for placeholders
|
placeholders := make ( [ ] string , len ( rules ) )
|
fill-in placeholders for INSERT statement
|
for index := range rules {
placeholders [ index ] = fmt . Sprintf ( "($%d,$%d,$%d,$%d,$%d,$%d)" ,
index * 6 + 1 ,
index * 6 + 2 ,
index * 6 + 3 ,
index * 6 + 4 ,
index * 6 + 5 ,
index * 6 + 6 ,
)
}
|
construct INSERT statement for multiple values
|
return fmt . Sprintf ( ruleInsertStatement , strings . Join ( placeholders , "," ) )
}
func ( storage OCPRecommendationsDBStorage ) getRuleKeyCreatedAtMapForTable ( table string , orgID types . OrgID , clusterName types . ClusterName ) (
RuleKeyCreatedAt map [ string ] types . Timestamp ,
err error ) {
|
Switch case to select the appropriate query based on the table name
|
switch table {
case "rule_hit" :
RuleKeyCreatedAt , err = storage . getRuleKeyCreatedAtMap (
selectRuleCreatedAtQuery , orgID , clusterName ,
)
case "recommendation" :
RuleKeyCreatedAt , err = storage . getRuleKeyCreatedAtMap (
selectRuleImpactedSinceQuery , orgID , clusterName ,
)
default :
log . Error ( ) . Err ( err ) . Str ( "tableName" , table ) . Msg ( "Unexpected table to get ruleCreatedAtMap" )
return
}
if err != nil {
log . Error ( ) . Err ( err ) . Str ( "tableName" , table ) . Msg ( "Unable to generate ruleCreatedAtMap" )
return
}
return
}
|
valuesForRuleHitsInsert function prepares values to insert rules into
rule_hit table.
|
func valuesForRuleHitsInsert (
orgID types . OrgID ,
clusterName types . ClusterName ,
rules [ ] types . ReportItem ,
ruleKeyCreatedAt map [ string ] types . Timestamp ,
) [ ] interface { } {
|
fill-in values for INSERT statement
|
values := make ( [ ] interface { } , len ( rules ) * 6 )
for index , rule := range rules {
ruleKey := string ( rule . Module ) + string ( rule . ErrorKey )
var impactedSince types . Timestamp
if val , ok := ruleKeyCreatedAt [ ruleKey ] ; ok {
impactedSince = val
} else {
impactedSince = types . Timestamp ( time . Now ( ) . UTC ( ) . Format ( time . RFC3339 ) )
}
values [ 6 * index ] = orgID
values [ 6 * index + 1 ] = clusterName
values [ 6 * index + 2 ] = rule . Module
values [ 6 * index + 3 ] = rule . ErrorKey
values [ 6 * index + 4 ] = string ( rule . TemplateData )
values [ 6 * index + 5 ] = impactedSince
}
return values
}
func ( storage OCPRecommendationsDBStorage ) updateReport (
tx * sql . Tx ,
orgID types . OrgID ,
clusterName types . ClusterName ,
report types . ClusterReport ,
rules [ ] types . ReportItem ,
lastCheckedTime time . Time ,
gatheredAt time . Time ,
reportedAtTime time . Time ,
) error {
|
Get the UPSERT query for writing a report into the database.
|
reportUpsertQuery := storage . getReportUpsertQuery ( )
|
Get created_at if present before deletion
|
RuleKeyCreatedAt , err := storage . getRuleKeyCreatedAtMapForTable (
"rule_hit" , orgID , clusterName ,
)
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Unable to get recommendation impacted_since" )
RuleKeyCreatedAt = make ( map [ string ] types . Timestamp )
}
deleteQuery := "DELETE FROM rule_hit WHERE org_id = $1 AND cluster_id = $2;"
_ , err = tx . Exec ( deleteQuery , orgID , clusterName )
if err != nil {
log . Err ( err ) .
Str ( clusterKey , string ( clusterName ) ) . Interface ( orgIDStr , orgID ) .
Msg ( "Unable to remove previous cluster reports" )
return err
}
|
Perform the report insert.
All older rule hits has been deleted for given cluster so it is
possible to just insert new hits w/o the need to update on conflict
|
if len ( rules ) > 0 {
|
Get the INSERT statement for writing a rule into the database.
|
ruleInsertStatement := storage . GetRuleHitInsertStatement ( rules )
|
Get values to be stored in rule_hits table
|
values := valuesForRuleHitsInsert ( orgID , clusterName , rules , RuleKeyCreatedAt )
_ , err = tx . Exec ( ruleInsertStatement , values ... )
if err != nil {
log . Err ( err ) .
Str ( clusterKey , string ( clusterName ) ) . Interface ( orgIDStr , orgID ) .
Msg ( "Unable to insert the cluster report rules" )
return err
}
}
if gatheredAt . IsZero ( ) {
_ , err = tx . Exec ( reportUpsertQuery , orgID , clusterName , report , reportedAtTime , lastCheckedTime , 0 , sql . NullTime { Valid : false } )
} else {
_ , err = tx . Exec ( reportUpsertQuery , orgID , clusterName , report , reportedAtTime , lastCheckedTime , 0 , gatheredAt )
}
if err != nil {
log . Err ( err ) .
Str ( clusterKey , string ( clusterName ) ) . Interface ( orgIDStr , orgID ) .
Msg ( "Unable to upsert the cluster report" )
return err
}
return nil
}
func prepareInsertRecommendationsStatement (
orgID types . OrgID ,
clusterName types . ClusterName ,
report types . ReportRules ,
createdAt types . Timestamp ,
impactedSinceMap map [ string ] types . Timestamp ,
) ( selectors [ ] string , statement string , statementArgs [ ] interface { } ) {
statement = `INSERT INTO recommendation (org_id, cluster_id, rule_fqdn, error_key, rule_id, created_at, impacted_since) VALUES %s`
valuesIdx := make ( [ ] string , len ( report . HitRules ) )
statementIdx := 0
selectors = make ( [ ] string , len ( report . HitRules ) )
for idx , rule := range report . HitRules {
ruleFqdn := strings . TrimSuffix ( string ( rule . Module ) , ReportSuffix )
ruleID := ruleFqdn + "|" + string ( rule . ErrorKey )
impactedSince , ok := impactedSinceMap [ ruleFqdn + string ( rule . ErrorKey ) ]
if ! ok {
impactedSince = createdAt
}
selectors [ idx ] = ruleID
statementArgs = append ( statementArgs , orgID , clusterName , ruleFqdn , rule . ErrorKey , ruleID , createdAt , impactedSince )
statementIdx = len ( statementArgs )
const separatorAndParam = ", $"
valuesIdx [ idx ] = "($" + fmt . Sprint ( statementIdx - 6 ) +
separatorAndParam + fmt . Sprint ( statementIdx - 5 ) +
separatorAndParam + fmt . Sprint ( statementIdx - 4 ) +
separatorAndParam + fmt . Sprint ( statementIdx - 3 ) +
separatorAndParam + fmt . Sprint ( statementIdx - 2 ) +
separatorAndParam + fmt . Sprint ( statementIdx - 1 ) +
separatorAndParam + fmt . Sprint ( statementIdx ) + ")"
}
statement = fmt . Sprintf ( statement , strings . Join ( valuesIdx , "," ) )
return
}
func ( storage OCPRecommendationsDBStorage ) insertRecommendations (
tx * sql . Tx ,
orgID types . OrgID ,
clusterName types . ClusterName ,
report types . ReportRules ,
createdAt types . Timestamp ,
impactedSince map [ string ] types . Timestamp ,
) ( inserted int , err error ) {
if len ( report . HitRules ) == 0 {
log . Info ( ) .
Int ( organizationKey , int ( orgID ) ) .
Str ( clusterKey , string ( clusterName ) ) .
Int ( issuesCountKey , 0 ) .
Msg ( "No new recommendation to insert" )
return 0 , nil
}
selectors , statement , args := prepareInsertRecommendationsStatement ( orgID , clusterName , report , createdAt , impactedSince )
if _ , err = tx . Exec ( statement , args ... ) ; err != nil {
log . Error ( ) .
Int ( organizationKey , int ( orgID ) ) .
Str ( clusterKey , string ( clusterName ) ) .
Int ( issuesCountKey , inserted ) .
Interface ( createdAtKey , createdAt ) .
Strs ( selectorsKey , selectors ) .
Err ( err ) .
Msg ( "Unable to insert the recommendations" )
return 0 , err
}
log . Info ( ) .
Int ( organizationKey , int ( orgID ) ) .
Str ( clusterKey , string ( clusterName ) ) .
Int ( issuesCountKey , inserted ) .
Interface ( createdAtKey , createdAt ) .
Strs ( selectorsKey , selectors ) .
Msg ( "Recommendations inserted successfully" )
inserted = len ( selectors )
return
}
|
getRuleKeyCreatedAtMap returns a map between
(rulefqdn, errorkey) -> createdat
for each rulehit rows matching given
orgId and clusterName
|
func ( storage OCPRecommendationsDBStorage ) getRuleKeyCreatedAtMap (
query string ,
orgID types . OrgID ,
clusterName types . ClusterName ,
) (
map [ string ] types . Timestamp ,
error ) {
impactedSinceRows , err := storage . connection . Query (
query , orgID , clusterName )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "error retrieving recommendation timestamp" )
return nil , err
}
defer closeRows ( impactedSinceRows )
RuleKeyCreatedAt := make ( map [ string ] types . Timestamp )
for impactedSinceRows . Next ( ) {
var ruleFqdn string
var errorKey string
var oldTime time . Time
err := impactedSinceRows . Scan (
& ruleFqdn ,
& errorKey ,
& oldTime ,
)
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "error scanning for rule id -> created_at map" )
continue
}
newTime := types . Timestamp ( oldTime . UTC ( ) . Format ( time . RFC3339 ) )
RuleKeyCreatedAt [ ruleFqdn + errorKey ] = newTime
}
return RuleKeyCreatedAt , err
}
|
WriteReportForCluster writes result (health status) for selected cluster for given organization
|
func ( storage OCPRecommendationsDBStorage ) WriteReportForCluster (
orgID types . OrgID ,
clusterName types . ClusterName ,
report types . ClusterReport ,
rules [ ] types . ReportItem ,
lastCheckedTime time . Time ,
gatheredAt time . Time ,
storedAtTime time . Time ,
_ types . RequestID ,
) error {
|
Skip writing the report if it isn't newer than a report
that is already in the database for the same cluster.
|
if oldLastChecked , exists := storage . clustersLastChecked [ clusterName ] ; exists && ! lastCheckedTime . After ( oldLastChecked ) {
return types . ErrOldReport
}
if storage . dbDriverType != types . DBDriverPostgres {
return fmt . Errorf ( "writing report with DB %v is not supported" , storage . dbDriverType )
}
|
Begin a new transaction.
|
tx , err := storage . connection . Begin ( )
if err != nil {
return err
}
err = func ( tx * sql . Tx ) error {
|
Check if there is a more recent report for the cluster already in the database.
|
rows , err := tx . Query (
"SELECT last_checked_at FROM report WHERE org_id = $1 AND cluster = $2 AND last_checked_at > $3;" ,
orgID , clusterName , lastCheckedTime )
err = types . ConvertDBError ( err , [ ] interface { } { orgID , clusterName } )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Unable to look up the most recent report in the database" )
return err
}
defer closeRows ( rows )
|
If there is one, print a warning and discard the report (don't update it).
|
if rows . Next ( ) {
log . Warn ( ) . Msgf ( "Database already contains report for organization %d and cluster name %s more recent than %v" ,
orgID , clusterName , lastCheckedTime )
return nil
}
err = storage . updateReport ( tx , orgID , clusterName , report , rules , lastCheckedTime , gatheredAt , storedAtTime )
if err != nil {
return err
}
storage . clustersLastChecked [ clusterName ] = lastCheckedTime
metrics . WrittenReports . Inc ( )
return nil
} ( tx )
finishTransaction ( tx , err )
return err
}
|
WriteRecommendationsForCluster writes hitting rules in received report for selected cluster
|
func ( storage OCPRecommendationsDBStorage ) WriteRecommendationsForCluster (
orgID types . OrgID ,
clusterName types . ClusterName ,
stringReport types . ClusterReport ,
creationTime types . Timestamp ,
) ( err error ) {
var report types . ReportRules
err = json . Unmarshal ( [ ] byte ( stringReport ) , & report )
if err != nil {
return err
}
tx , err := storage . connection . Begin ( )
if err != nil {
return err
}
impactedSinceMap := make ( map [ string ] ctypes . Timestamp )
err = func ( tx * sql . Tx ) error {
var deleted int64
|
Delete current recommendations for the cluster if some report has been previously stored for this cluster
|
if _ , ok := storage . clustersLastChecked [ clusterName ] ; ok {
|
Get impacted_since if present
|
impactedSinceMap , err = storage . getRuleKeyCreatedAtMapForTable (
"recommendation" , orgID , clusterName )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Unable to get recommendation impacted_since" )
}
|
it is needed to use org_id = $1 condition there
because it allows DB to use proper btree indexing
and not slow sequential scan
|
result , err := tx . Exec (
"DELETE FROM recommendation WHERE org_id = $1 AND cluster_id = $2;" , orgID , clusterName )
err = types . ConvertDBError ( err , [ ] interface { } { clusterName } )
if err != nil {
log . Error ( ) . Err ( err ) .
Str ( clusterKey , string ( clusterName ) ) . Interface ( orgIDStr , orgID ) .
Msg ( "Unable to delete the existing recommendations" )
return err
}
|
As the documentation says:
RowsAffected returns the number of rows affected by an
update, insert, or delete. Not every database or database
driver may support this.
So we might run in a scenario where we don't have metrics
if the driver doesn't help.
|
deleted , err = result . RowsAffected ( )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Unable to retrieve number of deleted rows with current driver" )
return err
}
}
inserted , err := storage . insertRecommendations ( tx , orgID , clusterName , report , creationTime , impactedSinceMap )
if err != nil {
return err
}
if deleted != 0 || inserted != 0 {
log . Info ( ) .
Int64 ( "Deleted" , deleted ) .
Int ( "Inserted" , inserted ) .
Int ( organizationKey , int ( orgID ) ) .
Str ( clusterKey , string ( clusterName ) ) .
Msg ( "Updated recommendation table" )
}
|
updateRecommendationsMetrics(string(clusterName), float64(deleted), float64(inserted))
|
return nil
} ( tx )
finishTransaction ( tx , err )
return err
}
|
finishTransaction finishes the transaction depending on err. err == nil -> commit, err != nil -> rollback
|
func finishTransaction ( tx * sql . Tx , err error ) {
if err != nil {
rollbackError := tx . Rollback ( )
if rollbackError != nil {
log . Err ( rollbackError ) . Msg ( "error when trying to rollback a transaction" )
}
} else {
commitError := tx . Commit ( )
if commitError != nil {
log . Err ( commitError ) . Msg ( "error when trying to commit a transaction" )
}
}
}
|
ReadRecommendationsForClusters reads all recommendations from recommendation table for given organization
|
func ( storage OCPRecommendationsDBStorage ) ReadRecommendationsForClusters (
clusterList [ ] string ,
orgID types . OrgID ,
) ( ctypes . RecommendationImpactedClusters , error ) {
impactedClusters := make ( ctypes . RecommendationImpactedClusters , 0 )
if len ( clusterList ) < 1 {
return impactedClusters , nil
}
|
nosec G201
|
whereClause := fmt . Sprintf ( `WHERE org_id = $1 AND cluster_id IN (%v)` , inClauseFromSlice ( clusterList ) )
|
disable "G202 (CWE-89): SQL string concatenation"
nosec G202
|
query := `
SELECT
rule_id, cluster_id
FROM
recommendation
` + whereClause
|
nosec G202
|
rows , err := storage . connection . Query ( query , orgID )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "query to get recommendations" )
return impactedClusters , err
}
for rows . Next ( ) {
var (
ruleID types . RuleID
clusterID types . ClusterName
)
err := rows . Scan (
& ruleID ,
& clusterID ,
)
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "read one recommendation" )
return impactedClusters , err
}
impactedClusters [ ruleID ] = append ( impactedClusters [ ruleID ] , clusterID )
}
return impactedClusters , nil
}
|
ReadClusterListRecommendations retrieves cluster IDs and a list of hitting rules for each one
|
func ( storage OCPRecommendationsDBStorage ) ReadClusterListRecommendations (
clusterList [ ] string ,
orgID types . OrgID ,
) ( ctypes . ClusterRecommendationMap , error ) {
clusterMap := make ( ctypes . ClusterRecommendationMap , 0 )
if len ( clusterList ) < 1 {
return clusterMap , nil
}
|
we have to select from report table primarily because we need to show lastcheckedat even if there
are no rule hits (which means there are no rows in recommendation table for that cluster)
|
|
disable "G202 (CWE-89): SQL string concatenation"
nosec G202
|
query := `
SELECT
rep.cluster, rep.last_checked_at, COALESCE(rec.rule_id, '')
FROM
report rep
LEFT JOIN
recommendation rec
ON
rep.org_id = rec.org_id AND
rep.cluster = rec.cluster_id
WHERE
rep.org_id = $1 AND rep.cluster IN (%v)
`
|
nosec G201
|
query = fmt . Sprintf ( query , inClauseFromSlice ( clusterList ) )
rows , err := storage . connection . Query ( query , orgID )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "query to get recommendations" )
return clusterMap , err
}
for rows . Next ( ) {
var (
clusterID ctypes . ClusterName
ruleID ctypes . RuleID
timestamp time . Time
)
err := rows . Scan (
& clusterID ,
& timestamp ,
& ruleID ,
)
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "problem reading one recommendation" )
return clusterMap , err
}
if cluster , exists := clusterMap [ clusterID ] ; exists {
cluster . Recommendations = append ( cluster . Recommendations , ruleID )
clusterMap [ clusterID ] = cluster
} else {
|
create entry in map for new cluster ID
|
clusterMap [ clusterID ] = ctypes . ClusterRecommendationList {
|
created at is the same for all rows for each cluster
|
CreatedAt : timestamp ,
Recommendations : [ ] ctypes . RuleID { ruleID } ,
}
}
}
storage . fillInMetadata ( orgID , clusterMap )
return clusterMap , nil
}
|
ReportsCount reads number of all records stored in database
|
func ( storage OCPRecommendationsDBStorage ) ReportsCount ( ) ( int , error ) {
count := - 1
err := storage . connection . QueryRow ( "SELECT count(*) FROM report;" ) . Scan ( & count )
err = types . ConvertDBError ( err , nil )
return count , err
}
|
DeleteReportsForOrg deletes all reports related to the specified organization from the storage.
|
func ( storage OCPRecommendationsDBStorage ) DeleteReportsForOrg ( orgID types . OrgID ) error {
_ , err := storage . connection . Exec ( "DELETE FROM report WHERE org_id = $1;" , orgID )
return err
}
|
DeleteReportsForCluster deletes all reports related to the specified cluster from the storage.
|
func ( storage OCPRecommendationsDBStorage ) DeleteReportsForCluster ( clusterName types . ClusterName ) error {
_ , err := storage . connection . Exec ( "DELETE FROM report WHERE cluster = $1;" , clusterName )
return err
}
|
GetConnection returns db connection(useful for testing)
|
func ( storage OCPRecommendationsDBStorage ) GetConnection ( ) * sql . DB {
return storage . connection
}
|
WriteConsumerError writes a report about a consumer error into the storage.
|
func ( storage OCPRecommendationsDBStorage ) WriteConsumerError ( msg * sarama . ConsumerMessage , consumerErr error ) error {
_ , err := storage . connection . Exec ( `
INSERT INTO consumer_error (topic, partition, topic_offset, key, produced_at, consumed_at, message, error)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)` ,
msg . Topic , msg . Partition , msg . Offset , msg . Key , msg . Timestamp , time . Now ( ) . UTC ( ) , msg . Value , consumerErr . Error ( ) )
return err
}
|
GetDBDriverType returns db driver type
|
func ( storage OCPRecommendationsDBStorage ) GetDBDriverType ( ) types . DBDriver {
return storage . dbDriverType
}
|
DoesClusterExist checks if cluster with this id exists
|
func ( storage OCPRecommendationsDBStorage ) DoesClusterExist ( clusterID types . ClusterName ) ( bool , error ) {
err := storage . connection . QueryRow (
"SELECT cluster FROM report WHERE cluster = $1" , clusterID ,
) . Scan ( & clusterID )
if err == sql . ErrNoRows {
return false , nil
} else if err != nil {
return false , err
}
return true , nil
}
|
ListOfDisabledClusters function returns list of all clusters disabled for a rule from a
specified account.
|
func ( storage OCPRecommendationsDBStorage ) ListOfDisabledClusters (
orgID types . OrgID ,
ruleID types . RuleID ,
errorKey types . ErrorKey ,
) (
disabledClusters [ ] ctypes . DisabledClusterInfo ,
err error ,
) {
|
select disabled rules from toggle table and the latest feedback from disable_feedback table
LEFT join and COALESCE are used for the feedback, because feedback is filled by different
request than toggle, so it might be empty/null
|
query := `
SELECT
toggle.cluster_id,
toggle.disabled_at,
COALESCE(feedback.message, '')
FROM
cluster_rule_toggle toggle
LEFT JOIN
cluster_user_rule_disable_feedback feedback
ON feedback.updated_at = (
SELECT updated_at
FROM cluster_user_rule_disable_feedback
WHERE cluster_id = toggle.cluster_id
AND org_id = $1
AND rule_id = $2
AND error_key = $3
ORDER BY updated_at DESC
LIMIT 1
)
WHERE
toggle.org_id = $1
AND toggle.rule_id = $2
AND toggle.error_key = $3
AND toggle.disabled = $4
ORDER BY
toggle.disabled_at DESC
`
|
run the query against database
|
rows , err := storage . connection . Query ( query , orgID , ruleID , errorKey , RuleToggleDisable )
|
return empty list in case of any error
|
if err != nil {
return disabledClusters , err
}
defer closeRows ( rows )
for rows . Next ( ) {
var disabledCluster ctypes . DisabledClusterInfo
err = rows . Scan (
& disabledCluster . ClusterID ,
& disabledCluster . DisabledAt ,
& disabledCluster . Justification ,
)
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "ReadListOfDisabledRules" )
|
return partially filled slice + error
|
return disabledClusters , err
}
|
append disabled cluster read from database to a slice
|
disabledClusters = append ( disabledClusters , disabledCluster )
}
return disabledClusters , nil
}
|