Copyright 2020, 2021, 2022 Red Hat, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
|
package main
|
ReadPipelineLogFiles reads all log files gathered from CCX data pipeline pods.
|
func ReadPipelineLogFiles ( filename string ) ( int , error ) {
var err error
pipelineEntries , err := readPipelineLogFile ( filename )
if err != nil {
return 0 , err
}
return len ( pipelineEntries ) , nil
}
func readPipelineLogFile ( filename string ) ( [ ] PipelineLogEntry , error ) {
entries := [ ] PipelineLogEntry { }
|
disable "G304 (CWE-22): Potential file inclusion via variable"
|
file , err := os . Open ( filename )
if err != nil {
return entries , err
}
scanner := bufio . NewScanner ( file )
for scanner . Scan ( ) {
entry := PipelineLogEntry { }
err = json . Unmarshal ( [ ] byte ( scanner . Text ( ) ) , & entry )
if err != nil {
log . Println ( err )
} else {
entries = append ( entries , entry )
}
}
if err := scanner . Err ( ) ; err != nil {
return entries , err
}
|
disable "G304 (CWE-22): Potential file inclusion via variable"
|
file , err := os . Open ( filename )
if err != nil {
return entries , err
}
scanner := bufio . NewScanner ( file )
for scanner . Scan ( ) {
entry := AggregatorLogEntry { }
text := scanner . Text ( )
err = json . Unmarshal ( [ ] byte ( text ) , & entry )
if err != nil {
log . Println ( err )
log . Println ( text )
} else {
entries = append ( entries , entry )
}
}
if err := scanner . Err ( ) ; err != nil {
return entries , err
}
|
in case of error all we can do is to just log the error
|
if err != nil {
log . Println ( err )
}
return entries , nil
}
func filterConsumedMessages ( entries [ ] AggregatorLogEntry ) [ ] AggregatorLogEntry {
consumed := [ ] AggregatorLogEntry { }
for i := range entries {
if entries [ i ] . Message == "Consumed" && entries [ i ] . Group != "" {
consumed = append ( consumed , entries [ i ] )
}
}
return consumed
}
func filterByMessage ( entries [ ] AggregatorLogEntry , message string ) [ ] AggregatorLogEntry {
filtered := [ ] AggregatorLogEntry { }
for i := range entries {
if entries [ i ] . Message == message && entries [ i ] . Topic != "" &&
entries [ i ] . Organization != 0 && entries [ i ] . Cluster != "" &&
entries [ i ] . Group == "" {
filtered = append ( filtered , entries [ i ] )
}
}
return filtered
}
func printStatisticLine ( what string , entries [ ] AggregatorLogEntry ) {
fmt . Printf ( "%-12s %d messages\n" , what , len ( entries ) )
}
func printAggregatorStatistic ( entries [ ] AggregatorLogEntry ) {
consumed := filterConsumedMessages ( entries )
read := filterByMessage ( entries , readFilter )
whitelisted := filterByMessage ( entries , "Organization whitelisted" )
marshalled := filterByMessage ( entries , marshalledFilter )
checked := filterByMessage ( entries , "Time ok" )
stored := filterByMessage ( entries , storedFilter )
printStatisticLine ( "Consumed" , consumed )
printStatisticLine ( readFilter , read )
printStatisticLine ( "Whitelisted" , whitelisted )
printStatisticLine ( "Marshalled messages" , marshalled )
printStatisticLine ( "Checked" , checked )
printStatisticLine ( storedFilter , stored )
}
func printConsumedEntry ( entry * AggregatorLogEntry ) {
fmt . Printf ( "%s %s %s %d\n" , entry . Time , entry . Group , entry . Topic , entry . Offset )
}
func printReadEntry ( entry * AggregatorLogEntry ) {
fmt . Printf ( "%s %s %s %d %d %s\n" , entry . Time , entry . Group , entry . Topic , entry . Offset , entry . Organization , entry . Cluster )
}
func printErrorsForMessageWithOffset ( entries [ ] AggregatorLogEntry , offset int ) {
for i := range entries {
if entries [ i ] . Offset == offset && entries [ i ] . Level == "error" {
fmt . Printf ( "\t%s %s\n" , entries [ i ] . Time , entries [ i ] . Error )
}
}
}
func printConsumedEntries ( entries , notRead [ ] AggregatorLogEntry ) {
for i := range notRead {
printConsumedEntry ( & notRead [ i ] )
printErrorsForMessageWithOffset ( entries , notRead [ i ] . Offset )
}
}
func printReadEntries ( entries , notRead [ ] AggregatorLogEntry ) {
for i := range notRead {
printReadEntry ( & notRead [ i ] )
printErrorsForMessageWithOffset ( entries , notRead [ i ] . Offset )
}
}
func messageWithOffsetIn ( entries [ ] AggregatorLogEntry , offset int ) bool {
for i := range entries {
if entries [ i ] . Offset == offset {
return true
}
}
return false
}
func diffEntryListsByOffset ( list1 , list2 [ ] AggregatorLogEntry ) [ ] AggregatorLogEntry {
diff := [ ] AggregatorLogEntry { }
for i := range list1 {
if ! messageWithOffsetIn ( list2 , list1 [ i ] . Offset ) {
diff = append ( diff , list1 [ i ] )
}
}
return diff
}
func getConsumedNotReadMessages ( entries [ ] AggregatorLogEntry ) [ ] AggregatorLogEntry {
consumed := filterConsumedMessages ( entries )
read := filterByMessage ( entries , readFilter )
return diffEntryListsByOffset ( consumed , read )
}
func getNotWhitelistedMessages ( entries [ ] AggregatorLogEntry ) [ ] AggregatorLogEntry {
read := filterByMessage ( entries , readFilter )
whitelisted := filterByMessage ( entries , "Organization whitelisted" )
return diffEntryListsByOffset ( read , whitelisted )
}
func printConsumedNotRead ( entries [ ] AggregatorLogEntry ) {
notRead := getConsumedNotReadMessages ( entries )
printConsumedEntries ( entries , notRead )
}
func printAggregatorNotWhitelisted ( entries [ ] AggregatorLogEntry ) {
notWhitelisted := getNotWhitelistedMessages ( entries )
printReadEntries ( entries , notWhitelisted )
}
func analyse ( ) {
entries2 , err := readAggregatorLogFile ( "aggregator3.log" )
if err != nil {
log . Fatal ( err )
}
fmt . Println ( "Read:" , len ( entries2 ) , "log entries read" )
printAggregatorStatistic ( entries2 )
printConsumedNotRead ( entries2 )
printAggregatorNotWhitelisted ( entries2 )
_ , err = ReadPipelineLogFiles ( "pipeline.log" )
if err != nil {
log . Fatal ( err )
}
}
func main ( ) {
analyse ( )
}
|