ocp_processing.go

Copyright 2020, 2021, 2022, 2023 Red Hat, Inc

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

package
consumer
import
(
"encoding/json"
"errors"
"fmt"
"time"
"github.com/RedHatInsights/insights-results-aggregator/storage"
"github.com/RedHatInsights/insights-results-aggregator/producer"
"github.com/RedHatInsights/insights-results-aggregator/types"
"github.com/Shopify/sarama"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)

OCPRulesProcessor satisfies MessageProcessor interface

type
OCPRulesProcessor
struct
{
}

deserializeMessage tries to unmarshall the received message and read all required attributes from it

func
(
OCPRulesProcessor
)
deserializeMessage
(
messageValue
[
]
byte
)
(
incomingMessage
,
error
)
{
var
deserialized
incomingMessage
err
:=
json
.
Unmarshal
(
messageValue
,
&
deserialized
)
if
err
!=
nil
{
return
deserialized
,
err
}
if
deserialized
.
Organization
==
nil
{
return
deserialized
,
errors
.
New
(
"missing required attribute 'OrgID'"
)
}
if
deserialized
.
ClusterName
==
nil
{
return
deserialized
,
errors
.
New
(
"missing required attribute 'ClusterName'"
)
}
if
deserialized
.
Report
==
nil
{
return
deserialized
,
errors
.
New
(
"missing required attribute 'Report'"
)
}
_
,
err
=
uuid
.
Parse
(
string
(
*
deserialized
.
ClusterName
)
)
if
err
!=
nil
{
return
deserialized
,
errors
.
New
(
"cluster name is not a UUID"
)
}
return
deserialized
,
nil
}
func
(
consumer
*
KafkaConsumer
)
writeOCPReport
(
msg
*
sarama
.
ConsumerMessage
,
message
incomingMessage
,
reportAsBytes
[
]
byte
,
lastCheckedTime
,
storedAtTime
time
.
Time
,
)
error
{
if
ocpStorage
,
ok
:=
consumer
.
Storage
.
(
storage
.
OCPRecommendationsStorage
)
;
ok
{
err
:=
ocpStorage
.
WriteReportForCluster
(
*
message
.
Organization
,
*
message
.
ClusterName
,
types
.
ClusterReport
(
reportAsBytes
)
,
message
.
ParsedHits
,
lastCheckedTime
,
message
.
Metadata
.
GatheredAt
,
storedAtTime
,
message
.
RequestID
,
)
if
err
==
types
.
ErrOldReport
{
logMessageInfo
(
consumer
,
msg
,
&
message
,
"Skipping because a more recent report already exists for this cluster"
)
return
nil
}
else
if
err
!=
nil
{
logMessageError
(
consumer
,
msg
,
&
message
,
"Error writing report to database"
,
err
)
return
err
}
logMessageDebug
(
consumer
,
msg
,
&
message
,
"Stored report"
)
return
nil
}
err
:=
errors
.
New
(
"report could not be stored"
)
logMessageError
(
consumer
,
msg
,
&
message
,
unexpectedStorageType
,
err
)
return
err
}
func
(
consumer
*
KafkaConsumer
)
writeRecommendations
(
msg
*
sarama
.
ConsumerMessage
,
message
incomingMessage
,
reportAsBytes
[
]
byte
,
storedAtTime
time
.
Time
,
)
(
time
.
Time
,
error
)
{
if
ocpStorage
,
ok
:=
consumer
.
Storage
.
(
storage
.
OCPRecommendationsStorage
)
;
ok
{
err
:=
ocpStorage
.
WriteRecommendationsForCluster
(
*
message
.
Organization
,
*
message
.
ClusterName
,
types
.
ClusterReport
(
reportAsBytes
)
,
types
.
Timestamp
(
storedAtTime
.
Format
(
time
.
RFC3339
)
)
,
)
if
err
!=
nil
{
logMessageError
(
consumer
,
msg
,
&
message
,
"Error writing recommendations to database"
,
err
)
return
time
.
Time
{
}
,
err
}
tStored
:=
time
.
Now
(
)
logMessageDebug
(
consumer
,
msg
,
&
message
,
"Stored recommendations"
)
return
tStored
,
nil
}
logMessageError
(
consumer
,
msg
,
&
message
,
unexpectedStorageType
,
errors
.
New
(
"recommendation could not be stored"
)
)
return
time
.
Time
{
}
,
nil
}
func
(
consumer
*
KafkaConsumer
)
writeInfoReport
(
msg
*
sarama
.
ConsumerMessage
,
message
incomingMessage
,
infoStoredAtTime
time
.
Time
,
)
error
{

it is expected that message.ParsedInfo contains at least one item: result from special INFO rule containing cluster version that is used just in external data pipeline

	
if
ocpStorage
,
ok
:=
consumer
.
Storage
.
(
storage
.
OCPRecommendationsStorage
)
;
ok
{
err
:=
ocpStorage
.
WriteReportInfoForCluster
(
*
message
.
Organization
,
*
message
.
ClusterName
,
message
.
ParsedInfo
,
infoStoredAtTime
,
)
if
errors
.
Is
(
err
,
types
.
ErrOldReport
)
{
logMessageInfo
(
consumer
,
msg
,
&
message
,
"Skipping because a more recent info report already exists for this cluster"
)
return
nil
}
else
if
err
!=
nil
{
logMessageError
(
consumer
,
msg
,
&
message
,
"Error writing info report to database"
,
err
)
return
err
}
logMessageInfo
(
consumer
,
msg
,
&
message
,
"Stored info report"
)
return
nil
}
logMessageError
(
consumer
,
msg
,
&
message
,
unexpectedStorageType
,
errors
.
New
(
"info report could not be stored"
)
)
return
nil
}

