"""
This script can be used to perform several operations with an external data
pipeline that is usually deployed on the Stage environment and accessible
through a proxy server.

The first operation retrieves the list of clusters from the external data
pipeline through the standard REST API (optionally via a proxy server). The
organization ID needs to be provided via a CLI option, because the list of
clusters is filtered by organization. This operation is selected by the -l
command line option.

The second operation retrieves results from the external data pipeline for
several clusters. The list of clusters needs to be stored in a plain text file
(one cluster per line). The name of this text file is provided by the -i
command line option. This operation is selected by the -r command line option.
Results for each cluster are stored into a file named <cluster>.json in the
current directory.

The third operation compares two sets of results. Each set needs to be stored
in a separate directory. A CSV file with a detailed comparison of the two sets
is generated during this operation. This operation is selected by the -c
command line option.

The fourth operation retrieves processing timestamps for both sets of results
and stores these timestamps into CSV files (times1.csv and times2.csv) for
further analysis. This operation is selected by the -t command line option.

The REST API on the Stage environment is accessed through a proxy. The proxy
name should be provided via CLI together with the user name and password used
for basic authentication.

Synopsis:
    [-h] [-a ADDRESS] [-x PROXY] [-u USER] [-p PASSWORD]
    [-o ORGANIZATION] [-l] [-r] [-t] [-i INPUT] [-c]
    [-d1 DIRECTORY1] [-d2 DIRECTORY2] [-e EXPORT_FILE_NAME] [-d] [-v]

optional arguments:
  -h, --help            show this help message and exit
  -l, --cluster-list    Operation to retrieve list of clusters via REST API
  -r, --retrieve-results
                        Retrieve results for given list of clusters via REST
                        API
  -t, --export-times    Export processing times to CSV files that can be used
                        for further analysis
  -c, --compare-results
                        Compare two sets of results, each set stored in its
                        own directory
  -a ADDRESS, --address ADDRESS
                        Address of REST API for external data pipeline
  -x PROXY, --proxy PROXY
                        Proxy to be used to access REST API
  -u USER, --user USER  User name for basic authentication
  -p PASSWORD, --password PASSWORD
                        Password for basic authentication
  -o ORGANIZATION, --organization ORGANIZATION
                        Organization ID
  -i INPUT, --input INPUT
                        Specification of input file (with list of clusters,
                        for example)
  -d1 DIRECTORY1, --directory1 DIRECTORY1
                        First directory containing set of results
  -d2 DIRECTORY2, --directory2 DIRECTORY2
                        Second directory containing set of results
  -e EXPORT_FILE_NAME, --export EXPORT_FILE_NAME
                        Name of CSV file with exported comparison results
  -d, --additional-info
                        Add additional info about data pipeline components
                        into CSV report
  -v, --verbose         Make messages verbose

Please note that at least one operation needs to be specified:
  -l, --cluster-list
  -r, --retrieve-results
  -t, --export-times
  -c, --compare-results

Examples of command line options:

Retrieve list of clusters for organization 12345678 (printed to standard output):
    -l -a https://$REST_API_URL -x http://$PROXY_URL -u $USER_NAME -p $PASSWORD -o 12345678

Retrieve results for all clusters listed in clusters.txt:
    -r -a https://$REST_API_URL -x http://$PROXY_URL -u $USER_NAME -p $PASSWORD -i clusters.txt

Export processing times for results stored in directories c1 and c2:
    -t -d1=c1 -d2=c2

Compare results stored in directories c1 and c2, without info about the pipeline:
    -c -d1=c1 -d2=c2 -e comparison.csv

Compare results stored in directories c1 and c2, with info about the pipeline:
    -c -d -v -d1=c1 -d2=c2 -e comparison.csv -a https://$REST_API_URL -x http://$PROXY_URL -u $USER_NAME -p $PASSWORD
"""
import requests
import json
import sys
import os
import csv
from collections import Counter
from collections import namedtuple
from datetime import datetime
from argparse import ArgumentParser


# Data type to represent valid rule selector
ruleSelector = namedtuple("rule_selector", ["rule_id", "error_key"])


def cli_arguments():
    """Retrieve all CLI arguments provided by user."""
    # First of all, we need to specify all command line flags that are
    # recognized by this tool.
    parser = ArgumentParser()

    # All supported command line arguments and flags
    parser.add_argument("-a", "--address", dest="address", required=False,
                        help="Address of REST API for external data pipeline")
    parser.add_argument("-x", "--proxy", dest="proxy", required=False,
                        help="Proxy to be used to access REST API")
    parser.add_argument("-u", "--user", dest="user", required=False,
                        help="User name for basic authentication")
    parser.add_argument("-p", "--password", dest="password", required=False,
                        help="Password for basic authentication")
    parser.add_argument("-o", "--organization", dest="organization", required=False,
                        help="Organization ID")
    parser.add_argument("-l", "--cluster-list", dest="cluster_list", action="store_true",
                        help="Operation to retrieve list of clusters via REST API",
                        default=None, required=False)
    parser.add_argument("-r", "--retrieve-results", dest="retrieve_results", action="store_true",
                        help="Retrieve results for given list of clusters via REST API",
                        default=None, required=False)
    parser.add_argument("-t", "--export-times", dest="export_times", action="store_true",
                        help="Export processing times to CSV files that can be used for further " +
                        "analysis",
                        default=None, required=False)
    parser.add_argument("-i", "--input", dest="input", default=None, required=False,
                        help="Specification of input file (with list of clusters, for example)")
    parser.add_argument("-c", "--compare-results", dest="compare_results", action="store_true",
                        default=None, required=False,
                        help="Compare two sets of results, each set stored in its own directory")
    parser.add_argument("-d1", "--directory1", dest="directory1", required=False, default=None,
                        help="First directory containing set of results")
    parser.add_argument("-d2", "--directory2", dest="directory2", required=False, default=None,
                        help="Second directory containing set of results")
    parser.add_argument("-e", "--export", dest="export_file_name", required=False,
                        help="Name of CSV file with exported comparison results")
    parser.add_argument("-d", "--additional-info", dest="additional_info", action="store_true",
                        default=None, required=False,
                        help="Add additional info about data pipeline components into CSV report")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", default=None,
                        help="Make messages verbose", required=False)

    # Now it is time to parse flags, check the actual content of command line
    # and fill-in the object named args
    return parser.parse_args()
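Because -d (a flag without a value) and -d1/-d2 (options with values) share a prefix, it may be worth checking that argparse keeps them apart. The sketch below rebuilds only those flags in a hypothetical stand-alone parser (not the full cli_arguments()) and parses the `-c -d -d1=c1 -d2=c2` style used in the examples. argparse first looks for an exact option string and only then splits `option=value`, so `-d1=c1` is read as `-d1` with the value `c1`.

```python
from argparse import ArgumentParser

# Hypothetical parser with only the flags relevant for comparing results;
# dest names mirror the ones used in cli_arguments().
parser = ArgumentParser()
parser.add_argument("-c", "--compare-results", dest="compare_results", action="store_true")
parser.add_argument("-d1", "--directory1", dest="directory1", default=None)
parser.add_argument("-d2", "--directory2", dest="directory2", default=None)
parser.add_argument("-d", "--additional-info", dest="additional_info", action="store_true")

# same style of arguments as in the usage examples
args = parser.parse_args(["-c", "-d", "-d1=c1", "-d2=c2"])
print(args.compare_results, args.additional_info, args.directory1, args.directory2)

# "-d1 c1" (separate value) works as well and does not switch on -d
args2 = parser.parse_args(["-d1", "c1"])
print(args2.additional_info, args2.directory1)
```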


def main():
    """Entry point to this script."""
    # Parse and process the command line arguments.
    args = cli_arguments()

    # Verbosity flag
    verbose = args.verbose

    # set up proxy or proxies
    proxies = {
        'https': args.proxy
    }

    if verbose:
        print("Proxy settings:", proxies)

    # tuple with items needed to be filled for basic authentication
    auth = (args.user, args.password)

    if verbose:
        print("Auth settings:", auth)

    # check if at least one operation is requested on CLI
    if not any((args.cluster_list, args.retrieve_results,
                args.export_times, args.compare_results)):
        print("No action requested, add -l, -r, -t, or -c")
        sys.exit(1)

    if args.cluster_list:
        retrieve_cluster_list(args.organization, args.address, proxies, auth, verbose)

    if args.retrieve_results:
        assert args.input is not None, \
            "-i/--input CLI option needs to be provided in order to retrieve results"
        retrieve_results(args.address, proxies, auth, args.input, verbose)

    if args.export_times:
        assert args.directory1 is not None, \
            "-d1/--directory1 CLI option needs to be provided in order to export times"
        assert args.directory2 is not None, \
            "-d2/--directory2 CLI option needs to be provided in order to export times"
        export_times(args.directory1, args.directory2)

    if args.compare_results:
        assert args.directory1 is not None, \
            "-d1/--directory1 CLI option needs to be provided in order to compare results"
        assert args.directory2 is not None, \
            "-d2/--directory2 CLI option needs to be provided in order to compare results"
        assert args.export_file_name is not None, \
            "-e/--export CLI option needs to be provided in order to compare results"

        # retrieve and use additional info about pipeline if the user demands it
        info = None
        if args.additional_info:
            info = retrieve_additional_info(args.address, proxies, auth, verbose)

        compare_results(args.directory1, args.directory2, args.export_file_name, info, verbose)


def call_rest_api(url, proxies, auth):
    """Call REST API, retrieve payload, and unmarshal it from JSON."""
    # send request to REST API
    response = requests.get(url, proxies=proxies, auth=auth)

    # elementary check for response content
    assert response is not None, "Proper response expected"
    assert response.status_code == 200, \
        f"Unexpected HTTP code returned: {response.status_code}"

    # response should be in JSON format, time to parse it
    payload = response.json()
    assert payload is not None, "JSON response expected"
    return payload


def retrieve_cluster_list(organization, address, proxies, auth, verbose):
    """Retrieve list of clusters from the external data pipeline REST API endpoint."""
    # construct URL to get list of clusters for given organization ID
    url = f'{address}/v1/organizations/{organization}/clusters'
    if verbose:
        print("URL to access:", url)

    payload = call_rest_api(url, proxies, auth)

    # check the payload content
    assert "status" in payload, "'status' field needs to be present in the payload"
    assert "clusters" in payload, "'clusters' field needs to be present in the payload"

    # print list of clusters to standard output
    clusters = sorted(payload["clusters"])
    for cluster in clusters:
        print(cluster)


def retrieve_results(address, proxies, auth, input_file, verbose):
    """Retrieve results from the external data pipeline REST API endpoint."""
    errors = {}

    # input file containing list of clusters
    with open(input_file, "r") as fin:
        # iterate over all cluster names
        for line in fin:
            cluster = line.strip()
            # skip empty lines
            if not cluster:
                continue
            if verbose:
                print("Cluster: ", cluster)

            # construct URL to get report for one specified cluster
            url = f'{address}/v1/clusters/{cluster}/report'
            if verbose:
                print("URL to access:", url)

            # try to retrieve results for given cluster
            try:
                retrieve_results_for_cluster(url, proxies, auth, cluster, verbose)
            except Exception as e:
                # store error to be displayed later
                errors[cluster] = e

    display_errors(errors)
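The per-cluster error handling lets the loop continue when one cluster fails. A minimal sketch of that pattern, where `collect_errors` and `fake_fetch` are hypothetical names and `fake_fetch` stands in for retrieve_results_for_cluster() so the example runs without the REST API:

```python
def collect_errors(clusters, fetch):
    """Call fetch(cluster) for every cluster and map cluster name -> exception."""
    errors = {}
    for cluster in clusters:
        try:
            fetch(cluster)
        except Exception as e:
            # remember the error and continue with the next cluster
            errors[cluster] = e
    return errors


attempted = []


def fake_fetch(cluster):
    """Hypothetical stand-in for retrieve_results_for_cluster()."""
    attempted.append(cluster)
    if cluster.startswith("bad"):
        raise ValueError(f"no report for {cluster}")


errors = collect_errors(["c-1", "bad-2", "c-3"], fake_fetch)
for cluster, e in errors.items():
    print(cluster, repr(e))
```

All three clusters are attempted even though the second one raises.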


def retrieve_additional_info(address, proxies, auth, verbose):
    """Retrieve additional info about the external data pipeline via REST API endpoint."""
    # construct URL to get info from pipeline
    url = f'{address}/v1/info'
    if verbose:
        print("URL to access:", url)

    # try to access the /info endpoint; a failure is not caught here on purpose,
    # because the pipeline is not usable anyway in this case
    payload = call_rest_api(url, proxies, auth)
    return payload["info"]


def display_errors(errors):
    """Display all errors or exceptions thrown during the selected operation."""
    if len(errors) > 0:
        print("Errors detected during results processing")
        for cluster, e in errors.items():
            print(cluster, repr(e))
    else:
        print("No errors found")


def retrieve_results_for_cluster(url, proxies, auth, cluster, verbose):
    """Retrieve results for one specified cluster and store them into a file."""
    payload = call_rest_api(url, proxies, auth)

    # check the payload content
    assert "status" in payload, "'status' field needs to be present in the payload"

    # pretty print the output
    results = json.dumps(payload, indent=4)

    filename = "{}.json".format(cluster)

    # generate output file with cluster results
    with open(filename, "w") as json_file:
        json_file.write(results)


def export_times(directory1, directory2):
    """Export processing times into CSV files to be analyzed later."""
    files1 = read_list_of_clusters_from_directory(directory1)
    files2 = read_list_of_clusters_from_directory(directory2)
    export_times_into("times1.csv", directory1, files1)
    export_times_into("times2.csv", directory2, files2)


def export_times_into(filename, directory, clusters):
    """Export processing times into CSV file with specified name to be analyzed later."""
    # create a new CSV file that will contain processing times
    with open(filename, "w", newline="") as csvfile:
        # create a CSV writer object
        csv_writer = csv.writer(csvfile, quotechar='"', quoting=csv.QUOTE_ALL)
        assert csv_writer is not None, "CSV writer can not be constructed"

        csv_writer.writerow(("n", "cluster", "last checked", "analyzed", "stored"))

        # iterate over all clusters
        for i, cluster in enumerate(clusters):
            # try to read results for given cluster
            try:
                r = read_cluster_results(directory, cluster)
                meta = r["report"]["meta"]
                t1 = meta["last_checked_at"]
                t2 = meta["analyzed_at"]
                t3 = meta["stored_at"]
                # store fields as is, to be checked, validated, and analyzed later
                csv_writer.writerow((i, cluster, t1, t2, t3))
            except Exception:
                # skip clusters with missing or malformed results
                pass
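To see what a times CSV file looks like, the sketch below writes the same header and rows into an in-memory buffer instead of a file. `export_times_rows` and the sample timestamps are hypothetical, and it catches only `KeyError` rather than every exception. Note that `csv.QUOTE_ALL` quotes numeric fields too, and that a skipped cluster still consumes its `n` value because the counter comes from `enumerate`.

```python
import csv
import io


def export_times_rows(results_by_cluster):
    """Hypothetical in-memory variant of export_times_into()."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, quotechar='"', quoting=csv.QUOTE_ALL)
    writer.writerow(("n", "cluster", "last checked", "analyzed", "stored"))
    for i, (cluster, r) in enumerate(sorted(results_by_cluster.items())):
        try:
            meta = r["report"]["meta"]
            writer.writerow((i, cluster, meta["last_checked_at"],
                             meta["analyzed_at"], meta["stored_at"]))
        except KeyError:
            # skip results without the expected metadata
            pass
    return buffer.getvalue()


sample = {
    "c1": {"report": {"meta": {"last_checked_at": "t1", "analyzed_at": "t2", "stored_at": "t3"}}},
    "c2": {"report": {}},  # no "meta" -> skipped
    "c3": {"report": {"meta": {"last_checked_at": "t4", "analyzed_at": "t5", "stored_at": "t6"}}},
}
print(export_times_rows(sample))
```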


def compare_results(directory1, directory2, filename, info, verbose):
    """Compare results stored in two different directories."""
    files1 = set(read_list_of_clusters_from_directory(directory1))
    files2 = set(read_list_of_clusters_from_directory(directory2))

    # common cluster names in both directories
    common = files1 & files2

    # redundant results (found in just one of the directories)
    redundant_d1 = sorted(files1 - common)
    redundant_d2 = sorted(files2 - common)

    # compute difference in results
    comparison_results, recommendations = compare_results_sets(directory1, directory2, common,
                                                               verbose)

    # check we did it right
    assert comparison_results is not None
    assert recommendations is not None

    # create a new CSV file with detailed report of differences between two sets of results
    with open(filename, "w", newline="") as csvfile:
        # create a CSV writer object
        csv_writer = csv.writer(csvfile, quotechar='"', quoting=csv.QUOTE_ALL)
        assert csv_writer is not None, "CSV writer can not be constructed"

        # export all required information into CSV file
        export_additional_info(csv_writer, info)
        export_basic_info(csv_writer, directory1, directory2, files1, files2, common)
        export_redundant_clusters(csv_writer, redundant_d1, "Redundant clusters in 1st directory")
        export_redundant_clusters(csv_writer, redundant_d2, "Redundant clusters in 2nd directory")
        export_comparison_results(csv_writer, comparison_results)
        if verbose:
            export_recommendations(csv_writer, recommendations)
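Splitting clusters into common and redundant ones is plain set arithmetic over the names derived from file names. A small sketch with made-up cluster names; since `common` is the intersection, `files1 - common` equals `files1 - files2`:

```python
files1 = {"c1", "c2", "c3"}
files2 = {"c2", "c3", "c4"}

# clusters with results in both directories
common = files1 & files2

# clusters found in just one of the directories
redundant_d1 = sorted(files1 - common)
redundant_d2 = sorted(files2 - common)

print(sorted(common), redundant_d1, redundant_d2)
```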


def compare_results_sets(directory1, directory2, common, include_recommendations_table):
    """Compare two results sets."""
    diff_results = []

    # There are two sets of counters: the first set is created for recommendations
    # read from the first set of results, the second set is created for
    # recommendations read from the second set of results. Counter keys are
    # constructed from rule_id and error_key.
    recommendations = {
        "r1": Counter(),
        "r2": Counter()
    }

    # iterate over all clusters
    for cluster in sorted(common):
        diff = {}
        diff["cluster"] = cluster
        # preliminary - can be changed later in exception handler
        diff["status"] = "ok"
        diff["error"] = ""
        # preliminary - not true yet!
        diff["same_results"] = "yes"

        try:
            # try to read both results to be compared
            r1 = read_cluster_results(directory1, cluster)
            r2 = read_cluster_results(directory2, cluster)

            # update recommendations table if required
            if include_recommendations_table:
                update_recommendations(recommendations, r1, r2)

            # 1st step is simple: compare rule hits counters as exposed in metadata field in JSON
            d1 = compare_rule_hits_count(r1, r2, diff)

            if d1:
                # rule hit numbers are the same, let's continue with 2nd step
                # TODO: better comparison
                d2 = compare_rule_hits(r1, r2, diff)
                if not d2:
                    diff["same_results"] = "no"
            else:
                # now we know for sure that rule hit counters are different
                diff["same_results"] = "no"
                diff["same_hits"] = "?"
        except Exception as e:
            # fill-in info about error that occurred during results reading or during comparison
            diff["status"] = "error"
            diff["error"] = repr(e)

        # update diff results
        diff_results.append(diff)

    return diff_results, recommendations


def update_recommendations(recommendations, results1, results2):
    """Update counters with recommendations found in result set 1 and result set 2."""
    # Both sets of counters need to be updated, each one is based on a different
    # set of recommendation reports.
    update_recommendations_for_results(recommendations["r1"], results1)
    update_recommendations_for_results(recommendations["r2"], results2)


def update_recommendations_for_results(counters, results):
    """Update counters with recommendations for selected result set."""
    data = results["report"]["data"]

    # iterate over all rule hits, check the content + update counters for each rule selector found
    for hit in data:
        # preliminary check if all attributes are there
        assert "rule_id" in hit, "Expected 'rule_id' attribute"
        assert "extra_data" in hit, "Expected 'extra_data' containing a map"
        assert "error_key" in hit["extra_data"], "Expected 'error_key' attribute in 'extra_data'"

        # construct the full rule selector
        rule_id = hit["rule_id"]
        error_key = hit["extra_data"]["error_key"]
        rule_selector = ruleSelector(rule_id=rule_id, error_key=error_key)

        # NOTE: the dirty trick how to use a named tuple as a key: the named tuple
        # itself is iterable and counters.update would iterate over all its items,
        # which we don't want. So we need to provide a one-item iterable instead.
        counters.update((rule_selector,))
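The named tuple trick can be checked directly. In this sketch the selector values are made up, and `RuleSelector` is a local copy of the script's `ruleSelector` type. `Counter.update` counts the elements of whatever iterable it receives, so passing the tuple itself counts its fields, while a one-item tuple counts the selector. Indexing with `+= 1` is an equivalent alternative.

```python
from collections import Counter, namedtuple

RuleSelector = namedtuple("rule_selector", ["rule_id", "error_key"])
selector = RuleSelector(rule_id="rule.a", error_key="KEY_A")

wrong = Counter()
wrong.update(selector)        # iterates over the fields: counts "rule.a" and "KEY_A"

right = Counter()
right.update((selector,))     # one-item iterable: counts the selector itself

also_right = Counter()
also_right[selector] += 1     # equivalent, without the one-item tuple

print(wrong)
print(right)
```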


def compare_rule_hits_count(r1, r2, diff):
    """Just compare rule hits metadata and fill-in diff structure accordingly."""
    hits1 = r1["report"]["meta"]["count"]
    hits2 = r2["report"]["meta"]["count"]

    # remember counters -> needs to be written into the table
    diff["hits1"] = hits1
    diff["hits2"] = hits2

    the_same = hits1 == hits2

    # result of comparison of two counters
    diff["eq_hits"] = "yes" if the_same else "no"

    # return comparison result
    return the_same


def compare_rule_hits(r1, r2, diff):
    """Compare rule hits read from both results (by rule_id) and fill-in diff structure accordingly."""
    d1 = r1["report"]["data"]
    d2 = r2["report"]["data"]

    # check that each rule hit from the first result has a matching rule_id in the second one
    all_found = True
    for hit1 in d1:
        found = False
        for hit2 in d2:
            if hit1["rule_id"] == hit2["rule_id"]:
                found = True
                break
        if not found:
            all_found = False

    # result of comparison
    diff["same_hits"] = "yes" if all_found else "no"

    # return comparison result
    return all_found
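compare_rule_hits() only checks that every rule_id from the first report also appears in the second one, so it does not look at error_key values or at how many times a rule is hit. The TODO in compare_results_sets() asks for a better comparison; one possible stricter check, sketched here as a hypothetical alternative and not what the script does, compares the multisets of (rule_id, error_key) pairs. The function names and sample reports are made up; the report layout follows the fields the script reads.

```python
from collections import Counter


def rule_hit_multiset(report):
    """Count (rule_id, error_key) pairs found in report["report"]["data"]."""
    return Counter((hit["rule_id"], hit["extra_data"]["error_key"])
                   for hit in report["report"]["data"])


def same_rule_hits(r1, r2):
    """Hypothetical stricter comparison: same pairs with the same multiplicity."""
    return rule_hit_multiset(r1) == rule_hit_multiset(r2)


r1 = {"report": {"data": [{"rule_id": "rule.a", "extra_data": {"error_key": "KEY_1"}}]}}
r2 = {"report": {"data": [{"rule_id": "rule.a", "extra_data": {"error_key": "KEY_2"}}]}}

# a rule_id-only check sees no difference ...
ids_match = ({h["rule_id"] for h in r1["report"]["data"]} ==
             {h["rule_id"] for h in r2["report"]["data"]})
# ... while the multiset check does
print(ids_match, same_rule_hits(r1, r2))
```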


def read_cluster_results(directory, cluster):
    """Try to read results for given cluster, where results are stored in specified directory."""
    filename = os.path.join(directory, f"{cluster}.json")
    with open(filename, "r") as fin:
        raw_data = fin.read()
        results = json.loads(raw_data)
        return results


def read_list_of_clusters_from_directory(directory):
    """Read list of clusters (taken from file names) from given directory."""
    # list of all files in directory
    files = os.listdir(directory)

    # filter just JSON files and get rid of file extension
    return [f[:-5] for f in files if f.endswith(".json")]
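retrieve_results_for_cluster() writes one `<cluster>.json` file per cluster, and the comparison later recovers cluster names from those file names. A round-trip sketch in a temporary directory; the cluster names, the extra `notes.txt` file, and the payload are made up for illustration:

```python
import json
import os
import tempfile

payload = {"status": "ok", "report": {"meta": {"count": 0}, "data": []}}

with tempfile.TemporaryDirectory() as directory:
    for cluster in ("c1", "c2"):
        # same naming scheme and pretty printing as retrieve_results_for_cluster()
        with open(os.path.join(directory, f"{cluster}.json"), "w") as fout:
            fout.write(json.dumps(payload, indent=4))

    # a non-JSON file is ignored when listing clusters
    open(os.path.join(directory, "notes.txt"), "w").close()

    # same filtering as read_list_of_clusters_from_directory(), sorted for stable output
    clusters = sorted(f[:-len(".json")] for f in os.listdir(directory)
                      if f.endswith(".json"))

    with open(os.path.join(directory, "c1.json")) as fin:
        restored = json.load(fin)

print(clusters, restored == payload)
```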


def export_recommendations(csv_writer, recommendations):
    """Export recommendations taken from both results sets."""
    # all rule selectors
    rule_selectors = sorted(set(recommendations["r1"].keys()) |
                            set(recommendations["r2"].keys()))

    # empty row
    csv_writer.writerow(())

    # sub-table title + row headers
    csv_writer.writerow(("Recommendations",))
    csv_writer.writerow(("n", "rule_id", "error_key", "#hits in set1", "#hits in set2",
                         "diff?", "diff amount"))

    # table content
    for i, rule_selector in enumerate(rule_selectors):
        # retrieve counter values for the given rule_selector
        counter1 = recommendations["r1"][rule_selector]
        counter2 = recommendations["r2"][rule_selector]

        # compute the difference between counters
        diff = abs(counter1 - counter2)

        # difference as string
        diff_str = "no" if diff == 0 else "yes"

        # write info about given rule_selector
        csv_writer.writerow((i, rule_selector.rule_id, rule_selector.error_key,
                             counter1, counter2, diff_str, diff))
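A `Counter` returns 0 for a missing key (without storing it), which is why every selector from the union can be looked up in both counters without a `KeyError`. A sketch of the resulting table rows with made-up selectors; plain `(rule_id, error_key)` tuples stand in for `ruleSelector` here:

```python
from collections import Counter

r1 = Counter({("rule.a", "KEY_A"): 3, ("rule.b", "KEY_B"): 1})
r2 = Counter({("rule.a", "KEY_A"): 2, ("rule.b", "KEY_B"): 1, ("rule.c", "KEY_C"): 4})

rows = []
for i, selector in enumerate(sorted(set(r1) | set(r2))):
    counter1, counter2 = r1[selector], r2[selector]  # missing selector -> 0
    diff = abs(counter1 - counter2)
    rows.append((i, selector[0], selector[1], counter1, counter2,
                 "no" if diff == 0 else "yes", diff))

for row in rows:
    print(row)
```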


def export_additional_info(csv_writer, info):
    """Export additional info about pipeline components."""
    if info is None:
        return

    csv_writer.writerow(("External data pipeline components",))

    csv_writer.writerow(("", "Smart Proxy"))
    export_dictionary(csv_writer, info["SmartProxy"])

    csv_writer.writerow(("", "Content Service"))
    export_dictionary(csv_writer, info["ContentService"])

    csv_writer.writerow(("", "Insights Results Aggregator"))
    export_dictionary(csv_writer, info["Aggregator"])

    # empty row
    csv_writer.writerow(())


def export_dictionary(csv_writer, dictionary):
    """Export content of given dictionary into CSV (starting at third column)."""
    for key in sorted(dictionary.keys()):
        csv_writer.writerow(("", "", key, dictionary[key]))


def export_basic_info(csv_writer, directory1, directory2, files1, files2, common):
    """Export basic info into CSV file."""
    csv_writer.writerow(("Basic info about test results",))
    csv_writer.writerow(("", "Tested on", datetime.now().isoformat().replace("T", " ")))
    csv_writer.writerow(("", "1st directory with results", directory1))
    csv_writer.writerow(("", "2nd directory with results", directory2))
    csv_writer.writerow(("", "Results in 1st directory", len(files1)))
    csv_writer.writerow(("", "Results in 2nd directory", len(files2)))
    csv_writer.writerow(("", "Common clusters to compare", len(common)))

    # empty row
    csv_writer.writerow(())
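One way to produce a human-readable "Tested on" timestamp is to replace the "T" separator of an ISO 8601 string with a space. The sketch uses a fixed sample time instead of `datetime.now()` so the result is predictable, and also shows the `sep` and `timespec` parameters of `isoformat()`, which give the same text directly and drop the microseconds that `datetime.now()` usually carries:

```python
from datetime import datetime

tested_on = datetime(2021, 3, 4, 5, 6, 7)  # fixed sample time instead of datetime.now()

# ISO 8601 text with "T" replaced by a space ...
a = tested_on.isoformat().replace("T", " ")
# ... or the same directly via isoformat() parameters
b = tested_on.isoformat(sep=" ", timespec="seconds")

# with microseconds present, only the replace() variant keeps them
c = datetime(2021, 3, 4, 5, 6, 7, 123).isoformat().replace("T", " ")
print(a, b, c)
```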


def export_redundant_clusters(csv_writer, files, title):
    """Export list of redundant clusters into CSV."""
    csv_writer.writerow((title,))
    csv_writer.writerow(("n", "cluster"))

    # write all cluster names preceded by counter
    for i, cluster in enumerate(files):
        csv_writer.writerow((i, cluster))

    # empty row
    csv_writer.writerow(())


def export_comparison_results(csv_writer, comparison_results):
    """Export results of comparison for all common clusters into CSV."""
    csv_writer.writerow(("Comparison results",))
    csv_writer.writerow(("n", "cluster", "status", "same results", "eq.#hits", "hits1", "hits2",
                         "same hits", "error"))

    # write comparison results for all clusters preceded by counter
    for i, r in enumerate(comparison_results):
        if r["status"] == "ok":
            csv_writer.writerow((i, r["cluster"], r["status"], r["same_results"],
                                 r["eq_hits"], r["hits1"], r["hits2"], r["same_hits"],
                                 r["error"]))
        else:
            # details are not available when results can not be read or compared
            csv_writer.writerow((i, r["cluster"], r["status"], "", "", "", "", "", r["error"]))


# If this script is started from command line, run the main function which is
# the entry point to the processing.
if __name__ == "__main__":
    main()