Custom Query API Development
This page outlines how you can augment your streaming process with accessible named query APIs.
WARNING
The functionality outlined below provides the necessary tools for users to build complex streaming infrastructures. The generation and management of such workflows rest solely with the users. KX supports only individual elements used to create these workflows, not the end-to-end applications.
The addition and use of custom query APIs is often crucial for making your data accessible to users. Users connected to your process via IPC or by a querying Gateway process call these APIs. You can place custom query APIs on any process type discussed in the Capture and Store, Analyze Streaming Data, and Subscribe to Data pages.
In each case, you can add a query API by calling the register_api
method on each of the process types or during the configuration of an RTP
, HDB
or GATEWAY
process in your system. A breakdown of gateway processes follows Manage Query Routing. In the examples below we add query APIs to the historical database created when configuring the basic infrastructure and the RTP processing the aggregate dataset.
Configure an API for your Real-Time Processor
You can add APIs to your process at configuration time or while the process is in operation, to allow an iterative development. The following sections show how both approaches can be achieved to create a Python function which takes multiple parameters:
-
The
table
which is being queried -
The
symbol
which a user is interested in
And returns the number of instances of that symbol:
Python
def custom_api(table, symbol):
return kx.q.sql(f'select count(*) from {table} where sym like $1', symbol)['xcol'][0]
Add an API to an existing RTP and HDB
Now that you have the function definition, use the register_api
function to augment the rtp
class created here.
Python
rtp.register_api('symbol_count', custom_api)
Similarly, you can add the equivalent API to your HDB
process generated here by accessing the hdb
class as follows:
Python
basic.hdb.register_api('symbol_count', custom_api)
Add an API when configuring your system
In the previous section you added custom APIs to a running system. To make APIs available on restart, you can add them at the configuration time for the processes. For instance, let's modify the example here to include an API.
If we're adding an API at configuration, it's supplied as a dictionary mapping the name of the API to the API code:
Python
def preprocessor(table, data):
if table == 'trade':
return data
else:
return None
def postprocessor(table, data):
agg = kx.q[table].select(
columns = {'min_px':'min price',
'max_px': 'max price',
'spread_px': 'max[price] - min price'},
by = {'symbol': 'symbol'})
kx.q['agg'] = agg # Make the table accessible from q
with kx.SyncQConnection(port=5010, wait=False, no_ctx=True) as q:
q('.u.upd', 'aggregate', agg._values)
return None
def custom_api(table, symbol):
return kx.q.sql(f'select count(*) from {table} where sym like $1', symbol)['xcol'][0]
rtp = kx.tick.RTP(port=5014,
subscriptions = ['trade'],
libraries={'kx': 'pykx'},
pre_processor=preprocessor,
post_processor=postprocessor,
apis={'symbol_count': custom_api},
vanilla=False)
rtp.start({'tickerplant': 'localhost:5013'})
Currently we don't support the addition of APIs to the components of the basic infrastructure at startup. To configure a historical database at startup with more fine-grained control, configure it manually as outlined here.
Test an API
In the above we are defining that users calling this function will do so by making use of the named function symbol_count
. You can directly test this once registered, as it follows:
Python
rtp('symbol_count', 'trade', 'AAPL')
Alternatively, you can test this using IPC:
Python
with kx.SyncQConnection(port=5014, no_ctx=True) as q:
q('symbol_count', 'trade', 'AAPL')
Next steps
Now that you have data being published to your system you may be interested in the following:
-
Generate a query routing gateway to allow queries across multiple processes.
-
Manually configure the basic infrastructure as outlined on the Complex infrastructure page.
For some further reading, here are some related topics:
-
Learn more about Interprocess Communication(IPC).
-
Learn more about how you can query your data.