Streaming Kafka Ingest

This page explains how to stream data from Kafka and write to a data sink.

Refer to the Example Overview for details about this example. Details of how to setup up a Kafka broker installation with a simple Java finance producer are provided in the setup guide.

Deploying the Stream Processor

Below is an example spec.q Stream Processor application that processes the trades topic that is being produced by the example producer above. The stream is decoded and windowed into 5 second buckets before being converted to a table and printed to the console for demonstration. This could also publish to a kdb+ tickerplant, write a message back to Kafka, or send the data to one of the other available data sinks.

q

Copy
.qsp.run
  .qsp.read.fromKafka["trades"]
  .qsp.decode.json[]
  .qsp.map[{enlist `time`sym`bid`ask!"PSff"$'x}]
  .qsp.window.timer[00:00:05]
  .qsp.write.toConsole[]

Docker

Kubernetes

To deploy the pipeline above to Docker, add a worker to the docker-compose.yaml configuration.

YAML

Copy
services:
    .. # Indicates excerpt from from previous `docker-compose.yaml` examples
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.13.0
    networks:
      - app
    depends_on:
      - kafka
    volumes:
      - .:/opt/kx/app/data
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092

The final configuration for the docker-compose.yaml must be one of the following depending on the number of workers configured. For simple, one-off jobs, a single Worker may be sufficient. For health checks and parallelism or production deployments, it is recommended that a Controller is deployed to monitor any Workers.

Full configuration:

To run the configuration, execute the following.

Bash

Copy
docker-compose up

To deploy the above pipeline in Kubernetes, first follow the setup for Kubernetes guide. The pipeline can be deployed using a port forwarded Coordinator service.

Note

The Stream Processor automatically scales the number of Workers to balance the available Kafka partitions across Workers. When deploying the Kafka broker chart, add --set numPartitions=<number> to increase the parallelism of the Kafka ingestion. The Stream Processor does not create more Workers than the specified maxWorkers.

Bash

Copy
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name     : "kafka",
        type     : "spec",
        settings : {
            minWorkers : 1,
            maxWorkers : 10
        },
        config   : { content: $spec },
        env : { KXI_SP_KAFKA_BROKERS: "kafka:9092" }
    }' | jq -asR .)"

Next Steps

Setup this ingest example with TLS.