processMessage processes an incoming message

func
(
processor
OCPRulesProcessor
)
processMessage
(
consumer
*
KafkaConsumer
,
msg
*
sarama
.
ConsumerMessage
)
(
types
.
RequestID
,
incomingMessage
,
error
)
{
return
commonProcessMessage
(
consumer
,
msg
,
processor
.
storeInDB
)
}
func
(
OCPRulesProcessor
)
storeInDB
(
consumer
*
KafkaConsumer
,
msg
*
sarama
.
ConsumerMessage
,
message
incomingMessage
)
(
types
.
RequestID
,
incomingMessage
,
error
)
{
tStart
:=
time
.
Now
(
)
lastCheckedTime
,
err
:=
consumer
.
retrieveLastCheckedTime
(
msg
,
&
message
)
if
err
!=
nil
{
return
message
.
RequestID
,
message
,
err
}
tTimeCheck
:=
time
.
Now
(
)
logDuration
(
tStart
,
tTimeCheck
,
msg
.
Offset
,
"time_check"
)
reportAsBytes
,
err
:=
json
.
Marshal
(
*
message
.
Report
)
if
err
!=
nil
{
logMessageError
(
consumer
,
msg
,
&
message
,
"Error marshalling report"
,
err
)
return
message
.
RequestID
,
message
,
err
}

Timestamp when the report is about to be written into database This ensures that the same timestamp is stored in rule_hit and recommendation tables for a given report

	
storedAtTime
:=
time
.
Now
(
)
.
UTC
(
)
err
=
consumer
.
writeOCPReport
(
msg
,
message
,
reportAsBytes
,
lastCheckedTime
,
storedAtTime
)
if
err
!=
nil
{
return
message
.
RequestID
,
message
,
err
}
tStored
:=
time
.
Now
(
)
logDuration
(
tTimeCheck
,
tStored
,
msg
.
Offset
,
"db_store_report"
)
tRecommendationsStored
,
err
:=
consumer
.
writeRecommendations
(
msg
,
message
,
reportAsBytes
,
storedAtTime
)
if
err
!=
nil
{
return
message
.
RequestID
,
message
,
err
}
logDuration
(
tStored
,
tRecommendationsStored
,
msg
.
Offset
,
"db_store_recommendations"
)

rule hits has been stored into database - time to log all these great info

	
logClusterInfo
(
&
message
)
infoStoredAtTime
:=
time
.
Now
(
)
if
err
:=
consumer
.
writeInfoReport
(
msg
,
message
,
infoStoredAtTime
)
;
err
!=
nil
{
return
message
.
RequestID
,
message
,
err
}
infoStored
:=
time
.
Now
(
)
logDuration
(
infoStoredAtTime
,
infoStored
,
msg
.
Offset
,
"db_store_info_report"
)
return
message
.
RequestID
,
message
,
nil
}

shouldProcess determines if a parsed message should be processed further

func
(
OCPRulesProcessor
)
shouldProcess
(
consumer
*
KafkaConsumer
,
consumed
*
sarama
.
ConsumerMessage
,
parsed
*
incomingMessage
)
error
{
err
:=
checkReportStructure
(
*
parsed
.
Report
)
if
err
!=
nil
{
consumer
.
logReportStructureError
(
err
,
consumed
)
return
err
}
return
nil
}
func
verifySystemAttributeIsEmpty
(
r
Report
)
bool
{
var
s
system
if
err
:=
json
.
Unmarshal
(
*
r
[
reportAttributeSystem
]
,
&
s
)
;
err
!=
nil
{
return
false
}
if
s
.
Hostname
!=
""
{
return
false
}
return
true
}

