Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

#!/usr/bin/env python3 

# vim: set fileencoding=utf-8 

 

# Copyright © 2020, 2021 Pavel Tisnovsky 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

 

"""Validator for messages stored in ccx.ocp.results topic.""" 

 

import sys 

 

from voluptuous import Schema 

from voluptuous import Required, Optional 

from voluptuous import Any 

from voluptuous import ALLOW_EXTRA 

 

from validators import * 

 

from common import read_control_code, cli_arguments, load_json_from_file 

from common import validate_single_message, validate_multiple_messages 

from common import print_report 

 

# Schema for report "details" node 

reportDetailsSchema = Schema(
    {
        # keys that may be missing from the node
        Optional("current"): str,
        Optional("desired"): str,
        Optional("nodes"): [dict],
        Optional("nodes_with_different_version"): [dict],
        Optional("link"): str,
        Optional("info"): dict,
        Optional("kcs"): str,
        Optional("kcs_link"): str,
        Optional("op"): dict,
        # keys that have to be present
        Required("error_key"): keyValueValidator,
        # literal value: only the exact string "rule" passes validation
        Required("type"): "rule",
    }
)

 

# Schema for info "details" node 

infoDetailsSchema = Schema(
    {
        # keys that may be missing from the node
        Optional("current"): str,
        Optional("desired"): str,
        Optional("cluster_id"): uuidValidator,
        Optional("grafana_link"): str,
        # keys that have to be present
        Required("info_key"): str,
        # literal value: only the exact string "info" passes validation
        Required("type"): "info",
        # one more optional key
        Optional("update_time"): str,
    }
)

 

# Schema for "reports" node 

reportsSchema = Schema(
    {
        # every key of one "reports" item is mandatory
        Required("component"): ruleFQDNValidator,
        # nested node checked by its own schema
        Required("details"): reportDetailsSchema,
        Required("key"): keyValueValidator,
        Required("links"): dict,
        Required("rule_id"): ruleIDValidator,
        # list whose items all have to be strings
        Required("tags"): [str],
        # literal value: only the exact string "rule" passes validation
        Required("type"): "rule",
    }
)

 

# Schema for "fingerprints" node 

# No keys are declared for a "fingerprints" item.
# NOTE(review): with voluptuous' default extra-key policy this presumably
# accepts only empty dictionaries - confirm against the installed version.
fingerprintsSchema = Schema({})

 

# Schema for content of "skips" node from the Report 

skipsSchema = Schema(
    {
        # every key of one "skips" item is mandatory
        Required("rule_fqdn"): ruleFQDNValidator,
        Required("reason"): keyValueValidator,
        Required("details"): str,
        # literal value: only the exact string "skip" passes validation
        Required("type"): "skip",
    }
)

 

# Schema for "info" node 

infoSchema = Schema(
    {
        # every key of one "info" item is mandatory
        Required("component"): ruleFQDNValidator,
        # nested node checked by its own schema
        Required("details"): infoDetailsSchema,
        Required("info_id"): str,
        Required("key"): keyValueValidator,
        Required("links"): dict,
        # list whose items all have to be strings
        Required("tags"): [str],
        # literal value: only the exact string "info" passes validation
        Required("type"): "info",
    }
)

 

# Schema for "pass" node 

# No keys are declared for a "pass" item.
# NOTE(review): with voluptuous' default extra-key policy this presumably
# accepts only empty dictionaries - confirm against the installed version.
passSchema = Schema({})

 

# Schema for messages consumed from ccx.ocp.results Kafka topic 

schema = Schema(
    {
        # attributes stored directly on the top level of the message
        Required("OrgID"): posIntValidator,
        Required("ClusterName"): uuidValidator,
        Required("LastChecked"): timestampValidatorMs,
        # the report itself
        Required("Report"): Schema(
            {
                # keys not listed below are tolerated inside "system"
                Required("system"): Schema(
                    {
                        Required("metadata"): dict,
                        # hostname is either None or a string
                        Required("hostname"): Any(None, str),
                    },
                    extra=ALLOW_EXTRA,
                ),
                # lists whose items are checked by the dedicated schemas
                Required("reports"): [reportsSchema],
                Required("fingerprints"): [fingerprintsSchema],
                Required("skips"): [skipsSchema],
                Required("info"): [infoSchema],
                # the only list that may be left out
                Optional("pass"): [passSchema],
            }
        ),
    }
)

 

 

def main():
    """Validate the input file against the message schema and print a report.

    The command line options are taken from ``cli_arguments()``; the
    ``multiple`` option selects whether the input file is validated as a
    batch of messages or as one single message.
    """
    # Read everything needed from the command line first.
    options = cli_arguments()
    be_verbose, many_messages, source_path = (
        options.verbose,
        options.multiple,
        options.input,
    )

    # One input file holds either multiple messages or a single message.
    validate = (
        validate_multiple_messages if many_messages else validate_single_message
    )
    outcome = validate(schema, source_path, be_verbose)

    # Display the result of the schema validation.
    print_report(outcome, options.nocolors)

 

 

# Start the validation only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()