Stream Processor APIs

This page provides information on how to interact with the Stream Processor.

The Stream Processor has two supported programming interfaces for writing streaming applications, one in q and one in Python.

To interact with the Stream Processor at runtime, refer to the OpenAPI REST interface.

  • Coordinator OpenAPI: the primary interface for interacting with the Stream Processor. Use the Coordinator interface to deploy and tear down pipelines. It is available at the Kubernetes service name <release>-kxi-sp:5000.

  • Controller OpenAPI: offers localized feedback on a single pipeline. This interface is accessible only in a microservice deployment, or via a port forward in a platform deployment.

  • Worker OpenAPI: allows introspection of the runtime state of a given worker. This interface is accessible only in a microservice deployment, or via a port forward in a platform deployment.

Warning

Prior to 1.4.0, kdb Insights and kdb Insights Enterprise supported the creation of Stream Processor pipelines with many-to-one connections in configurations where this was not supported. This capability has been removed because it causes issues with determinism in pipelines and should be avoided. Where many-to-one connections are required, use merge nodes to combine data into a single connection.

Deprecation Policy

Code that is marked as deprecated is subject to removal in the next major release. Deprecated APIs are maintained for all minor versions of a release but print a warning indicating that they are deprecated. All deprecated APIs indicate an upgrade path in the relevant documentation. See the release notes for details on any deprecated or removed APIs.

Available APIs

Here are the categories of APIs:

| Category | Summary |
| --- | --- |
| Configuring operators | Pass optional arguments into APIs |
| General | Manage various pipeline behaviours |
| Lifecycle | Manage actions upon various pipeline events |
| Operators | Define desired data processing in a pipeline |
| Readers | Configure the entry method of data into a pipeline |
| Decoders | Apply certain decoders to data |
| Encoders | Apply certain encoders to data |
| Transform | Apply pre-defined data transformation methods |
| Stats | Compute certain statistical measures |
| State | Get or set the state of a pipeline operator |
| String Utilities | Apply certain processing methods to strings |
| Windows | Divide data into overlapping or non-overlapping buckets |
| Writers | Configure the storage method of data out of a pipeline |
| User-Defined Functions | Fetch a custom-defined function to use in a pipeline |
| Machine Learning | Use Classification, Clustering, Regression, Feature Creation, Metrics, Custom (Registry) Models and general ML utility functions |

Operator syntax

Pipeline API operators are designed to be chained, as in the examples, in a form that will be familiar to users of libraries such as jQuery.

APIs are executed from left to right (or top to bottom when newlines are added) and are designed to be composed for human readability. For example, in the following case, data would be read from Kafka, transformed through a JSON decoder, windowed and written to an Insights stream.

.qsp.run
 .qsp.read.fromKafka[`trades]
 .qsp.decode.json[]
 .qsp.window.tumbling[00:00:05; `time]
 .qsp.write.toStream[]

Implicit last argument

Each .qsp operator returns an object representing a ‘node’ configuration and takes a node configuration as its last argument. That last argument is left implicit in the API documentation: each operator therefore has a rank one higher than documented.

Pipeline API operators are invoked as projections on their implicit last arguments and therefore must be applied using bracket notation only, never prefix.

Implicit penultimate argument

Many Pipeline API operators are variadic; most take a penultimate argument of custom configuration options.

If .qsp.foo is a Pipeline API operator that takes arguments x and y and optionally cco a dictionary of custom configuration options, its true signatures are

.qsp.foo[x;y;node]
.qsp.foo[x;y;cco;node]

To limit duplication in the documentation, neither the cco nor the node argument appears in the documented signature. The signature would be documented as

.qsp.foo[x;y]

An important consequence is that

.qsp.foo[x;y]
.qsp.foo[x;y;cco]

are both unary projections.
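The same pattern can be illustrated in plain Python. The sketch below is not the kxi API; foo is a hypothetical operator whose last parameter is the node configuration, and functools.partial plays the role of a q projection: supplying every argument except node leaves a unary callable.

```python
from functools import partial

# Hypothetical stand-in for a Pipeline API operator: like .qsp.foo, it
# takes a node configuration as its (implicit) last argument and
# returns a new node configuration extending it.
def foo(x, y, cco=None, node=None):
    step = {'op': 'foo', 'x': x, 'y': y, 'cco': cco or {}}
    return (node or []) + [step]

# Supplying everything except `node` leaves a unary "projection":
p1 = partial(foo, 1, 2)              # like .qsp.foo[x;y]
p2 = partial(foo, 1, 2, {'opt': 1})  # like .qsp.foo[x;y;cco]

# Both await a single node argument:
print(p1(node=[]))  # [{'op': 'foo', 'x': 1, 'y': 2, 'cco': {}}]
print(p2(node=[]))  # [{'op': 'foo', 'x': 1, 'y': 2, 'cco': {'opt': 1}}]
```

Both p1 and p2 are effectively unary, mirroring how .qsp.foo[x;y] and .qsp.foo[x;y;cco] are both unary projections awaiting their node argument.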

Order of evaluation

Successive Pipeline API operators modify the node configuration and pass it along the chain for eventual evaluation, reversing the apparent evaluation order.

In

prd
{x where x>5}
til 8

q evaluates first til 8, then the lambda, then prd, but in

.qsp.run
.qsp.read.fromExpr["refData"]
.qsp.write.toConsole[]

because actual evaluation is handled by .qsp.run, the table refData is read before its contents are written to the console.
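The reversal can be sketched in plain Python. This is not the kxi implementation; read_from_expr, write_to_console and run are hypothetical stand-ins showing the mechanism: each operator only records a step in the node configuration, and run evaluates the recorded chain source-first, so the reader executes before the writer even though the writer is applied last syntactically.

```python
trace = []
tables = {'refData': [1, 2, 3]}

def read_from_expr(name, node=None):
    # Record (not execute) a step that reads a named table.
    def step(_):
        trace.append('read')
        return tables[name]
    return (node or []) + [step]

def write_to_console(node):
    # Record a step that prints whatever data reaches it.
    def step(data):
        trace.append('write')
        print(data)
        return data
    return node + [step]

def run(pipeline):
    # Evaluate the recorded steps source-first.
    data = None
    for step in pipeline:
        data = step(data)
    return data

# Writer is applied last, but the reader still executes first:
run(write_to_console(read_from_expr('refData')))
print(trace)  # ['read', 'write']
```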

Pipeline operators are designed to be chained together to create a single pipeline. In the Python interface, operators are joined using the pipe (|) syntax. Each operator has a number of required arguments that can be provided positionally; any optional arguments must be passed as keyword arguments.

from kxi import sp
from datetime import timedelta

sp.run(sp.read.from_kafka('trades')
    | sp.decode.json()
    | sp.window.tumbling(timedelta(seconds=5), 'time')
    | sp.write.to_stream())
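The pipe syntax itself is ordinary Python operator overloading. The sketch below is an illustration of the mechanism, not the kxi.sp internals: a hypothetical Node class implements __or__ so that chained operators accumulate into one pipeline.

```python
class Node:
    """Toy pipeline node: a list of processing steps."""
    def __init__(self, steps):
        self.steps = steps

    def __or__(self, other):
        # `a | b` concatenates the two nodes' steps into one pipeline.
        return Node(self.steps + other.steps)

def mapper(fn):
    # Hypothetical operator: wraps a function as a single-step node.
    return Node([fn])

pipeline = mapper(lambda x: x + 1) | mapper(lambda x: x * 2)

# Run the chained steps left to right:
value = 3
for step in pipeline.steps:
    value = step(value)
print(value)  # (3 + 1) * 2 == 8
```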