isReportWithEmptyAttributes checks if the report is empty, or if the attributes expected in the report, minus the analysis_metadata, are empty. If this function returns true, this report will not be processed further as it is PROBABLY the result of an archive that was not processed by insights-core. see https://github.com/RedHatInsights/insights-results-aggregator/issues/1834

func
isReportWithEmptyAttributes
(
r
Report
)
bool
{

Create attribute checkers for each attribute

	
for
attr
,
attrData
:=
range
r
{

we don't care about the analysis_metadata attribute

		
if
attr
==
reportAttributeMetadata
{
continue
}

special handling for the system attribute, as it comes with data when empty

		
if
attr
==
reportAttributeSystem
{
if
!
verifySystemAttributeIsEmpty
(
r
)
{
return
false
}
continue
}

Check if this attribute of the report is empty

		
checker
:=
JSONAttributeChecker
{
data
:
*
attrData
}
if
!
checker
.
IsEmpty
(
)
{
return
false
}
}
return
true
}

checkReportStructure tests if the report has correct structure

func
checkReportStructure
(
r
Report
)
error
{

the structure is not well-defined yet, so all we should do is to check if all keys are there


'skips' and 'info' keys are now optional, we should not expect them anymore: https://github.com/RedHatInsights/insights-results-aggregator/issues/1206

	
keysNotFound
:=
make
(
[
]
string
,
0
,
numberOfExpectedKeysInReport
)
keysFound
:=
0

check if the structure contains all expected keys

	
for
_
,
expectedKey
:=
range
expectedKeysInReport
{
_
,
found
:=
r
[
expectedKey
]
if
!
found
{
keysNotFound
=
append
(
keysNotFound
,
expectedKey
)
}
else
{
keysFound
++
}
}
if
keysFound
==
numberOfExpectedKeysInReport
{
return
nil
}

empty reports mean that this message should not be processed further

	
isEmpty
:=
len
(
r
)
==
0
||
isReportWithEmptyAttributes
(
r
)
if
isEmpty
{
log
.
Debug
(
)
.
Msg
(
"Empty report or report with only empty attributes. Processing of this message will be skipped."
)
return
types
.
ErrEmptyReport
}

report is not empty, and some keys have not been found -> malformed

	
if
len
(
keysNotFound
)
!=
0
{
return
fmt
.
Errorf
(
"improper report structure, missing key(s) with name '%v'"
,
keysNotFound
)
}
return
nil
}

parseReportContent verifies the content of the Report structure and parses it into the relevant parts of the incomingMessage structure

func
parseReportContent
(
message
*
incomingMessage
)
error
{
err
:=
json
.
Unmarshal
(
*
(
(
*
message
.
Report
)
[
reportAttributeReports
]
)
,
&
message
.
ParsedHits
)
if
err
!=
nil
{
return
err
}

with support for Hypershift-enabled clusters, the info attribute became optional

	
_
,
infoExists
:=
(
*
message
.
Report
)
[
reportAttributeInfo
]
if
!
infoExists
{
log
.
Debug
(
)
.
Msgf
(
"%s key does not exist in the JSON object"
,
reportAttributeInfo
)
return
nil
}
err
=
json
.
Unmarshal
(
*
(
(
*
message
.
Report
)
[
reportAttributeInfo
]
)
,
&
message
.
ParsedInfo
)
if
err
!=
nil
{
return
err
}
return
nil
}

parseMessage is the entry point for parsing the received message. It should be the first method called within ProcessMessage in order to convert the message into a struct that can be worked with

func
(
OCPRulesProcessor
)
parseMessage
(
consumer
*
KafkaConsumer
,
msg
*
sarama
.
ConsumerMessage
)
(
incomingMessage
,
error
)
{
message
,
err
:=
consumer
.
MessageProcessor
.
deserializeMessage
(
msg
.
Value
)
if
err
!=
nil
{
consumer
.
logMsgForFurtherAnalysis
(
msg
)
logUnparsedMessageError
(
consumer
,
msg
,
"Error parsing message from Kafka"
,
err
)
return
message
,
err
}
consumer
.
updatePayloadTracker
(
message
.
RequestID
,
time
.
Now
(
)
,
message
.
Organization
,
message
.
Account
,
producer
.
StatusReceived
)
if
err
:=
consumer
.
MessageProcessor
.
shouldProcess
(
consumer
,
msg
,
&
message
)
;
err
!=
nil
{
return
message
,
err
}
err
=
parseReportContent
(
&
message
)
if
err
!=
nil
{
consumer
.
logReportStructureError
(
err
,
msg
)
return
message
,
err
}
return
message
,
nil
}