Streaming Kafka Ingest

This page describes how to read data from Kafka and visualize it in a kdb Insights View.

Apache Kafka is an event streaming platform that seamlessly integrates with kdb Insights Enterprise, enabling real-time data processing through pipelines connected to Kafka data sources. For this example, we have provided a Kafka subway feed that generates live alerts for NYC Subway trains, including arrival times, station coordinates, direction, and route details.
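
For illustration, each alert on the subway Kafka topic is a single JSON object whose fields correspond to the schema used by the pipeline later in this walkthrough. The message below is hypothetical: the values are invented and only a subset of fields is shown.

json

{
  "trip_id": "L0101_6..N",
  "arrival_time": "2025-03-03T12:24:00",
  "stop_id": "R14N",
  "stop_sequence": 12,
  "stop_name": "Grand Central",
  "stop_lat": 40.75,
  "stop_lon": -73.97,
  "route_id": 6,
  "direction_id": "N",
  "route_short_name": "6"
}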

This example shows how to use the CLI to do the following:

  • Download the kafka-ingest sample package from the KX Downloads Portal.
  • Deploy the package to start reading subway data from Kafka.
  • View the streaming output in the subway View.
  • Check the progress of the running pipeline.
  • Tear down the package when you are finished.

Before you begin, you must complete the prerequisites described in the following section.

Note: Streaming subway data

Refer to the Streaming subway data to a data grid in Views walkthrough for instructions on setting up and deploying a package to ingest and view Kafka data using the web interface.

Prerequisites

  1. Configure a username and bearer token for the KX Downloads Portal as environment variables. To do this:

    • Click the KX Downloads Portal link and follow the steps to log in.

    • Click your login name in the top right-hand corner and click Token Management.

    • Click Add New Token.

    • Copy the value in the Bearer field and save it for use later.

    • Use the following commands to set the environment variables, where KX_USER is the username you used to log in to the KX Downloads Portal, and KX_DL_BEARER is the bearer token you just generated.

      bash

          export KX_USER=<USERNAME>
          export KX_DL_BEARER=<TOKEN>

  2. Ensure you have a running kdb Insights Enterprise system, with its hostname URL set as the value of the KX_HOSTNAME environment variable.

    bash

    export KX_HOSTNAME=<URL>

  3. Configure the CLI for your deployment with authentication credentials; one way to do this is sketched below.
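
    The kdb Insights CLI provides an interactive kxi configure command that prompts for connection details such as the hostname. The exact prompts and authentication flow depend on your CLI version and deployment, so treat this as a sketch and refer to the CLI documentation for specifics.

    bash

    # interactive CLI setup; prompts vary by CLI version and deployment
    kxi configure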

Downloading the package

  1. Run the following command to set the KX_VERSION environment variable in your terminal.

    bash

    export KX_VERSION=<VERSION>

  2. Download the sample package with the command below.

    bash

    curl -s --fail-with-body -D /dev/stderr -u ${KX_USER}:${KX_DL_BEARER} -L -OJ https://portal.dl.kx.com/assets/raw/package-samples/${KX_VERSION}/kafka-ingest-${KX_VERSION}.kxi

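    To confirm the download succeeded, check that the package archive is present; the filename below follows the pattern used in the unpack step.

    bash

    # sanity check: the downloaded .kxi archive should exist
    ls -lh kafka-ingest-${KX_VERSION}.kxi
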
  3. Run the following command to unpack and view the contents of the kafka-ingest sample package.

    bash

    export PKG=kafka-ingest
    kxi package unpack $PKG-$KX_VERSION.kxi

    The kafka-ingest sample package contains the files listed below and described in the following table:

    bash

    kafka-ingest/
    ├── manifest.yaml
    ├── pipelines
    │   └── subway.yaml
    ├── src
    │   ├── ml.py
    │   ├── ml.q
    │   ├── subway.py
    │   └── subway.q
    └── views
        └── subway.yaml

    3 directories, 7 files

    Artefact         Description
    manifest.yaml    Used by the CLI for package management. Do not modify the contents of this file.
    pipelines        The pipeline configuration file, subway.yaml.
    src              Pipeline spec files:
                     subway.q: the default pipeline spec, written in q, located at kafka-ingest/src/subway.q in the unpacked package.
                     subway.py: the equivalent Python pipeline spec, located at kafka-ingest/src/subway.py.
                     ml.q / ml.py: ML examples in q and Python, included in the package for reference.
    views            The view configuration file, subway.yaml.

The contents of the q and Python versions of the subway pipeline spec are shown below.

kafka-ingest/src/subway.q

q

subway: ([]
    trip_id: `symbol$();
    arrival_time: `timestamp$();
    stop_id: `symbol$();
    stop_sequence: `long$();
    stop_name: `symbol$();
    stop_lat: `float$();
    stop_lon: `float$();
    route_id: `long$();
    trip_headsign: `symbol$();
    direction_id: `symbol$();
    route_short_name: `symbol$();
    route_long_name: `symbol$();
    route_desc: `char$();
    route_type: `long$();
    route_url: `symbol$();
    route_color: `symbol$())

.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers ; "kafka.trykdb.kx.com:443");
        (`topic   ; "subway");
        (`options; (!) . flip (
            (`security.protocol ; "SASL_SSL");
            (`sasl.username     ; "demo");
            (`sasl.password     ; "demo");
            (`sasl.mechanism    ; "SCRAM-SHA-512"))))]  / read from the subway topic
    .qsp.decode.json[]                                  / decode each message from JSON
    .qsp.map[{ enlist x }]                              / lift each decoded dictionary into a one-row table
    .qsp.transform.schema[subway]                       / cast columns to the subway schema types
    .qsp.write.toSubscriber[`subway;`trip_id]           / publish to subscribers, keyed on trip_id
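
The .qsp.map[{ enlist x }] step is needed because the JSON decoder emits each message as a q dictionary, whereas the schema transform operates on table data; enlisting a dictionary yields a one-row table. A standalone sketch with hypothetical values:

q

msg: `trip_id`stop_name ! `L0101`GrandCentral  / a decoded message arrives as a dictionary
enlist msg                                     / enlist turns it into a one-row table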

kafka-ingest/src/subway.py

Python

import pykx
from kxi import sp

# target kdb+ types for each incoming JSON field
schema = {'trip_id': pykx.SymbolAtom,
          'arrival_time': pykx.TimestampAtom,
          'stop_id': pykx.SymbolAtom,
          'stop_sequence': pykx.ShortAtom,
          'stop_name': pykx.SymbolAtom,
          'stop_lat': pykx.FloatAtom,
          'stop_lon': pykx.FloatAtom,
          'route_id': pykx.ShortAtom,
          'trip_headsign': pykx.SymbolAtom,
          'direction_id': pykx.SymbolAtom,
          'route_short_name': pykx.SymbolAtom,
          'route_long_name': pykx.SymbolAtom,
          'route_desc': pykx.List,
          'route_type': pykx.ShortAtom,
          'route_url': pykx.SymbolAtom,
          'route_color': pykx.SymbolAtom}

# read from the subway topic, decode JSON, cast to the schema, and publish
sp.run(sp.read.from_kafka('subway',
                          brokers='kafka.trykdb.kx.com:443',
                          options={'sasl.username': 'demo',
                                   'sasl.password': 'demo',
                                   'sasl.mechanism': 'SCRAM-SHA-512',
                                   'security.protocol': 'SASL_SSL'})
       | sp.decode.json()
       | sp.transform.schema(schema)
       | sp.write.to_subscriber('subway', 'trip_id'))

The subway pipeline spec file can be summarised as follows:

Object/Node          q Object                 Python Object           Description
Schema               table                    pykx.Dict               The schema definition used for type conversion when parsing incoming data.
Read from Kafka      .qsp.read.fromKafka      sp.read.from_kafka      Set up the Kafka reader with the topic, broker, and security configuration.
Decode JSON          .qsp.decode.json         sp.decode.json          Decode the incoming data with the JSON decoder.
Transform Schema     .qsp.transform.schema    sp.transform.schema     Parse the incoming data using the defined schema.
Write to Subscriber  .qsp.write.toSubscriber  sp.write.to_subscriber  Write the data to a subscriber named subway with trip_id as the key column.

Deploying the package

Next, authenticate with kdb Insights Enterprise and deploy the package to begin reading the subway data from Kafka and writing it to the subscriber/websocket.

  1. Run the following command to authenticate with kdb Insights Enterprise:

    bash

    kxi auth login

  2. Perform the following setup for Python (skip to step 3 for q):

    Unpack the sample package, if you have not already done so.

    bash

    kxi package unpack $PKG-$KX_VERSION.kxi

    Replace the values of base and spec in pipelines/subway.yaml to use the Python spec file instead of the q one.

    yaml

    # base: q
    base: py

    yaml

    # spec: src/subway.q
    spec: src/subway.py

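    If you prefer to script these edits, the following sed commands are one way to do it (GNU sed shown; the -i flag behaves differently on BSD/macOS):

    bash

    # switch the pipeline base from q to Python
    sed -i 's/^\( *base:\).*/\1 py/' $PKG/pipelines/subway.yaml
    # point the spec at the Python file
    sed -i 's|src/subway\.q|src/subway.py|' $PKG/pipelines/subway.yaml
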
  3. Run the following command to deploy the package:

    bash

    kxi pm push $PKG
    kxi pm deploy $PKG
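
    To confirm the deployment, you can list packages known to the system. The kxi pm list subcommand is assumed here; check kxi pm --help if your CLI version differs.

    bash

    # list packages and confirm kafka-ingest shows as deployed (assumed subcommand)
    kxi pm list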

    Note: Cleaning up resources

    You may need to tear down any previously installed version of the package and clean up its resources before deploying a new version; see the Teardown section below.

Viewing output

The subway View is included in the kafka-ingest sample package. To view data for all inbound trains on the 6, 7 or 8th Avenue Express routes:

  1. Log in to the kdb Insights Enterprise web interface.

  2. Navigate to the Views tab and click on the subway View.

Checking progress

To check the progress of the running pipeline, use the kdb Insights Enterprise REST APIs.

  1. Get the token generated when you authenticated against the system.

    bash

    export KX_TOKEN=$(kxi auth print-token)

    Note: Regenerating an expired token

    If this token expires, regenerate it by running kxi auth login again, then re-run the export command above to store the new token:
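
    bash

    # re-authenticate, then store the fresh token
    kxi auth login
    export KX_TOKEN=$(kxi auth print-token)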

  2. Check the progress of the Stream Processor (SP) pipeline using the details API, as follows:

    bash

    curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/streamprocessor/details

    A pipeline state of RUNNING indicates that it is processing data.

    json

    [
    {
        "id":"kafka-ingest-subway",
        "name":"subway",
        "group":"g-794319749428",
        "mode":"default",
        "created":"2025-03-03T12:22:29.000000000",
        "lastSeen":"2025-03-03T12:27:11.389548559",
        "state":"RUNNING",
        "error":"",
        "metrics":{
            "eventRate":0,
            "bytesRate":0,
            "latency":null,
            "inputRate":1328.754,
            "outputRate":2.745616
        },
        "logCounts":{
            "trace":0,
            "debug":0,
            "info":157,
            "warn":3,
            "error":0,
            "fatal":0
        },
        "packageName":"kafka-ingest",
        "reportedMetadata":[
            {
                "id":1,
                "name":"kafka-ingest-subway-1-spwork",
                "reportedMetadata":[
                {
                    "operatorID":"subscriber_subway",
                    "plugin":"subscriber",
                    "cacheLimit":2000,
                    "keyCol":"trip_id",
                    "publishFrequency":null,
                    "table":"subway"
                }
                ],
                "pipeID":"kafka-ingest-subway"
            }
        ]
    }
    ]
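
    To pull out just the pipeline names and states from this response, you can pipe it through jq (assuming jq is installed):

    bash

    # print "name: state" for each pipeline in the details response
    curl -s -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/streamprocessor/details \
      | jq -r '.[] | "\(.name): \(.state)"'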

Teardown

You must tear down a previously installed version of the package and clean up resources before you can deploy a new version of the package.

  1. To tear down packages and their resources, run the command below.

    bash

    kxi pm teardown --rm-data $PKG

Further reading