Kafka with TLS
This page explains how to deploy a Kafka reader with TLS certificates.
Refer to the Example Overview for details about this example.
The TLS setup page showed how to create the keys and certificates for configuring the broker and SP client. The next step is to deploy Kafka and a Stream Processor pipeline.
Docker
Kubernetes
To read from a topic trades
using TLS, the certificates above can be mounted into a
Worker and loaded by the Stream Processor. The following is an example program specification
that could be used to ingest from Kafka.
Example program called spec.q
:
q
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`brokers ; "kafka:9092");
(`topic ; "trades");
(`options; (!) . flip (
(`security.protocol ; "SSL");
(`ssl.ca.location ; "/certs/ca-cert.pem");
(`ssl.certificate.location ; "/certs/cert.pem");
(`ssl.key.location ; "/certs/key.pem");
(`ssl.key.password ; "iamsecure"))))]
.qsp.decode.json[]
.qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
.qsp.window.timer[00:00:10]
.qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Example docker-compose.yml
:
YAML
version: "3.6"
services:
zookeeper:
image: "bitnami/zookeeper:latest"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: "bitnami/kafka:latest"
depends_on:
- zookeeper
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_TLS_TYPE=JKS
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SSL,CLIENT:SSL
- KAFKA_CFG_LISTENERS=INTERNAL://:9093,CLIENT://:9092
- KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,CLIENT://kafka:9092
- KAFKA_CERTIFICATE_PASSWORD=iamsecure
volumes:
- ./keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
- ./truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
controller:
image: portal.dl.kx.com/kxi-sp-controller:1.14.0
environment:
- KDB_LICENSE_B64
command: ["-p", "6000"]
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.14.0
environment:
- KXI_SP_SPEC=/opt/kx/app/data/spec.q
- KXI_SP_KAFKA_BROKERS=kafka:9092
- KXI_SP_KAFKA_SSL_CA_LOCATION=/opt/kx/app/data/ca-cert.pem
- KXI_SP_KAFKA_SSL_KEY_LOCATION=/opt/kx/app/data/key.pem
- KXI_SP_KAFKA_SSL_CERT_LOCATION=/opt/kx/app/data/cert.pem
- KXI_SP_KAFKA_SSL_KEY_PASSWORD=iamsecure
volumes:
- .:/opt/kx/app/data
command: ["-p", "5000"]
To read a topic trades
using TLS, a
Kubernetes TLS secret
can be used to load the client certificate and key into a Worker.
Create a secret named kafka-certs
.
Bash
kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem
Optionally create a secret for the certificate password.
Bash
kubectl create secret generic kafka-pass --from-literal=password=iamsecure
Example Kafka streaming program.
q
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`brokers ; "kafka:9092");
(`topic ; "trades");
(`options; (!) . flip (
(`security.protocol ; "SSL");
(`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
(`ssl.key.location ; .qsp.configPath["kafka-certs"],"/tls.key");
(`ssl.key.password ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
.qsp.decode.json[]
.qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
.qsp.window.timer[00:00:10]
.qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Deploy the program. The Stream Processor Coordinator must be deployed and accessible.
Bash
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "kafka",
type : "spec",
config : { content: $spec },
settings : { maxWorkers: "10" },
kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
}' | jq -asR .)"
Turn off certificate verification for self-signed certificates
When using self-signed certificates the CA root is not installed in the worker image.
This results in a certificate verification failure. In this case,
KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES
must be set to "false"
. This is not recommended
for production installations.
This can be added to the deployment request under an env
field.
Bash
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "kafka",
type : "spec",
config : { content: $spec },
settings : { maxWorkers: "10" },
kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
env : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
}' | jq -asR .)"