Kafka with TLS

This page explains how to deploy a Kafka reader with TLS certificates.

Refer to the Example Overview for details about this example.

The TLS setup page showed how to create the keys and certificates for configuring the broker and SP client. The next step is to deploy Kafka and a Stream Processor pipeline.
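The TLS setup page is the authoritative reference for generating these files. As a reminder only, a self-signed chain producing the file names used in this example (ca-cert.pem, cert.pem, key.pem, with key password iamsecure) could be sketched along these lines; the subject names are placeholders:

```shell
# Create a self-signed CA (ca-key.pem, ca-cert.pem)
openssl req -new -x509 -nodes -days 365 \
    -subj "/CN=kafka-ca" \
    -keyout ca-key.pem -out ca-cert.pem

# Create a password-protected client key and a signing request (key.pem, cert.csr)
openssl req -new -newkey rsa:2048 \
    -subj "/CN=sp-client" \
    -passout pass:iamsecure \
    -keyout key.pem -out cert.csr

# Sign the client certificate with the CA (cert.pem)
openssl x509 -req -days 365 \
    -in cert.csr -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial \
    -out cert.pem
```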

Docker

To read from the trades topic using TLS, the certificates created above can be mounted into a Worker and loaded by the Stream Processor. The following is an example program specification for ingesting from Kafka.

Example program called spec.q:

q

.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.ca.location          ; "/certs/ca-cert.pem");
            (`ssl.certificate.location ; "/certs/cert.pem");
            (`ssl.key.location         ; "/certs/key.pem");
            (`ssl.key.password         ; "iamsecure"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
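The pipeline above decodes JSON and casts the string fields with "PSff" (timestamp, symbol, and two floats). Purely to illustrate the payload shape that mapping implies, here is a hypothetical trade message in Python; the field names and value formats are assumptions based on the map step, not a documented schema:

```python
import json

# One trade record; all values are strings because the q map step
# casts them with "PSff"$' (timestamp, symbol, two floats)
trade = {
    "time": "2023.01.02D09:30:00.000000000",  # q timestamp format
    "sym": "AAPL",
    "bid": "150.10",
    "ask": "150.15",
}

# Assuming the decoder receives a JSON array of such records
payload = json.dumps([trade])
print(payload)
```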

Example docker-compose.yml:

YAML

version: "3.6"

services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_TLS_TYPE=JKS
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SSL,CLIENT:SSL
      - KAFKA_CFG_LISTENERS=INTERNAL://:9093,CLIENT://:9092
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,CLIENT://kafka:9092
      - KAFKA_CERTIFICATE_PASSWORD=iamsecure
    volumes:
      - ./keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
  controller:
    image: portal.dl.kx.com/kxi-sp-controller:1.14.0
    environment:
      - KDB_LICENSE_B64
    command: ["-p", "6000"]
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.14.0
    environment:
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092
      - KXI_SP_KAFKA_SSL_CA_LOCATION=/opt/kx/app/data/ca-cert.pem
      - KXI_SP_KAFKA_SSL_KEY_LOCATION=/opt/kx/app/data/key.pem
      - KXI_SP_KAFKA_SSL_CERT_LOCATION=/opt/kx/app/data/cert.pem
      - KXI_SP_KAFKA_SSL_KEY_PASSWORD=iamsecure
    volumes:
      - .:/opt/kx/app/data
    command: ["-p", "5000"]
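To check the broker's TLS listener independently of the Stream Processor, a console-client properties file can point at the same JKS files. This is a sketch assuming the keystore, truststore, and password from the compose file above:

```properties
security.protocol=SSL
ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
ssl.truststore.password=iamsecure
ssl.keystore.location=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
ssl.keystore.password=iamsecure
ssl.key.password=iamsecure
ssl.endpoint.identification.algorithm=
```

For example: kafka-console-producer.sh --bootstrap-server kafka:9092 --topic trades --producer.config client-ssl.properties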

Kubernetes

To read from the trades topic using TLS, a Kubernetes TLS secret can be used to load the client certificate and key into a Worker.

Create a secret named kafka-certs.

Bash

kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem

Optionally, create a secret for the key password.

Bash

kubectl create secret generic kafka-pass --from-literal=password=iamsecure

Example Kafka streaming program, spec.q:

q

.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
            (`ssl.key.location         ; .qsp.configPath["kafka-certs"],"/tls.key");
            (`ssl.key.password         ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
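The ssl.key.password entry above reads the mounted secret file as raw bytes ("c"$read1 ...) rather than as a line of text, since a secret created with --from-literal has no trailing newline. The same file-read pattern, sketched in Python with a stand-in for the mount path returned by .qsp.configPath:

```python
from pathlib import Path

# Stand-in for the secret mount directory resolved by .qsp.configPath["kafka-pass"]
secret_dir = Path("/tmp/kafka-pass-demo")
secret_dir.mkdir(parents=True, exist_ok=True)
# Simulate the mounted secret: the literal value, no trailing newline
(secret_dir / "password").write_text("iamsecure")

# Read the password as raw bytes and decode, mirroring "c"$read1 in q
password = (secret_dir / "password").read_bytes().decode()
print(password)
```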

Deploy the program. The Stream Processor Coordinator must be deployed and accessible.

Bash

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
    }' | jq -asR .)"

Turn off certificate verification for self-signed certificates

When using self-signed certificates, the CA root certificate is not installed in the Worker image, which results in a certificate verification failure. In this case, set KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES to "false". This is not recommended for production installations.

This can be added to the deployment request under an env field.

Bash

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
        env        : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
    }' | jq -asR .)"