Configuration

Configure the Stream Processor for deployment in either Docker or Kubernetes

As a microservice, the Stream Processor can be deployed using Docker or Kubernetes. Configuration is dependent on the deployment environment.

Within Docker, Controller and Worker containers must be configured and managed manually, either with Docker, Docker Compose, or a similar container management system.

Within Kubernetes, deployment and installation are greatly simplified through a Helm chart that installs the Stream Processor Coordinator (a Kubernetes Operator) into a cluster. The Coordinator accepts streaming jobs through a REST API and, as an Operator, configures and launches all required Kubernetes workloads and resources.

Docker

Running with different configurations within Docker

User code for streaming pipelines can be either

  • embedded into the YAML configuration, or

  • mounted in a directory, with the environment variable KXI_SP_SPEC_PATH set to the path of the .q or .py file that defines the pipeline (see the sketch below)
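
For the mounted-spec option, a minimal Docker Compose fragment might look like the following. The Worker image name comes from the Helm chart defaults later on this page, but the tag (0.8.0), the Controller address, and the mount path are assumptions that will vary by release and environment.

services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:0.8.0    # tag assumed; use your release's tag
    environment:
      - KXI_SP_PARENT_HOST=controller:6000         # assumed Controller host:port
      - KXI_SP_SPEC_PATH=/opt/pipelines/spec.q     # hypothetical path to the mounted spec
      - KDB_LICENSE_B64
    volumes:
      - .:/opt/pipelines                           # mount the directory containing spec.q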

YAML configuration

Each field of the YAML configuration can be overridden by environment variables. (If all are overridden, the YAML configuration file itself is not needed.)

If present, the configuration must contain an id field as well as a settings field, itself containing a minWorkers field.

id: pipeline-id   # A legible way to identify the pipeline to Service Discovery
                  # (default: code-generated id with prefix "pipeline-")
                  # (env var override: KXI_SP_ID)
settings:
  minWorkers: n   # The minimum number of Worker processes
                  # that must be online to start the pipeline
                  # (default: 1)
                  # (env var override: KXI_SP_MIN_WORKERS)
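
Equivalently, the same two fields can be supplied purely through their environment variable overrides; the values below are illustrative:

export KXI_SP_ID=pipeline-id
export KXI_SP_MIN_WORKERS=1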

Environment variables

Controller

The Controller can be configured with the following optional environment variables:

| environment variable | default | description |
|---|---|---|
| KXI_SP_ID | See description | legible identifier; defaults to pipeline-[0-9a-f]{10} |
| KXI_SP_CONFIG | "" | path to YAML configuration file, if used |
| KXI_SP_CHECKPOINT_FREQ | "5000" | frequency of checkpoints (ms); 0 disables |
| KXI_SP_SILENCE_CHECKPOINTS | "false" | silence warnings when checkpointing is disabled |
| KXI_SP_DISABLE_METRICS | "true" | disable metrics reporting (can reduce incurred latency) |
| KXI_SP_DISABLE_REST | "false" | disable the REST interface |
| KXI_SP_MIN_WORKERS | "1" | minimum number of Workers required to be online to start |
| KXI_CONFIG_FILE | "" | path to the sidecar configuration (if used) |
| KXI_ERROR_MODE | "0" | error trap mode |
| KXI_SP_BETA_FEATURES | "false" | enables optional beta features in preview mode; beta features are not for use in production and are subject to change in a future release |

Worker

The Workers can be configured with the following optional environment variables. The pipeline ID is inherited from the Controller and suffixed with the Worker's unique ID.

| environment variable | default | description |
|---|---|---|
| KXI_SP_PARENT_HOST | Required | host:port of this Worker's Controller |
| KXI_SP_SPEC | Required | path to the pipeline spec file (Python or q) |
| KXI_SP_ORDINAL | "" | integer ordinal for the Worker; each Worker sharing a Controller must have a different ordinal |
| KXI_SP_CHECKPOINT_FREQ | "5000" | frequency of checkpoints (ms); 0 disables |
| KXI_SP_SILENCE_CHECKPOINTS | "false" | silence warnings when checkpointing is disabled |
| KXI_SP_REPORTING_FREQ | "5000" | frequency of heartbeats (ms) from Worker to Controller |
| KXI_SP_DISABLE_METRICS | "true" | disable metrics reporting (can reduce incurred latency) |
| KXI_SP_DISABLE_REST | "false" | disable the REST interface |
| KXI_SP_EVENT_JOURNAL (Beta) | "true" | enables event journaling, which is used by some readers for replay |
| KXI_SP_JOURNAL_DIR (Beta) | "" | path to the directory where event journaling logs should be stored |
| KXI_RT_LIB | "" | path to the mounted kdb Insights Reliable Transport client |
| KXI_RT_EVENT_FATAL | "false" | set to "true" to make any data-loss events from RT cause the Worker to error fatally |
| KXI_CONFIG_FILE | "" | path to the sidecar configuration (if used) |
| KXI_PROTECTED_EXECUTION | "true" | log names of failing operators in errors |
| KXI_ERROR_MODE | "0" | error trap mode |
| KXI_SP_CONFIG_PATH | "./config/sp" | path to custom mounted ConfigMaps and Secrets specified in a deployment request (optional; only used in Kubernetes deployments) |
| KXI_SP_BETA_FEATURES | "false" | enables optional beta features in preview mode; beta features are not for use in production and are subject to change in a future release |
| KXI_ALLOW_NONDETERMINISM | "false" | for pipelines with non-deterministic workloads, enabling non-determinism turns off deduplication and lets all data flow; see determinism for more details |
| KXI_SP_SET_BOUNDED | "" | set to "true" to treat all readers as bounded, including unbounded readers; useful when manually finishing a pipeline that uses unbounded readers for batch ingest |
| KXI_SP_PARENT_RETRIES | 60 | number of times to retry connecting to the Controller |
| KXI_SP_PARENT_RETRY_WAIT | 00:00:03 | time to wait between Controller connection attempts |
| KXI_SP_PARENT_OPEN_TIMEOUT | 00:00:00.500 | hopen timeout for each Controller connection attempt; if exceeded, a retry is triggered |
| KXI_SP_RECONNECT_FREQ | 00:00:05 | interval between Worker attempts to reconnect to the Controller after becoming disconnected |
| KXI_SP_RECONNECT_TIMEOUT | 00:05:00 | how long the Worker keeps attempting to reconnect to the Controller after a disconnect before giving up and restarting |
| KXI_SP_OP_RETRIES | 60 | number of times a pipeline operator retries connecting to a process (if it needs to connect to one); ignored for operators that expose retries as a configuration option |
| KXI_SP_OP_RETRY_WAIT | 00:00:03 | time to wait between process connection attempts within an operator; ignored for operators that expose retryWait as a configuration option |
| KXI_SP_OP_OPEN_TIMEOUT | 00:00:00.500 | hopen timeout for each process connection attempt within an operator; if exceeded, a retry is triggered |
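
As an illustration, the Controller connection retry behavior could be tuned when launching a Worker container; the image name follows the Helm chart defaults on this page, while the tag, paths, and values shown are assumptions:

docker run \
  -e KXI_SP_PARENT_HOST=controller:6000 \
  -e KXI_SP_SPEC=/opt/pipelines/spec.q \
  -e KXI_SP_PARENT_RETRIES=120 \
  -e KXI_SP_PARENT_RETRY_WAIT=00:00:05 \
  -e KDB_LICENSE_B64 \
  -v "$(pwd)":/opt/pipelines \
  portal.dl.kx.com/kxi-sp-worker:0.8.0   # tag assumed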

Monitoring

Containers within a Stream Processor cluster can report pipeline-specific metrics (throughput and latency) and kdb+-specific metrics to a Monitoring endpoint such as Prometheus for event monitoring and alerting. Monitoring statistics are scraped by a sidecar container, which surfaces them for HTTP pull.

If running a scaled pipeline, pipeline performance will be aggregated in the Controller, allowing a holistic view of the pipeline within Monitoring tools.

Tip

Monitoring multiple Workers

Since Monitoring requires a sidecar container, when scaling a pipeline to multiple Workers, enable Monitoring only on the Controller. The Controller reports aggregated Worker statistics.

To add Monitoring to a Stream Processor Controller, add an accompanying kxi-sidecar container to scrape the Controller.

services:
  ..
  controller-sidecar:
    image: portal.dl.kx.com/kxi-sidecar:0.8.0
    environment:
      - KXI_LOG_CONFIG=/opt/config/qlog.json
      - KXI_CONFIG_FILE=/opt/config/controller-sidecar.json
      - KDB_LICENSE_B64
    volumes:
      - ./config:/opt/config
    command: -p 8080 # Standard is to expose metrics on port 8080

Create the mounted config directory containing the sidecar's controller-sidecar.json configuration file.

{
  "connection": ":controller:6000",
  "frequencySecs": 5,
  "metrics":
  {
    "enabled": "true",
    "frequency": 5,
    "handler": {
      "pc": true,
      "pg": true,
      "ph": true,
      "po": true,
      "pp": true,
      "ps": true,
      "ts": true,
      "wc": true,
      "wo": true,
      "ws": true
    }
  }
}

The configuration file above controls how frequently the sidecar scrapes the main container (metrics.frequency) and tracks statistics on many of the q event handlers in the .z namespace.

Adding Prometheus to a Docker Compose

As an example of adding monitoring configuration, we demonstrate Prometheus using the kdb Insights Monitoring component.

Prometheus is available for download from Docker Hub and can be pulled with docker pull. Below we show a sample Prometheus container that exposes the Prometheus browser UI on http://localhost:9000.

services:
  prometheus:
    image: prom/prometheus
    ports:
      - 9000:9090
    volumes:
      - ./config:/etc/prometheus
      - prometheus-data:/prometheus
    command: --config.file=/etc/prometheus/prometheus.yml
volumes:
  prometheus-data:

The prometheus-data:/prometheus volume is used to store the scraped data so that it is available after a restart. A configuration file for Prometheus must be mounted into the Docker container and specified in the run command.

This configuration file contains scrape targets, which are the container sidecars we previously configured. In the example below we set the scrape interval and timeout values and specify multiple scrape_config jobs to scrape data from different sidecar containers in a cluster.

global:
  scrape_interval: 10s
  scrape_timeout: 5s
scrape_configs:
  - job_name: controller
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'controller-sidecar:8080' # Point to controller sidecar
  - job_name: worker
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'worker-sidecar:8080' # Point to a worker sidecar, if it exists

Kubernetes

A Helm chart has been provided for installing the Stream Processor service into a Kubernetes cluster.

Prerequisites

To run within Kubernetes there are a number of prerequisites:

kdb+ license

To start the Stream Processor service within Kubernetes, a license secret should be installed into the Kubernetes cluster.

kubectl create secret generic kxi-license --from-file=license=${QLIC}/k4.lic

Helm

Add the KX Helm chart repository to your helm install:

helm repo add --username <username> --password <password> \
 kxi-repo https://nexus.dl.kx.com/repository/kx-insights-charts/
"kxi-repo" has been added to your repositories

Update your helm repository local chart cache:

helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "kxi-repo" chart repository
Update Complete. ⎈Happy Helming!⎈

Add a secret to your cluster to allow access to the images:

kubectl create secret docker-registry kx-repo-access \
 --docker-username=<username> \
 --docker-password=<password> \
 --docker-server=portal.dl.kx.com

Note

In the example above, a secret named kx-repo-access has been created. When deploying the charts, this secret must be referenced in the imagePullSecrets field of the configuration file.

Configuration file

A configuration file is required to run the Coordinator service. An example standalone configuration file is provided below. See all configuration options for more information and options.

cat spcoord.yaml
license:
  secretName: kxi-license
imagePullSecrets:
  - name: kx-repo-access
persistence:
  enabled: true
  # You will need to provide this argument if persistence is enabled.
  # Point this to the name of a created Storage Class for checkpoints.
  storageClassName: standard
  controllerCheckpointFreq: 10000
  workerCheckpointFreq: 5000
  # Storage allocated to each worker/controller
  storage: 20Gi
discovery:
  # If KXI Service Discovery is set up, set this to true
  enabled: false
metrics:
  # If KXI Monitoring is set up, set this to true
  enabled: false
logging:
  endpoints: # list of endpoints (can provide multiple)
    - fd://stdout
  formatMode: json
  routings:        # Mapping of logging facility to level
    ALL: DEBUG     # `ALL` is the default routing
    spcoord: TRACE # Set coordinator logs to trace
    KUBE: TRACE    # Set kubernetes logs to trace

Installing the Coordinator

helm install sp kxi-repo/kxi-sp -f path/to/spcoord.yaml

One kxi-sp install per namespace

Only one Coordinator may be installed in a given Kubernetes namespace. To install another, use a different namespace.
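
After the install completes, you can check that the Coordinator pod is running by filtering on the common Stream Processor label (also referenced in the uninstall section below); exact pod names depend on the release name:

kubectl get pods -l app.kubernetes.io/part-of=stream-processor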

Accessing the Coordinator

To access the Coordinator for the following creation or teardown requests, first port-forward the Coordinator service locally.

kubectl port-forward sp-kxi-sp-0 5000

Configuring and running a streaming job

Assuming a spec.q file exists locally with a pipeline defined, its configuration is provided through the REST API when submitting it to the Coordinator.

curl -X POST http://localhost:5000/pipeline/create -d \
 "$(jq -n --arg spec "$(cat spec.q)" \
 '{
        name     : "kafka-in",
        type     : "spec",
        base     : "q",
        config   : { content: $spec },
        settings : {
            minWorkers: "1",
            maxWorkers: "10",
            workerThreads: "4",
            workerImage: ""
        },
        kubeConfig: { "secrets" : ["aws-creds"]},
        persistence : {
          controller : { class:"standard", size:"25Gi", checkpointFreq: 5000 },
          worker     : { class:"standard", size:"100Gi", checkpointFreq: 1000 }
        }
    }' | jq -asR .)"

In the above REST request, the following settings are available as part of the settings field.

| setting | default | description |
|---|---|---|
| minWorkers | 1 | minimum number of Workers that must be ready for the pipeline to start |
| maxWorkers | 10 | maximum number of Workers for the pipeline |
| workerThreads | 0 | number of secondary threads for Workers |
| workerImage | | custom Worker image to deploy (if building a custom Worker from the .qpk) |
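
For instance, a custom Worker image built from the .qpk can be selected per pipeline through workerImage; the image reference below is purely hypothetical:

curl -X POST http://localhost:5000/pipeline/create -d \
 "$(jq -n --arg spec "$(cat spec.q)" \
 '{
        name     : "custom-worker",
        type     : "spec",
        base     : "q",
        config   : { content: $spec },
        settings : {
            minWorkers: "1",
            maxWorkers: "4",
            workerImage: "registry.example.com/my-sp-worker:1.0.0"
        }
    }' | jq -asR .)"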

Configuring persistence for a streaming job

Persistence configuration can be customized for each deployment of a streaming job. It is configured with the persistence field, a map of two fields, controller and worker, which configure persistence and checkpointing for the Controller and Workers respectively. For each process type, the Kubernetes storage class, storage size, and checkpoint frequency can be configured.

| setting | default | description |
|---|---|---|
| disabled | false | flag indicating whether to disable persistence for the process type |
| class | Kubernetes cluster default | Kubernetes storage class used to provision the persistent disk; the default can be set in the Helm chart (persistence.storageClassName) |
| size | 20Gi | storage size allocated for the persistent volume; the default can be set in the Helm chart (persistence.storage) |
| checkpointFreq | 5000 | checkpointing frequency for the process type (ms); defaults can be set in the Helm chart (persistence.controllerCheckpointFreq and persistence.workerCheckpointFreq) |

Persistence can be disabled per job by explicitly setting the disabled field to true for the Controller and/or Worker, as shown below:

curl -X POST http://localhost:5000/pipeline/create -d \
 "$(jq -n --arg spec "$(cat $spec)" \
 '{
        name        : "no-persistence",
        type        : "spec",
        config      : { content: $spec },
        settings    : { minWorkers: "1", maxWorkers: "10" },
        persistence : { controller: { disabled: true }, worker: { disabled: true }}
    }' | jq -asR .)"

Note: If persistence is disabled by default in the Helm chart, no persistent disk will be deployed to back checkpoints unless all the necessary fields (class, size, and checkpointFreq) are provided in the deployment request.

Increasing the size of an existing persistent volume

Persistence for the SP in Kubernetes is accomplished using PersistentVolumeClaims (PVCs), which can have their size expanded after creation. To resize a PVC, the PVC's StorageClass must be set to allow volume expansion. If the StorageClass is correctly configured, and the PVC is of a type that supports volume expansion, you can expand the size of the persistent volume by editing the PVC to have a larger size. This can be done using a tool such as kubectl or k9s. For a more detailed explanation of the process of expanding PVCs, see the official Kubernetes documentation on Expanding Persistent Volume Claims.

Worker PVCs can be found using label selectors. The label selectors needed to find all Worker PVCs for a specific pipeline are sp.kx.com/pipeline=<pipeline ID> and sp.kx.com/role=worker.

For example, to list the Worker PVCs for a pipeline with ID spcoord-my-pipeline, you could use the following kubectl command:

kubectl get pvc -l sp.kx.com/pipeline=spcoord-my-pipeline,sp.kx.com/role=worker
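
Once the PVC has been identified, one way to request more space is to patch its storage request; the PVC name and target size below are placeholders, and the PVC's StorageClass must have allowVolumeExpansion enabled:

kubectl patch pvc <pvc-name> -p '{"spec":{"resources":{"requests":{"storage":"50Gi"}}}}'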

Mounting custom configuration for a streaming job

In a Kubernetes deployment of the Stream Processor, configuration files required by Workers to execute a streaming job (e.g. credentials) can be mounted using the kubeConfig field of a deployment request. These files must be embedded in a Kubernetes Secret or ConfigMap, created in the same namespace as the Stream Processor deployment, before the pipeline is created.

For more information about how to create a Secret or ConfigMap, refer to the links below:

https://kubernetes.io/docs/concepts/configuration/configmap/

https://kubernetes.io/docs/concepts/configuration/secret/

The kubeConfig field is an optional map containing two fields, secrets and configMaps, each an array of Secret or ConfigMap names to be mounted into the Worker executing the streaming job.
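
As a sketch, the deployment request below mounts one Secret and one ConfigMap into the Workers; the names aws-creds (from the earlier example) and my-config are placeholders for resources already created in the namespace:

curl -X POST http://localhost:5000/pipeline/create -d \
 "$(jq -n --arg spec "$(cat spec.q)" \
 '{
        name       : "with-config",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["aws-creds"], configMaps: ["my-config"] }
    }' | jq -asR .)"

The mounted files are then available to the Worker under the path given by KXI_SP_CONFIG_PATH (default ./config/sp).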

Deploying a streaming job from a Python specification

Streaming pipeline specifications can be written in Python or q. By default, if no base parameter is provided in the deployment request, the submitted spec is assumed to be written in q. If a pipeline spec has been written using the SP Python API, set the base deployment parameter to "py" as shown in the example below:

curl -X POST http://localhost:5000/pipeline/create -d \
 "$(jq -n --arg spec "$(cat test.py)" \
 '{
        name     : "test",
        type     : "spec",
        base     : "py",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" }
    }' | jq -asR .)"

Where test.py is the following:

from kxi import sp
sp.run(sp.read.from_callback('publish').to_console())

Deploying a streaming job with access to Machine Learning (ML) functionality

In addition to the plugins defined in the API section here, additional ML functionality can be used by deploying a q-ml base image. The full set of available ML functions is defined here. Presently, ML functionality can only be deployed using a streaming pipeline specification written in q. The following outlines a sample deployment of a q-ml base image:

curl -X POST http://localhost:5000/pipeline/create -d \
 "$(jq -n --arg spec "$(cat spec.q)" \
 '{
        name     : "test",
        type     : "spec",
        base     : "q-ml",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" }
    }' | jq -asR .)"

Where spec.q is the following:

.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.ml.minMaxScaler[::;.qsp.use ``bufferSize!(`;100)]
 .qsp.ml.sequentialKMeans[{select from x};.qsp.use ``k!(`;3)]
 .qsp.write.toConsole[]

Uninstalling the Stream Processor Coordinator

To remove the Stream Processor service from your Kubernetes cluster:

1) Teardown all running pipelines

Submit the teardown request to the Coordinator to bring down all pipelines.

curl -X POST localhost:5000/pipelines/teardown

Alternatively, pipelines can be individually removed with:

curl -X POST localhost:5000/pipeline/teardown/<id>

2) Uninstall the Helm chart

helm uninstall sp

Additionally, all Stream Processor workloads carry the following label to help identify them:

app.kubernetes.io/part-of=stream-processor
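
For example, to check whether any Stream Processor workloads remain after an uninstall:

kubectl get all -l app.kubernetes.io/part-of=stream-processor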

All configuration options

The following options are available for configuring the Coordinator service when deploying with Helm.

| option | default | description |
|---|---|---|
| image.repository | portal.dl.kx.com | URL of the image repository for the Coordinator image |
| image.component | kxi-sp-coordinator | name of the Coordinator image |
| image.pullPolicy | IfNotPresent | Kubernetes image pull policy for this image |
| ctlImage.repository | portal.dl.kx.com | URL of the image repository for the default Controller image |
| ctlImage.component | kxi-sp-controller | name of the Controller image |
| ctlImage.pullPolicy | IfNotPresent | Kubernetes image pull policy for this image |
| workImage.repository | portal.dl.kx.com | URL of the image repository for the default Worker image |
| workImage.component | kxi-sp-worker | name of the Worker image |
| workImage.pullPolicy | IfNotPresent | Kubernetes image pull policy for this image |
| mlImage.repository | portal.dl.kx.com | URL of the image repository for the default machine learning Worker image |
| mlImage.component | kxi-ml | name of the machine learning Worker image |
| mlImage.pullPolicy | IfNotPresent | Kubernetes image pull policy for this image |
| pyImage.repository | portal.dl.kx.com | URL of the image repository for the default Python Worker image |
| pyImage.component | kxi-sp-python | name of the Python Worker image |
| pyImage.pullPolicy | IfNotPresent | Kubernetes image pull policy for this image |
| imagePullSecrets | [] | array of names of secrets with image pull permissions |
| env | {} | additional environment variables to add to the Coordinator |
| debug | false | enables interactivity for the Coordinator |
| port | 5000 | port the Coordinator binds to and serves its REST interface from |
| instanceParam | { "g": 1, "t": 1000 } | command-line parameters to pass to the Coordinator; see command line parameters for details |
| defaultWorkerThreads | 0 | default number of secondary threads for new pipeline submissions |
| betaFeatures | false | enables optional beta features in preview mode; beta features are not intended for production use and are subject to change |
| auth.enabled | false | whether authentication is enabled for the Coordinator's REST interface |
| persistence.enabled | true | whether persistent volumes are enabled on pipelines; checkpointing for recovery requires this to be enabled |
| persistence.storageClassName | null | pre-configured storage class name used for persistent volumes (if not specified, the Kubernetes cluster's default storage class is used) |
| persistence.controllerCheckpointFreq | 5000 | frequency of Controller checkpoints (ms) |
| persistence.workerCheckpointFreq | 5000 | frequency of Worker checkpoints (ms) |
| persistence.storage | 20Gi | persistent volume storage size |
| autoscaling.enabled | false | whether the Coordinator should automatically scale based on load |
| autoscaling.minReplicas | 1 | minimum number of Coordinator replicas that should be running |
| autoscaling.maxReplicas | 1 | maximum number of Coordinator replicas that should be running |
| autoscaling.targetCPUUtilizationPercentage | 80 | maximum amount of CPU a replica should consume before triggering a scale-up event |
| autoscaling.targetMemoryUtilizationPercentage | | maximum amount of memory a replica should consume before triggering a scale-up event |
| replicaCount | 1 | baseline number of replicas to deploy when autoscaling is enabled |
| affinity | hard | one of hard, soft, hard-az or soft-az; hard affinity requires all replicas to be on different nodes, soft prefers different nodes but does not require it, and the -az suffix requires node allocation across different availability zones |
| license.onDemand | false | indicates if a kc.lic will be used |
| license.asFile | true | indicates if the license will be provided as a file; otherwise it must be provided as an environment variable |
| license.secretName | | name of the secret holding the base64 license text, either as a file if asFile is enabled or as an environment variable |
| discovery.enabled | true | whether kdb Insights Service Discovery is enabled; the service must be set up separately |
| metrics.enabled | true | whether kdb Insights Monitoring is enabled; the service must be set up separately |
| metrics.frequency | 5 | polling frequency of monitoring metrics in seconds |
| metrics.handler.po | true | capture metrics for the .z.po handler |
| metrics.handler.pc | true | capture metrics for the .z.pc handler |
| metrics.handler.wo | true | capture metrics for the .z.wo handler |
| metrics.handler.wc | true | capture metrics for the .z.wc handler |
| metrics.handler.pg | true | capture metrics for the .z.pg handler |
| metrics.handler.ps | true | capture metrics for the .z.ps handler |
| metrics.handler.ws | true | capture metrics for the .z.ws handler |
| metrics.handler.ph | true | capture metrics for the .z.ph handler |
| metrics.handler.pp | true | capture metrics for the .z.pp handler |
| metrics.handler.ts | true | capture metrics for the .z.ts handler |
| logging.endpoints | [ "fd://stdout" ] | list of endpoints to pass to qlog; logging is written to each of the given endpoints |
| logging.formatMode | "json" | desired logging format (e.g. "text", "json") |
| logging.routings | ALL: INFO, SPCOORD: TRACE | log routings as key-value pairs, configuring the logging level for different facilities |
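
Any of these options can also be overridden on the command line at install or upgrade time; the sketch below uses a couple of the options above with illustrative values:

helm upgrade --install sp kxi-repo/kxi-sp -f path/to/spcoord.yaml \
  --set metrics.enabled=true \
  --set persistence.storage=50Gi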