# Streaming Kafka Ingest
This page explains how to stream data from Kafka and write to a data sink.
Refer to the Example Overview for details about this example. The setup guide describes how to set up a Kafka broker installation with a simple Java finance producer.
## Deploying the Stream Processor
Below is an example `spec.q` Stream Processor application that processes the `trades` topic produced by the example producer above. The stream is decoded from JSON and windowed into 5-second buckets before being converted to a table and printed to the console for demonstration. The pipeline could instead publish to a kdb+ tickerplant, write messages back to Kafka, or send the data to one of the other available data sinks.
```q
.qsp.run
  .qsp.read.fromKafka["trades"]                   / subscribe to the trades topic
  .qsp.decode.json[]                              / deserialize each message from JSON
  .qsp.map[{enlist `time`sym`bid`ask!"PSff"$'x}]  / cast fields and wrap as a one-row table
  .qsp.window.timer[00:00:05]                     / collect records into 5-second batches
  .qsp.write.toConsole[]                          / print each batch for demonstration
```
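To illustrate what the map stage does, the snippet below applies the same casts to a sample decoded message in a q session. This is a minimal sketch that assumes the producer emits each trade as a JSON array of time, symbol, bid, and ask; the values are invented for illustration.

```q
q)x:("2024.01.15D09:30:00";"AAPL";185.42;185.45)  / a decoded message (hypothetical values)
q)enlist `time`sym`bid`ask!"PSff"$'x              / cast each field, build a one-row table
time                          sym  bid    ask
---------------------------------------------
2024.01.15D09:30:00.000000000 AAPL 185.42 185.45
```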
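The console writer is only one possible sink. As a sketch of the "write back to Kafka" option mentioned above, the final stages could re-encode the windowed batches as JSON and publish them to a downstream topic; the `trades5s` topic name here is illustrative, and the writer uses the same `KXI_SP_KAFKA_BROKERS` brokers as the reader.

```q
.qsp.run
  .qsp.read.fromKafka["trades"]
  .qsp.decode.json[]
  .qsp.map[{enlist `time`sym`bid`ask!"PSff"$'x}]
  .qsp.window.timer[00:00:05]
  .qsp.encode.json[]              / serialize each windowed batch back to JSON
  .qsp.write.toKafka["trades5s"]  / publish to a downstream topic (illustrative name)
```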
### Docker
To deploy the pipeline above to Docker, add a worker to the `docker-compose.yaml` configuration.
```yaml
services:
  ..  # excerpt from the previous docker-compose.yaml examples
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.13.0
    networks:
      - app
    depends_on:
      - kafka
    volumes:
      - .:/opt/kx/app/data
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092
```
The `KDB_LICENSE_B64` entry has no value, so Docker Compose passes it through from the host environment; export it in your shell before starting the containers. The final `docker-compose.yaml` configuration must be one of the following, depending on the number of Workers configured. For simple, one-off jobs, a single Worker may be sufficient. For health checks, parallelism, or production deployments, it is recommended to deploy a Controller to monitor the Workers.
Full configuration with a single Worker:
```yaml
version: "3.6"
networks:
  app:
    driver: bridge
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    networks:
      - app
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    networks:
      - app
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
  producer:
    build:
      context: .
      dockerfile: Dockerfile
    networks:
      - app
    depends_on:
      - kafka
    environment:
      - KX_KAFKA_BROKERS=kafka:9092
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.13.0
    networks:
      - app
    depends_on:
      - kafka
    volumes:
      - .:/opt/kx/app/data
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092
```

Full configuration with a Controller and multiple Workers:

```yaml
version: "3.6"
networks:
  app:
    driver: bridge
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    networks:
      - app
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    networks:
      - app
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_NUM_PARTITIONS=3
  producer:
    build:
      context: .
      dockerfile: Dockerfile
    networks:
      - app
    depends_on:
      - kafka
    environment:
      - KX_KAFKA_BROKERS=kafka:9092
  controller:
    image: portal.dl.kx.com/kxi-sp-controller:1.13.0
    command: ["-p", "6000"]
    networks:
      - app
    volumes:
      - .:/opt/kx/app/data
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_MIN_WORKERS=3
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.13.0
    networks:
      - app
    deploy:
      replicas: 3
    depends_on:
      - kafka
      - controller
    volumes:
      - .:/opt/kx/app/data
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_PARENT_HOST=controller:6000
      - KXI_SP_KAFKA_BROKERS=kafka:9092
```
To run the configuration, execute:

```bash
docker-compose up
```
### Kubernetes

To deploy the pipeline above in Kubernetes, first follow the Kubernetes setup guide. The pipeline can then be deployed through a port-forwarded Coordinator service.
> **Note:** The Stream Processor automatically scales the number of Workers to balance the available Kafka partitions across them. When deploying the Kafka broker chart, add `--set numPartitions=<number>` to increase the parallelism of the Kafka ingestion. The Stream Processor never creates more Workers than the configured `maxWorkers`; with 3 partitions and a `maxWorkers` of 10, for example, only 3 Workers are needed.
```bash
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name     : "kafka",
        type     : "spec",
        settings : {
            minWorkers : 1,
            maxWorkers : 10
        },
        config   : { content: $spec },
        env      : { KXI_SP_KAFKA_BROKERS: "kafka:9092" }
    }' | jq -asR .)"
```
## Next Steps

Set up this ingest example with TLS.