Copyright 2023 Red Hat, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
|
|
Package services contains interface implementations to other
services that are called from Smart Proxy.
|
package services
import (
"context"
"fmt"
"regexp"
"strings"
"github.com/RedHatInsights/insights-operator-utils/redis"
utypes "github.com/RedHatInsights/insights-operator-utils/types"
"github.com/RedHatInsights/insights-results-smart-proxy/types"
redisV9 "github.com/redis/go-redis/v9"
"github.com/rs/zerolog/log"
)
const (
|
RequestIDFieldName represents the name of the field in Redis hash containing request_id
|
RequestIDFieldName = "request_id"
|
ReceivedTimestampFieldName represents the name of the field in Redis hash containing received timestamp
|
ReceivedTimestampFieldName = "received_timestamp"
|
ProcessedTimestampFieldName represents the name of the field in Redis hash containing processed timestamp
|
ProcessedTimestampFieldName = "processed_timestamp"
|
RuleHitsFieldName represent the name of hte field in Redis hash containing simplified rule hits
|
RuleHitsFieldName = "rule_hits"
|
ScanBatchCount is the number of records to go through in a single SCAN operation
|
ScanBatchCount = 100
redisCmdExecutionFailedMsg = "failed to execute command against Redis server"
)
var (
|
RequestIDsScanPattern is a glob-style pattern to find all matching keys. Uses ?* instead of * to avoid
matching "organization:%v:cluster:%v:request:".
|
RequestIDsScanPattern = "organization:%v:cluster:%v:request:?*"
|
SimplifiedReportKey is a key under which the information about specific requests is stored
|
SimplifiedReportKey = "organization:%v:cluster:%v:request:%v:reports"
)
|
RedisInterface represents interface for functions executed against a Redis server
|
type RedisInterface interface {
|
HealthCheck defined in utils
|
HealthCheck ( ) error
GetRequestIDsForClusterID (
types . OrgID ,
types . ClusterName ,
) ( [ ] types . RequestID , error )
GetTimestampsForRequestIDs (
types . OrgID ,
types . ClusterName ,
[ ] types . RequestID ,
bool ,
) ( [ ] types . RequestStatus , error )
GetRuleHitsForRequest (
types . OrgID ,
types . ClusterName ,
types . RequestID ,
) ( [ ] types . RuleID , error )
}
|
RedisClient is a local type which embeds the imported redis.Client to include its own functionality
|
type RedisClient struct {
redis . Client
}
|
NewRedisClient creates a new Redis client based on configuration and returns RedisInterface
|
func NewRedisClient ( conf RedisConfiguration ) ( RedisInterface , error ) {
client , err := redis . CreateRedisClient (
conf . RedisEndpoint ,
conf . RedisDatabase ,
conf . RedisPassword ,
conf . RedisTimeoutSeconds ,
)
if err != nil {
return nil , err
}
return & RedisClient {
redis . Client { Connection : client } ,
} , nil
}
|
GetRequestIDsForClusterID retrieves a list of request IDs from Redis.
"List" of request IDs is in the form of keys with empty values in the following structure:
organization:{orgid}:cluster:{clusterid}:request:{request_id1}.
|
func ( redisClient * RedisClient ) GetRequestIDsForClusterID (
orgID types . OrgID ,
clusterID types . ClusterName ,
) ( requestIDs [ ] types . RequestID , err error ) {
ctx := context . Background ( )
scanKey := fmt . Sprintf ( RequestIDsScanPattern , orgID , clusterID )
log . Debug ( ) . Str ( "Scan key" , scanKey ) . Msg ( "Key to retrieve request IDs from Redis" )
var cursor uint64
for {
var keys [ ] string
var err error
keys , cursor , err = redisClient . Client . Connection . Scan ( ctx , cursor , scanKey , ScanBatchCount ) . Result ( )
if err != nil {
log . Error ( ) . Err ( err ) .
Str ( "scanKey" , scanKey ) . Uint64 ( "cursor" , cursor ) .
Msg ( "failed to execute SCAN command for key and cursor" )
return nil , err
}
|
get last part of key == request_id
|
for _ , key := range keys {
|
exclude simplified report keys that are ending with ":reports" suffix
|
if strings . HasSuffix ( key , ":reports" ) {
continue
}
keySliced := strings . Split ( key , ":" )
requestID := keySliced [ len ( keySliced ) - 1 ]
requestIDs = append ( requestIDs , types . RequestID ( requestID ) )
}
if cursor == 0 {
break
}
}
log . Debug ( ) . Msgf ( "retrieved %d request IDs for cluster_id %v: %v" , len ( requestIDs ) , clusterID , requestIDs )
return
}
|
GetTimestampsForRequestIDs retrieves the 'received' and 'processed' timestamps of each Request
for given list of Request IDs. It doesn't retrieve the whole Hash, but only the fields we need.
It utilizes Redis pipelines in order to avoid multiple client-server round trips.
|
func ( redisClient * RedisClient ) GetTimestampsForRequestIDs (
orgID types . OrgID ,
clusterID types . ClusterName ,
requestIDs [ ] types . RequestID ,
omitMissing bool ,
) ( requestStatuses [ ] types . RequestStatus , err error ) {
ctx := context . Background ( )
|
prepare keys to be used in HMGet commands
|
keys := make ( [ ] string , len ( requestIDs ) )
for i , requestID := range requestIDs {
keys [ i ] = fmt . Sprintf ( SimplifiedReportKey , orgID , clusterID , requestID )
}
|
queue commands in Redis pipeline. EXEC command is issued upon function exit
|
commands , err := redisClient . Client . Connection . Pipelined ( ctx , func ( pipe redisV9 . Pipeliner ) error {
for _ , key := range keys {
pipe . HMGet ( ctx , key , RequestIDFieldName , ReceivedTimestampFieldName , ProcessedTimestampFieldName )
}
return nil
} )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( redisCmdExecutionFailedMsg )
return requestStatuses , err
}
|
iterate over results issued in pipeline. Even though we know len(commands),
some keys might be missing and we might want to omit them, so we can't initialize slice safely
|
for i , cmd := range commands {
var report types . RequestStatus
err = cmd . ( * redisV9 . SliceCmd ) . Scan ( & report )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( redisCmdExecutionFailedMsg )
return [ ] types . RequestStatus { } , err
}
|
omit missing data or invalidate the request
|
if report . RequestID == "" {
if omitMissing {
continue
}
|
commands in Redis pipeline are guaranteed to be executed in the order they were issued in,
therefore we can get the missing request ID from the original slice
|
report . RequestID = string ( requestIDs [ i ] )
report . Valid = false
} else {
report . Valid = true
}
|
everything went fine, add to the response
|
requestStatuses = append ( requestStatuses , report )
}
return
}
|
GetRuleHitsForRequest is used to get the rule_hits field from Hash type
stored in Redis.
|
func ( redisClient * RedisClient ) GetRuleHitsForRequest (
orgID types . OrgID ,
clusterID types . ClusterName ,
requestID types . RequestID ,
) ( ruleHits [ ] types . RuleID , err error ) {
var simplifiedReport types . SimplifiedReport
ctx := context . Background ( )
key := fmt . Sprintf ( SimplifiedReportKey , orgID , clusterID , requestID )
cmd := redisClient . Client . Connection . HMGet ( ctx , key , RequestIDFieldName , RuleHitsFieldName )
if err = cmd . Err ( ) ; err != nil {
log . Error ( ) . Err ( err ) . Msg ( redisCmdExecutionFailedMsg )
return
}
err = cmd . Scan ( & simplifiedReport )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "failed to scan result map into a struct" )
return
}
|
report not found in storage
|
if simplifiedReport . RequestID == "" {
err = & utypes . ItemNotFoundError { ItemID : requestID }
log . Warn ( ) . Err ( err ) . Msgf ( "request data for request_id %v not found in Redis" , requestID )
return
}
log . Debug ( ) . Msgf ( "rule hits CSV retrieved from Redis: %v" , simplifiedReport . RuleHitsCSV )
|
validate rule IDs coming from Redis
|
ruleHitsSplit := strings . Split ( simplifiedReport . RuleHitsCSV , "," )
for _ , ruleHit := range ruleHitsSplit {
ruleIDRegex := regexp . MustCompile ( `^([a-zA-Z_0-9.]+)[|]([a-zA-Z_0-9.]+)$` )
isRuleIDValid := ruleIDRegex . MatchString ( ruleHit )
if ! isRuleIDValid {
log . Error ( ) . Str ( "rule_id" , ruleHit ) . Msg ( "rule_id retrieved from Redis is in invalid format" )
continue
}
ruleHits = append ( ruleHits , types . RuleID ( ruleHit ) )
}
return
}
|