Custom Query API Development

This page outlines how you can augment your streaming process with accessible named query APIs.

WARNING

The functionality outlined below provides the necessary tools for users to build complex streaming infrastructures. The generation and management of such workflows rest solely with the users. KX supports only individual elements used to create these workflows, not the end-to-end applications.

The addition and use of custom query APIs is often crucial for making your data accessible to users. Users connected to your process via IPC or by a querying Gateway process call these APIs. You can place custom query APIs on any process type discussed in the Capture and Store, Analyze Streaming Data, and Subscribe to Data pages.

In each case, you can add a query API by calling the register_api method on each of the process types or during the configuration of an RTP , HDB or GATEWAY process in your system. A breakdown of gateway processes follows Manage Query Routing. In the examples below we add query APIs to the historical database created when configuring the basic infrastructure and the RTP processing the aggregate dataset.

Configure an API for your Real-Time Processor

You can add APIs to your process at configuration time or while the process is in operation, to allow an iterative development. The following sections show how both approaches can be achieved to create a Python function which takes multiple parameters:

  1. The table which is being queried

  2. The symbol which a user is interested in

And returns the number of instances of that symbol:

Python

Copy
def custom_api(table, symbol):
    return kx.q.sql(f'select count(*) from {table} where sym like $1', symbol)['xcol'][0]

Add an API to an existing RTP and HDB

Now that you have the function definition, use the register_api function to augment the rtp class created here.

Python

Copy
rtp.register_api('symbol_count', custom_api)

Similarly, you can add the equivalent API to your HDB process generated here by accessing the hdb class as follows:

Python

Copy
basic.hdb.register_api('symbol_count', custom_api)

Add an API when configuring your system

In the previous section you added custom APIs to a running system. To make APIs available on restart, you can add them at the configuration time for the processes. For instance, let's modify the example here to include an API.

If we're adding an API at configuration, it's supplied as a dictionary mapping the name of the API to the API code:

Python

Copy
def preprocessor(table, data):
    if table == 'trade':
        return data
    else:
        return None

def postprocessor(table, data):
    agg = kx.q[table].select(
        columns = {'min_px':'min price',
                   'max_px': 'max price',
                   'spread_px': 'max[price] - min price'},
         by = {'symbol': 'symbol'})
    kx.q['agg'] = agg # Make the table accessible from q
    with kx.SyncQConnection(port=5010, wait=False, no_ctx=True) as q:
        q('.u.upd', 'aggregate', agg._values)
    return None

def custom_api(table, symbol):
    return kx.q.sql(f'select count(*) from {table} where sym like $1', symbol)['xcol'][0]

rtp = kx.tick.RTP(port=5014,
                  subscriptions = ['trade'],
                  libraries={'kx': 'pykx'},
                  pre_processor=preprocessor,
                  post_processor=postprocessor,
                  apis={'symbol_count': custom_api},
                  vanilla=False)
rtp.start({'tickerplant': 'localhost:5013'})

Currently we don't support the addition of APIs to the components of the basic infrastructure at startup. To configure a historical database at startup with more fine-grained control, configure it manually as outlined here.

Test an API

In the above we are defining that users calling this function will do so by making use of the named function symbol_count. You can directly test this once registered, as it follows:

Python

Copy
rtp('symbol_count', 'trade', 'AAPL')

Alternatively, you can test this using IPC:

Python

Copy
with kx.SyncQConnection(port=5014, no_ctx=True) as q:
    q('symbol_count', 'trade', 'AAPL')

Next steps

Now that you have data being published to your system you may be interested in the following:

For some further reading, here are some related topics: