insights-ccx-messaging

Stub for all CCX services based on Insights Core Messaging framework

View the Project on GitHub RedHatInsights/insights-ccx-messaging

Implementation

Data consumer

Every time a new record is sent by Kafka to the subscribed topic, the ccx_messaging.consumers.kafka_consumer.KafkaConsumer will check the destination service. In case it matches with the configured one, the incoming message will be deserialized, handled and processed, storing the needed information from the record and returning the URL to the archive in the corresponding S3 bucket.

Format of the received Kafka records

Detailed information about the exact format of received Kafka records is
available at
https://redhatinsights.github.io/insights-data-schemas/platform_upload_announce_messages.html.
{
  "account": 123456, // (uint)
  "principal": 9, // (uint)
  "size": 55099, // (uint)
  "url": "https://insights-dev-upload-perm.s3.amazonaws.com/e927438c126040dab7891608447da0b5?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJW4PUHKGSOIEEI7A%2F20200123%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20200123T161559Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=3e123beac8503f4338f611f85b053f7f15e69e2748228f9f98b6986e7c06fb6c", // (string)
  "b64_identity": "eyJlbnRpdGxlbWVudHMiOnsiaW5zaWdodHMiOnsiaXNfZW50aXRsZWQiOnRydWV9LCJjb3N0X21hbmFnZW1lbnQiOnsiaXNfZW50aXRsZWQiOnRydWV9LCJhbnNpYmxlIjp7ImlzX2VudGl0bGVkIjp0cnVlfSwib3BlbnNoaWZ0Ijp7ImlzX2VudGl0bGVkIjp0cnVlfSwic21hcnRfbWFuYWdlbWVudCI6eyJpc19lbnRpdGxlZCI6dHJ1ZX0sIm1pZ3JhdGlvbnMiOnsiaXNfZW50aXRsZWQiOnRydWV9fSwiaWRlbnRpdHkiOnsiaW50ZXJuYWwiOnsiYXV0aF90aW1lIjoxNDAwLCJvcmdfaWQiOiIxMjM4MzAzMiJ9LCJhY2NvdW50X251bWJlciI6IjYyMTIzNzciLCJhdXRoX3R5cGUiOiJiYXNpYy1hdXRoIiwidXNlciI6eyJmaXJzdF9uYW1lIjoiSW5zaWdodHMiLCJpc19hY3RpdmUiOnRydWUsImlzX2ludGVybmFsIjpmYWxzZSwibGFzdF9uYW1lIjoiUUUiLCJsb2NhbGUiOiJlbl9VUyIsImlzX29yZ19hZG1pbiI6dHJ1ZSwidXNlcm5hbWUiOiJpbnNpZ2h0cy1xZSIsImVtYWlsIjoiam5lZWRsZStxYUByZWRoYXQuY29tIn0sInR5cGUiOiJVc2VyIn19", // (string)
  "timestamp": "2020-01-23T16:15:59.478901889Z" // (string)
}

The attribute b64_identity contains another JSON encoded by BASE64 encoding. User and org identities are stored here:

    "identity": {
        "account_number": "6212377",
        "auth_type": "basic-auth",
        "internal": {
            "auth_time": 1400,
            "org_id": "12383032"
        },
        "type": "User",
        "user": {
            "email": "jneedle+qa@redhat.com",
            "first_name": "Insights",
            "is_active": true,
            "is_internal": false,
            "is_org_admin": true,
            "last_name": "QE",
            "locale": "en_US",
            "username": "insights-qe"
        }

Download

The ccx_messaging.downloaders.http_downloader.HTTPDownloader class is provided to get the archive from a HTTP URL, normally obtained by the class described above. This downloader simply gets the archive from the URL, but it performs some checks, like the URL matching several regular expressions to avoid getting archives from unknown servers, or checking the size of the archive.

Processing

As this service is built on top of insights-core-messaging, the processing of the downloaded archives is done by the default engine provided by the framework.

The ICM Engine class takes the downloaded archive and, using the configured rules, processes it and generates a JSON report. This report is handled and published using the configured publisher.

Reporting

This service provides a publisher class that can send the generated reports to a Kafka topic. This class is ccx_messaging.publishers.rule_processing_publisher.RuleProcessingPublisher. The report generated by the framework will be enhanced with more context information taken from different sources, like the organization ID or the “last checked” timestamp (taken from the incoming Kafka record containing the URL to the archive).

The generated JSON has the following format:

{
  "OrgID": 123456, // (int) - number that we get from b64_identity field
  "ClusterName": "aaaaaaaa-bbbb-cccc-dddd-000000000000", // (string) - cluster UUID  that we read from URL
  "Report": "{...}", // (nested JSON structure) - JSON that contains results of executing rules,
  "LastChecked": "2020-01-23T16:15:59.478901889Z" // (string) - time of the archive uploading in ISO 8601 format, gotten from "timestamp" field
}

The fields come from:

The `LastChecked` attribute is a timestamp containing the zone designator Z
(aka "Zulu time" or more informally "Greenwich Mean Time")

Cluster name extraction internals

As the cluster ID is only present inside the archive and the set of rules to be executed is configurable, this pipeline cannot relay on the executed rules to get the cluster name.

For that reason, a mixed Watcher was created: ClusterIdWatcher, that inherits from both EngineWatcher and ConsumerWatcher. This double inheritance allows this watcher to receive notification from both entities of the pipeline. The relevant events for this watcher are: