/*
Copyright 2023 Red Hat, Inc

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package consumer

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/Shopify/sarama"
	"github.com/google/uuid"
	"github.com/rs/zerolog/log"

	"github.com/RedHatInsights/insights-results-aggregator/producer"
	"github.com/RedHatInsights/insights-results-aggregator/storage"
	"github.com/RedHatInsights/insights-results-aggregator/types"
)

// DVORulesProcessor satisfies the MessageProcessor interface
type DVORulesProcessor struct{}

// deserializeMessage decompresses the raw Kafka message value, unmarshals it
// into an incomingMessage, and validates that the required attributes
// (organization ID, cluster name, and DVO metrics) are present.
func (DVORulesProcessor) deserializeMessage(messageValue []byte) (incomingMessage, error) {
	var deserialized incomingMessage

	received, err := DecompressMessage(messageValue)
	if err != nil {
		return deserialized, err
	}

	err = json.Unmarshal(received, &deserialized)
	if err != nil {
		return deserialized, err
	}

	if deserialized.Organization == nil {
		return deserialized, errors.New("missing required attribute 'OrgID'")
	}
	if deserialized.ClusterName == nil {
		return deserialized, errors.New("missing required attribute 'ClusterName'")
	}
	_, err = uuid.Parse(string(*deserialized.ClusterName))
	if err != nil {
		return deserialized, errors.New("cluster name is not a UUID")
	}
	if deserialized.DvoMetrics == nil {
		return deserialized, errors.New("missing required attribute 'Metrics'")
	}
	return deserialized, nil
}
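
// For illustration only: a minimal payload that would pass the checks in
// deserializeMessage above. The attribute names are inferred from the error
// messages, not from the full incomingMessage schema, which may carry more
// fields (account number, request ID, metadata, ...):
//
//	{
//	  "OrgID": 1,
//	  "ClusterName": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
//	  "Metrics": {
//	    "workload_recommendations": []
//	  }
//	}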

// parseMessage is the entry point for parsing the received message.
// It should be the first method called within ProcessMessage in order
// to convert the message into a struct that can be worked with.
func (DVORulesProcessor) parseMessage(consumer *KafkaConsumer, msg *sarama.ConsumerMessage) (incomingMessage, error) {
	message, err := consumer.MessageProcessor.deserializeMessage(msg.Value)
	if err != nil {
		consumer.logMsgForFurtherAnalysis(msg)
		logUnparsedMessageError(consumer, msg, "Error parsing message from Kafka", err)
		return message, err
	}

	consumer.updatePayloadTracker(message.RequestID, time.Now(), message.Organization, message.Account, producer.StatusReceived)

	if err := consumer.MessageProcessor.shouldProcess(consumer, msg, &message); err != nil {
		return message, err
	}

	err = parseDVOContent(&message)
	if err != nil {
		consumer.logReportStructureError(err, msg)
		return message, err
	}

	return message, nil
}

// processMessage delegates to commonProcessMessage, supplying storeInDB as
// the storage step of the pipeline.
func (processor DVORulesProcessor) processMessage(
	consumer *KafkaConsumer,
	msg *sarama.ConsumerMessage,
) (types.RequestID, incomingMessage, error) {
	return commonProcessMessage(consumer, msg, processor.storeInDB)
}
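
// commonProcessMessage is defined elsewhere in this package; the expectation
// here (an assumption based on how it is called, not on its definition) is
// that it drives the generic pipeline: it calls parseMessage on the raw Kafka
// message and then hands the parsed result to the supplied storer, so that
// everything DVO-specific stays in the methods of DVORulesProcessor.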

// shouldProcess checks that the DVO metrics are non-empty and contain the
// expected "workload_recommendations" key; otherwise the message is skipped.
func (DVORulesProcessor) shouldProcess(_ *KafkaConsumer, _ *sarama.ConsumerMessage, parsed *incomingMessage) error {
	rawMetrics := *parsed.DvoMetrics
	if len(rawMetrics) == 0 {
		log.Debug().Msg("The 'Metrics' part of the JSON is empty. This message will be skipped")
		return types.ErrEmptyReport
	}
	if _, found := rawMetrics["workload_recommendations"]; !found {
		return fmt.Errorf("improper report structure, missing key with name 'workload_recommendations'")
	}
	return nil
}
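
// For illustration only (the shapes below are assumptions, not taken from the
// real report schema): a "Metrics" attribute equal to {} is rejected with
// types.ErrEmptyReport, one such as {"unexpected_key": {}} is rejected for
// missing 'workload_recommendations', and any map containing a
// "workload_recommendations" key is passed on to parseDVOContent.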

// parseDVOContent verifies the content of the DVO structure and parses it
// into the relevant parts of the incomingMessage structure.
func parseDVOContent(message *incomingMessage) error {
	return json.Unmarshal(*((*message.DvoMetrics)["workload_recommendations"]), &message.ParsedWorkloads)
}
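
// Reading the expression above: DvoMetrics behaves as a pointer to a map from
// metric name to a pointer to raw JSON, so the double dereference picks out
// the raw bytes stored under "workload_recommendations" and unmarshals them
// straight into message.ParsedWorkloads. shouldProcess has already guaranteed
// that the key is present, which is why the lookup is not checked again here.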

// storeInDB validates the last-checked timestamp of the message, marshals the
// DVO metrics back to JSON, and writes the resulting report to storage,
// logging the duration of each phase.
func (DVORulesProcessor) storeInDB(
	consumer *KafkaConsumer,
	msg *sarama.ConsumerMessage,
	message incomingMessage,
) (types.RequestID, incomingMessage, error) {
	tStart := time.Now()

	lastCheckedTime, err := consumer.retrieveLastCheckedTime(msg, &message)
	if err != nil {
		return message.RequestID, message, err
	}
	tTimeCheck := time.Now()
	logDuration(tStart, tTimeCheck, msg.Offset, "time_check")

	reportAsBytes, err := json.Marshal(*message.DvoMetrics)
	if err != nil {
		logMessageError(consumer, msg, &message, "Error marshalling report", err)
		return message.RequestID, message, err
	}

	err = consumer.writeDVOReport(msg, message, reportAsBytes, lastCheckedTime)
	if err != nil {
		return message.RequestID, message, err
	}
	tStored := time.Now()
	logDuration(tTimeCheck, tStored, msg.Offset, "db_store_report")

	return message.RequestID, message, nil
}

// writeDVOReport stores the report in the DVO recommendations storage. It is
// a no-op (with an info log) when a more recent report already exists for the
// cluster, and it fails when the configured storage does not support DVO
// recommendations.
func (consumer *KafkaConsumer) writeDVOReport(
	msg *sarama.ConsumerMessage, message incomingMessage,
	reportAsBytes []byte, lastCheckedTime time.Time,
) error {
	if dvoStorage, ok := consumer.Storage.(storage.DVORecommendationsStorage); ok {
		// timestamp when the report is about to be written into database
		storedAtTime := time.Now()

		err := dvoStorage.WriteReportForCluster(
			*message.Organization,
			*message.ClusterName,
			types.ClusterReport(reportAsBytes),
			message.ParsedWorkloads,
			lastCheckedTime,
			message.Metadata.GatheredAt,
			storedAtTime,
			message.RequestID,
		)
		if err == types.ErrOldReport {
			logMessageInfo(consumer, msg, &message, "Skipping because a more recent report already exists for this cluster")
			return nil
		} else if err != nil {
			logMessageError(consumer, msg, &message, "Error writing report to database", err)
			return err
		}
		logMessageDebug(consumer, msg, &message, "Stored report")
		return nil
	}

	err := errors.New("report could not be stored")
	logMessageError(consumer, msg, &message, unexpectedStorageType, err)
	return err
}
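
// The type assertion in writeDVOReport is the only place in this file that
// depends on the concrete storage backend: when consumer.Storage does not
// implement storage.DVORecommendationsStorage, the report is refused and the
// failure is surfaced through the unexpectedStorageType error log rather than
// by writing into an incompatible store.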