Kafka Setup

This page describes how to set up a Kafka broker with a simple Java finance data producer.

Refer to the Example Overview for details about this example.

Note

For production deployments, it is recommended that Kafka be configured with TLS encryption. For a guide on setting up TLS with Kafka and the KX Stream Processor, refer to the Kafka with TLS guide.

Deploying a Kafka broker

This example uses the bitnami/kafka images for deploying to both Docker and Kubernetes.

Docker

To deploy a broker within a Docker configuration, add a ZooKeeper and Kafka container to a docker-compose.yaml.

YAML

version: "3.6"
networks:
  app:
    driver: bridge
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    networks:
      - app
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    networks:
      - app
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181

This deploys a single broker in a Docker Compose configuration.
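To verify the broker is accepting connections, list its topics from inside the Kafka container. This is a minimal check, assuming the Bitnami image keeps the Kafka CLI scripts on the container's PATH.

Bash

docker-compose up -d

# List topics to confirm the broker is reachable
docker-compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list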

Next, a producer and a Stream Processor (SP) worker are added to the configuration to process a data feed.

Kubernetes

To install a Kafka broker on Kubernetes, add the bitnami Helm repository and install the kafka chart. This deployment launches a three-node broker cluster.

Note

While the Kafka brokers are initializing, they may restart several times before a leader is elected. Once the brokers elect a leader, they stabilize.

Bash

helm repo add bitnami https://charts.bitnami.com/bitnami

helm install kafka \
   --set replicaCount=3  \
   --set livenessProbe.initialDelaySeconds=60 \
   --set readinessProbe.initialDelaySeconds=60 \
   bitnami/kafka
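After installing, wait for all three broker pods to become ready before deploying the producer. A minimal check, assuming the chart's default app.kubernetes.io/name=kafka pod label:

Bash

# Watch the broker pods until all three report Ready
kubectl get pods -l app.kubernetes.io/name=kafka --watch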

Deploying a producer

Market producer

A sample Java-based market data generator is provided below to create sample data for this Kafka example.

Download example code

Compiling the producer:

The example code requires Java 1.8 or newer and was written against Kafka 2.7.1. To compile the JAR file, run javac with the classpath set to include the Kafka libraries. Download the latest Kafka package from the Kafka downloads page; make sure to use the prebuilt binary package, not the source (src) package.

Bash

mkdir -p build

# This command assumes that the Kafka package was downloaded and unzipped into the
# `market-producer` directory as `kafka`
javac -cp 'kafka/libs/*:.' *.java -d build
jar cf market-generator.jar -C build .
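The compiled JAR can also be run directly against a broker before containerizing it. A quick smoke test, assuming a broker is reachable on localhost:9092:

Bash

# Point the producer at the broker and run the main class
export KX_KAFKA_BROKERS=localhost:9092
java -cp 'market-generator.jar:kafka/libs/*' MarketProducer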

The producer publishes two sample topics: trades and quotes. For each topic, the data is encoded as JSON arrays.

trades:

JSON

# [time, ticker, price, size]
["2021-10-08 16:16:21", "ABC", 123.45, 10]

quotes:

JSON

# [time, ticker, bid, bidSize, ask, askSize]
["2021-10-08 16:16:21", "ABC", 123.45, 10, 124.56, 20]

When performance is not critical, column names can also be sent along with the data by setting the KX_INCLUDE_JSON_COLUMNS variable to "true":

trades:

JSON

[{"time": "2021-10-08 16:16:21", "ticker": "ABC", "price": 123.45, "size": 10}]

quotes:

JSON

[{"time": "2021-10-08 16:16:21", "ticker": "ABC", "bid": 123.45, "bid_size": 10, "ask": 124.56, "ask_size": 20}]

Note

If ingesting this data using the kdb Insights Enterprise web interface, enable column names by setting KX_INCLUDE_JSON_COLUMNS="true".
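For example, when running the producer outside a container, the flag can be enabled for a single run:

Bash

# Publish JSON objects keyed by column name instead of positional arrays
KX_INCLUDE_JSON_COLUMNS=true java -cp 'market-generator.jar:kafka/libs/*' MarketProducer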

Producer configuration

The producer is configured through environment variables. The table below outlines the available variables.

| variable | description |
| --- | --- |
| KX_INCLUDE_JSON_COLUMNS | Include column names when publishing JSON data (default 'false'). |
| KX_KAFKA_BROKERS | The host and port of the Kafka broker to connect to. |
| KX_KAFKA_ACKS | Indicates if the producer should wait for acknowledgements before sending more data (default '0'). |
| KX_KAFKA_RETRIES | Indicates if failed messages should be retried or dropped (default '0'). |
| KX_KAFKA_WAIT_TIME | Number of milliseconds to wait between publishes (default '1000'). Increase to reduce throughput. |
| KX_KAFKA_SAMPLE_ITERATIONS | Number of publish iterations to sample for performance before logging (default '10'). |
| KX_KAFKA_TRUSTSTORE_LOCATION | Path to the TLS truststore.jks file when using TLS encryption. |
| KX_KAFKA_KEYSTORE_LOCATION | Path to the TLS keystore.jks file when using TLS encryption. |
| KX_KAFKA_USE_SSL | Indicates if TLS/SSL encryption should be used for communication (default 'false'). |
| KX_KAFKA_CERTIFICATE_PASSWORD | Password for the TLS certificate, if used. |
| KX_KAFKA_SSL_CERTIFICATE_VERIFICATION | Indicates if TLS/SSL certificate verification should be used (default 'true'). |
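As an illustration, the TLS variables might be combined as follows when connecting to a TLS-secured broker. The broker port, file paths, and password below are placeholders, not values from this example.

Bash

# Hypothetical TLS settings; substitute real paths and credentials
export KX_KAFKA_BROKERS=kafka:9093
export KX_KAFKA_USE_SSL=true
export KX_KAFKA_TRUSTSTORE_LOCATION=/certs/truststore.jks
export KX_KAFKA_KEYSTORE_LOCATION=/certs/keystore.jks
export KX_KAFKA_CERTIFICATE_PASSWORD=changeit
java -cp 'market-generator.jar:kafka/libs/*' MarketProducer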

Containerized Kafka producer

With the Kafka release downloaded to a folder named kafka and market-generator.jar in the same folder, the following Dockerfile runs the producer in a container.

Dockerfile

FROM openjdk:17-jdk-alpine3.13
WORKDIR /opt/kx
COPY kafka kafka
COPY market-generator.jar market-generator.jar
CMD ["sh", "-c", "java -cp market-generator.jar:kafka/libs/* MarketProducer"]
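The image can be built and smoke-tested on its own before adding it to a deployment. A sketch, assuming the Compose network from the earlier example (Compose prefixes the network name with the project directory name):

Bash

docker build --tag kx-kafka-producer .

# Replace <project> with the directory name containing docker-compose.yaml
docker run --rm \
    --network <project>_app \
    -e KX_KAFKA_BROKERS=kafka:9092 \
    kx-kafka-producer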

Deploying the producer

Docker

To add the producer to the Docker deployment, add a producer service to the configuration that uses the Dockerfile described above.

YAML

services:
  ... # Excerpt from the previous `docker-compose.yaml` example
  producer:
    build:
      context: .
      dockerfile: Dockerfile
    networks:
      - app
    depends_on:
      - kafka
    environment:
      - KX_KAFKA_BROKERS=kafka:9092
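With the producer service in place, the full stack can be started and the producer logs followed to confirm that messages are being published:

Bash

docker-compose up --build -d

# Follow the producer output to confirm it is publishing
docker-compose logs -f producer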

Kubernetes

To deploy the producer to a Kubernetes pod, the Dockerfile above needs to be built and published to a container registry so it can be pulled into the Kubernetes cluster. In this example, the image is pushed to an example Docker Hub repository called myorg.

Bash

docker build --tag myorg/kx-kafka-producer:1.0.0 .
docker push myorg/kx-kafka-producer:1.0.0

Create the following producer.yaml deployment configuration. Remember to replace myorg with the repository used in the command above.

YAML

apiVersion: v1
kind: Pod
metadata:
  name: kx-kafka-producer
  labels:
    app.kubernetes.io/name: Kafka_Producer
    app.kubernetes.io/component: kx-kafka-producer
spec:
  containers:
  - name: producer
    image: myorg/kx-kafka-producer:1.0.0
    env:
    - name: KX_KAFKA_BROKERS
      value: "kafka:9092"

Deploy the pod by applying the pod configuration with kubectl.

Bash

kubectl apply -f producer.yaml
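The pod's status and logs can then be checked to confirm that the producer has connected to the brokers and is publishing:

Bash

kubectl get pod kx-kafka-producer

# Follow the producer output
kubectl logs -f kx-kafka-producer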

Note

Uninstalling the producer

To remove the producer deployment, delete the pod with kubectl:

Bash

kubectl delete pod kx-kafka-producer

Next Steps

Now that a Kafka broker and producer have been deployed, follow the guide to connect Kafka with the kdb Insights Stream Processor.