Data consumer

Every time a new record is sent by Kafka to the subscribed topic, the ccx_data_pipeline.consumer.Consumer will handle and process it, storing the needed information from the record and returning the URL to the archive in the corresponding S3 bucket.

Format of the received Kafka records


Detailed information about the exact format of received Kafka records is available at

  "account": 123456, // (uint)
  "principal": 9, // (uint)
  "size": 55099, // (uint)
  "url": "", // (string)
  "b64_identity": "eyJlbnRpdGxlbWVudHMiOnsiaW5zaWdodHMiOnsiaXNfZW50aXRsZWQiOnRydWV9LCJjb3N0X21hbmFnZW1lbnQiOnsiaXNfZW50aXRsZWQiOnRydWV9LCJhbnNpYmxlIjp7ImlzX2VudGl0bGVkIjp0cnVlfSwib3BlbnNoaWZ0Ijp7ImlzX2VudGl0bGVkIjp0cnVlfSwic21hcnRfbWFuYWdlbWVudCI6eyJpc19lbnRpdGxlZCI6dHJ1ZX0sIm1pZ3JhdGlvbnMiOnsiaXNfZW50aXRsZWQiOnRydWV9fSwiaWRlbnRpdHkiOnsiaW50ZXJuYWwiOnsiYXV0aF90aW1lIjoxNDAwLCJvcmdfaWQiOiIxMjM4MzAzMiJ9LCJhY2NvdW50X251bWJlciI6IjYyMTIzNzciLCJhdXRoX3R5cGUiOiJiYXNpYy1hdXRoIiwidXNlciI6eyJmaXJzdF9uYW1lIjoiSW5zaWdodHMiLCJpc19hY3RpdmUiOnRydWUsImlzX2ludGVybmFsIjpmYWxzZSwibGFzdF9uYW1lIjoiUUUiLCJsb2NhbGUiOiJlbl9VUyIsImlzX29yZ19hZG1pbiI6dHJ1ZSwidXNlcm5hbWUiOiJpbnNpZ2h0cy1xZSIsImVtYWlsIjoiam5lZWRsZStxYUByZWRoYXQuY29tIn0sInR5cGUiOiJVc2VyIn19", // (string)
  "timestamp": "2020-01-23T16:15:59.478901889Z" // (string)

The attribute b64_identity contains another JSON encoded by BASE64 encoding. User and org identities are stored here:

    "identity": {
        "account_number": "6212377",
        "auth_type": "basic-auth",
        "internal": {
            "auth_time": 1400,
            "org_id": "12383032"
        "type": "User",
        "user": {
            "email": "",
            "first_name": "Insights",
            "is_active": true,
            "is_internal": false,
            "is_org_admin": true,
            "last_name": "QE",
            "locale": "en_US",
            "username": "insights-qe"


The ccx_data_pipeline.http_downloader.HTTPDownloader class is provided to get the archive from a HTTP URL, normally obtained by the class described above. This downloader simply gets the archive from the URL, but it performs some checks, like the URL matching several regular expressions to avoid getting archives from unknown servers, or checking the size of the archive.


As this service is built on top of insights-core-messaging, the processing of the downloaded archives is done by the default engine provided by the framework.

The ICM Engine class takes the downloaded archive and, using the configured rules, processes it and generates a JSON report. This report is handled and published using the configured publisher.


This service provides a publisher class that can send the generated reports to a Kafka topic. This class is ccx_data_pipeline.kafka_publisher.KafkaPublisher. The report generated by the framework will be enhanced with more context information taken from different sources, like the organization ID or the “last checked” timestamp (taken from the incoming Kafka record containing the URL to the archive).

The generated JSON has the following format:

  "OrgID": 123456, // (int) - number that we get from b64_identity field
  "ClusterName": "aaaaaaaa-bbbb-cccc-dddd-000000000000", // (string) - cluster UUID  that we read from URL
  "Report": "{...}", // (nested JSON structure) - JSON that contains results of executing rules,
  "LastChecked": "2020-01-23T16:15:59.478901889Z" // (string) - time of the archive uploading in ISO 8601 format, gotten from "timestamp" field

The fields come from:

  • OrgID: retrieved from the incoming JSON, codified inside the b64_identity value. It is extracted from identity-internal-org_id path of keys.
  • ClusterName: the cluster name is retrieved from the downloaded archive. When the download successes and the archive is extracted prior to its processing by the engine, the cluster ID is read from a file named config/id.
  • Report: is the nested JSON structure generated by the engine when the archive is processed.
  • LastChecked: this field is copied directly from the incoming Kafka record, as timestamp key.


The LastChecked attribute is a timestamp containing the zone designator Z (aka “Zulu time” or more informally “Greenwich Mean Time”)

Cluster name extraction internals

As the cluster ID is only present inside the archive and the set of rules to be executed is configurable, this pipeline cannot relay on the executed rules to get the cluster name.

For that reason, a mixed Watcher was created: ClusterIdWatcher, that inherits from both EngineWatcher and ConsumerWatcher. This double inheritance allows this watcher to receive notification from both entities of the pipeline. The relevant events for this watcher are:

  • on_recv: a consumer event used to store the ConsumerRecord object, needed to store the cluster name, when extracted
  • on_extract: an engine event triggered when the archive is extracted, but not already processed. At this point, the files on the archive are available in the internal storage, so the relevant file can be read in order to store its content as cluster name.