Machine Learning Clustering on Kafka ingest

This page provides an example of fitting and applying a clustering model on a Kafka stream.

Motivation

This page walks through a workflow in which an online machine learning model is fit on the first N records within a stream and subsequently used for prediction and online updates. The example uses a streaming implementation of K-Means clustering to filter out one of the generated clusters before data is persisted to the database.

Example

This example closely follows the Kafka Ingest example provided. It walks through the functionality provided by the SP ML kdb Insights integration by:

  1. Deploying a sequential K-Means model to be fit on the first 1000 records provided

  2. Applying the fit model to subsequent data to make predictions

  3. Filtering the stream based on these predictions, publishing only data that meets specified conditions.
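The fit-then-predict flow in steps 1-3 can be sketched in plain Python. This is a minimal, framework-agnostic illustration of sequential K-Means (centroids updated one record at a time), not the implementation used by the SP ML operators; the class name and batch handling are assumptions.

```python
import numpy as np

class StreamingKMeans:
    """Sequential K-Means: centroids are updated incrementally per record."""

    def __init__(self, k: int):
        self.k = k
        self.centers = None   # (k, dim) centroid matrix
        self.counts = None    # number of records assigned to each centroid

    def partial_fit(self, X):
        X = np.asarray(X, dtype=float)
        if self.centers is None:
            # Seed the centroids with the first k records
            self.centers = X[: self.k].copy()
            self.counts = np.ones(self.k)
            X = X[self.k :]
        for x in X:
            # Assign x to the nearest centroid, then move that centroid
            # towards x by a running-mean step
            j = int(np.argmin(((self.centers - x) ** 2).sum(axis=1)))
            self.counts[j] += 1.0
            self.centers[j] += (x - self.centers[j]) / self.counts[j]
        return self

    def predict(self, X):
        X = np.asarray(X, dtype=float)
        d = ((X[:, None, :] - self.centers[None, :, :]) ** 2).sum(axis=2)
        return d.argmin(axis=1)
```

After fitting on the first 1000 records, `predict` labels each new record with its nearest cluster, and the stream can be filtered on that label.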

This is achieved by filtering the incoming Kafka stream for the trades topic produced by the example Producer. The stream is decoded and windowed before the above analysis is applied.
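The decode, window, predict, and filter stages can be sketched end to end in plain Python. Everything here is illustrative: the record layout, the `decode` and `window` helpers, and the `predict` callable are stand-ins for the operators actually wired together in the pipeline spec.

```python
import json
from typing import Callable, Iterable, Iterator, List

def decode(raw: Iterable[bytes]) -> Iterator[dict]:
    """Decode raw Kafka messages (assumed JSON here) into records."""
    for msg in raw:
        yield json.loads(msg)

def window(records: Iterable[dict], size: int) -> Iterator[List[dict]]:
    """Group decoded records into fixed-size batches before analysis."""
    batch: List[dict] = []
    for r in records:
        batch.append(r)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush any partial final window
        yield batch

def run(raw, predict: Callable[[dict], int], keep_cluster: int, size: int = 4):
    """Publish only records whose predicted cluster matches keep_cluster."""
    for batch in window(decode(raw), size):
        for record in batch:
            if predict(record) == keep_cluster:
                yield record
```

In the real pipeline, `predict` would be the fit K-Means model from the previous step rather than a hand-written rule.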

To run an ingestion pipeline using kdb Insights Enterprise, first deploy the base system. Next, download the sample Kafka package as described here. This package needs to be modified to deploy a q or Python pipeline containing ML functionality as follows:

View the unpacked package.

bash

export PKG=kafka-ingest
kxi package unpack $PKG-$KX_VERSION.kxi

The default pipeline spec is written in q and is located under kafka-ingest/src/ml.q in the unpacked package. The Python pipeline spec is located under kafka-ingest/src/ml.py.

Deploying

Authenticate with kdb Insights Enterprise and deploy the sample package.

Note

Make sure to teardown any previously installed version of the package and clean up resources before deploying a new version.

Once deployed, the pipeline starts to ingest data and write to the database.

q

Python


Switch the pipeline to use the q ML spec by replacing the values of base and spec in kafka-ingest/pipelines/subway.yaml.

YAML

base: q-ml
..
src: src/ml.q

Then deploy the package using the commands below.

bash

kxi auth login
kxi pm push $PKG
kxi pm deploy $PKG

Switch the pipeline to use the Python ML spec by replacing the values of base and spec in kafka-ingest/pipelines/subway.yaml.

YAML

base: py-ml
..
src: src/ml.py

Then deploy the package using the commands below.

bash

kxi auth login
kxi pm push $PKG
kxi pm deploy $PKG

Checking progress

To check progress of the ingest and validate success, follow the steps described here.

Teardown

Tear down the package and data using the command below.

bash

kxi pm teardown --rm-data $PKG

Further Reading