Using the kdb Insights q interface

The rt.qpk provides APIs for publishing and subscribing to an RT stream.

Internal q publishers and subscribers (such as the Stream Processor and Storage Manager) use the rt.qpk interface (also known as the "RT kdb Insights qpk") to publish messages to an RT stream and to receive messages from an RT stream. External q publishers can also leverage the rt.qpk to communicate with an RT stream.

Download

To download the rt.qpk, follow the steps below.

version=1.12.0
curl -L https://portal.dl.kx.com/assets/raw/rt/$version/rt.$version.qpk -o rt.$version.qpk
unzip rt.$version.qpk
Navigate to the extracted rt/ directory and start a q session:

$ cd rt/
$ q startq.q
q)params:(`path`stream`publisher_id`cluster)!("/tmp/rt";"data";"pub1";enlist(":127.0.0.1:5002"))
q)p:.rt.pub params
q)show taxi:("SPPHEEEEEEEES";enlist",")0: hsym`$"../config/taxi.csv"
q)p(`.b; `taxi; update pickup:.z.p,dropoff:.z.p+0D00:15:00.0 from taxi)

Publisher

Internal Publisher

Follow the download steps above, then you can publish as follows:

$ cd rt/
$ q startq.q
q)params:`path`stream`publisher_id!("/tmp/rt";"mystream";"pub1")
q)p:.rt.pub params

The returned p is a function to which q objects can be passed for publishing.

q)show t:([]time:3#.z.p;sym:`a`b`c)
time sym
---------------------------------
2023.04.03D16:50:36.914760000 a
2023.04.03D16:50:36.914760000 b
2023.04.03D16:50:36.914760000 c
q)p (`upd;`tab; t)

Advanced

.rt.pub can be called with different dictionary keys to override defaults, for example:

q)t:`stream`path`topic_prefix`port!("stream";"/rt/logs";"rt-";5002)
q)p:.rt.pub t

Description of the dictionary for .rt.pub:

| key            | type            | default                             | description                                                                                                      |
|----------------|-----------------|-------------------------------------|------------------------------------------------------------------------------------------------------------------|
| `stream`       | string          | n/a                                 | RT stream name.                                                                                                  |
| `path`         | string          | `$RT_LOG_PATH`                      | The parent path of the RT log directories.                                                                       |
| `topic_prefix` | string          | `$RT_TOPIC_PREFIX`                  | The prefix of the RT sequencer node hostnames, i.e. the part of `$RT_TOPIC_PREFIX$stream-$n` before `$stream`.   |
| `port`         | int             | `5002`                              | The port number of the internal pull servers.                                                                    |
| `publisher_id` | string          | n/a                                 | Used to distinguish multiple publishers on the same host. The directory name will be `$publisher_id.$hostname.$stream`. |
| `dedup_id`     | string          | n/a                                 | Used by the sequencer to identify deduplicated input streams. The `$stream` suffix of the directory name changes to `$dedup_id.dedup`. |
| `cluster`      | list of strings | `:$RT_TOPIC_PREFIX$stream-$n:$port` | Manually provided connection URLs, completely overriding the logic that creates the connection URLs for the replicators. |
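
For illustration (hypothetical host name node-0): a publisher created as below writes its log files to a directory named pub1.node-0.data, following the $publisher_id.$hostname.$stream convention above.

q)p:.rt.pub `stream`publisher_id!("data";"pub1")
q)// log directory under the path key: pub1.node-0.data (illustrative)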

External Publisher

An external publisher can send data to RT via an exposed ingress point, such as a set of load balancers. These load balancers need to be discovered; kdb Insights Enterprise relies on a service called the Information Service for this. When queried, the Information Service returns the load balancer endpoints that an external publisher can communicate with. The rt.qpk has built-in APIs that manage the querying of the Information Service. The user is required to go through some authentication steps, which are covered below.

Pre-requisites

  • Deploy a package. An example package kxi-db can be found here.

  • Make sure that the package is deployed in your kdb Insights Enterprise instance.

  • Ensure you have an authenticated kdb Insights Enterprise client URL.

  • Make sure that the kdb Insights Enterprise ingest endpoints (as defined by the KXI_CONFIG_URL) are accessible.

Parameters

| parameter    | required  | default | description                                                                                  |
|--------------|-----------|---------|----------------------------------------------------------------------------------------------|
| `config_url` | Mandatory | none    | The URL that this program calls or reads to find the RT endpoint(s) it needs to connect to.  |
| `path`       | Mandatory | none    | The local directory where the RT log files are written.                                      |

config_url

The config_url parameter can be set to the value of the KXI_CONFIG_URL environment variable to access kdb Insights Enterprise from outside the cluster:

q)url:getenv`KXI_CONFIG_URL;
q)params:`config_url`path!(url;"/tmp/rt/")
q)p:.rt.pub params
q)show t:([]time:3#.z.p;sym:`a`b`c)
time sym
---------------------------------
2023.04.03D16:50:36.914760000 a
2023.04.03D16:50:36.914760000 b
2023.04.03D16:50:36.914760000 c
q)p (`upd;`tab; t)

Maximum payload

An error is thrown by RT if the message being sent is too large. This occurs if a publisher attempts to send a message greater in size than 1GB; to be exact, the upper limit is 1073741783 bytes. RT cannot support messages greater than 1GB.

The example code below sends a message that is too large and shows the error that is thrown:

q)url:getenv`KXI_CONFIG_URL;
q)params:`config_url`path!(url;"/tmp/rt/")
q)p:.rt.pub params
q)data:(prd 3#1024)#0x00;
q)p data
'message too large

1GB payload limit

Messages that are larger than 1GB need to be broken into smaller ones before they are sent to RT.
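
As an illustration, one way to stay under the limit is to publish a large table in fixed-size row batches. This is a minimal sketch (publishBatched is not part of rt.qpk, and the batch size must be tuned to the row width), assuming p was returned by .rt.pub as above:

q)publishBatched:{[p;tbl;n] {[p;c] p(`upd;`tab;c)}[p] each n cut tbl}
q)publishBatched[p;taxi;10000]  / taxi table from the download example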

Publisher id

A publisher's log directory names must be globally unique. By default the rt.qpk combines the streamid and the host name to provide a unique directory name.

Path must survive reboots

If the hostname is stable (for example, as part of a stateful set) then this path must survive reboots.

However, if you have multiple publishers connected to the same streamid and running on the same host, you also have to provide a publisher id to create unique directory names on the same host. This can be done by setting `publisher_id in the dictionary .rt.pub is called with.

q) p: .rt.pub `stream`publisher_id!("mystream";"pub_a")

Publisher IDs must be unique on a host

There must never be more than one publisher running with the same publisher id on the same host at the same time.

Deduplication

To enable deduplication of a stream:

  1. The publisher needs to call .rt.pub with a dictionary and specify a `dedup_id that identifies the deduplicated stream globally.

  2. A unique message identifier must be set in .rt.id before publishing each message.

  3. .rt.id needs to be monotonically increasing, and if multiple identical streams are merged under the same dedup_id, all publishers must use the same .rt.id for the same message.

For example:

q).rt.id:0;
q)t:`topic_prefix`stream`dedup_id!("kx-";"mystream";"trade")
q)pub_fun: .rt.pub t
q)// pub_fun publishes messages to the RT stream with streamid=mystream using deduplication with dedup_id=trade
q)publish:{[p;t;x] .rt.id+:1; p(`upd;t;x)}[pub_fun]
q)show t:([]time:3#.z.p;sym:`a`b`c)
time sym
---------------------------------
2023.04.03D16:50:36.914760000 a
2023.04.03D16:50:36.914760000 b
2023.04.03D16:50:36.914760000 c
q)publish[`tab; t]

All session streams from different publishers that share the same dedup_id are deduplicated. RT keeps track of the high watermark for each dedup_id; when it sees a message with a lower identifier than the watermark, that message is discarded.

Publisher IDs must be unique on a host

If you want to run multiple publishers on the same host, `publisher_id must also be set differently for each publisher.
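
To make the watermark behaviour concrete, here is a minimal sketch (names and data are illustrative) of two publishers feeding the same deduplicated stream:

q)pA:.rt.pub `stream`publisher_id`dedup_id!("mystream";"pub_a";"trade")
q)pB:.rt.pub `stream`publisher_id`dedup_id!("mystream";"pub_b";"trade")
q)msg:(`upd;`trade;([]sym:`a`b;price:1 2f))
q)send:{[p;id;m] .rt.id:id; p m}
q)send[pA;1;msg]  / watermark for dedup_id=trade advances
q)send[pB;1;msg]  / second copy of message id 1 is deduplicated by RT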

The message header

The message header consists of the following fields:

  • on - the origin name, a symbol representing who originated this message.

  • ts - the timestamp when the message was recorded. Setting this to 0Np makes the publisher use the current real time.

  • id - the message identifier, should be unique for the origin, and have a distance from zero that is increasing by one with each message. For example: 1 -2 3 4 is a valid sequence of ids, but 1 2 -2 4 is not.

  • to - reserved for future use.

The message header can be populated by setting the corresponding variable in the .rt namespace (e.g. .rt.id) before calling the returned function p when publishing a message.

q).rt.id:0;
q)t:`topic_prefix`stream`dedup_id!("kx-";"mystream";"trade")
q)pub_fun: .rt.pub t
q)// pub_fun publishes messages to the RT stream with streamid=mystream using deduplication with dedup_id=trade
q)publish:{[p;t;x] .rt.id+:1; .rt.ts:.z.p; .rt.on:.z.h; p(`upd;t;x)}[pub_fun]
q)show t:([]time:3#.z.p;sym:`a`b`c)
time sym
---------------------------------
2023.04.03D16:50:36.914760000 a
2023.04.03D16:50:36.914760000 b
2023.04.03D16:50:36.914760000 c
q)publish[`tab; t]

RT Topic Filtering

The rt.qpk allows subscribers to receive a subset of the data on the RT stream. However, in order to leverage this, publishers need to follow a specific message format when publishing data. The message structure must be a 3-item list:

| name       | example                            | description                                                                              |
|------------|------------------------------------|------------------------------------------------------------------------------------------|
| `msg_type` | `upd`                              | Allows users to distinguish between message types.                                       |
| `table`    | `trade`                            | The table that the data relates to. Users can subscribe to only this data in the stream. |
| `payload`  | `([]time;sym;price;size;trade_id)` | Typically the payload would be a kdb+ table.                                             |

RT is message format agnostic. Data sent to RT does not have to follow this message structure, but when it does, subscribers can subscribe to a subset of the data.

Examples:

q)p:.rt.pub (enlist`stream)!enlist "streamid";
q)p 1010101b // topic filtering not available as the message is not a 3 item list
q)p(`upd; `trade; `a`b!1 2) // topic filtering available

Subscriber

The rt.qpk surfaces messages to the client subscriber as they become available. It does this by listening for file-system events and surfacing the corresponding changes to the subscriber.

Subscriber Callback

The subscriber needs to define a callback function when subscribing to a RT stream. This function is called when a new message is passed to the RT subscriber.

callback:{[data;pos] show data }
s:.rt.sub `stream`position`callback`filter!("streamid"; pos; callback;"");

The callback arguments provide the deserialized q message and a "position". The "position" identifies the last message the subscriber received and can be used to restart the subscriber from that point.

message:([]msg:();pos:());
next_positions:([]pos:"j"$());
callback:{
 `message upsert `msg`pos!(x;y);
 `next_positions upsert .rt.p.next_pos[]};

Subscriber Position

RT has a more flexible method of subscription than a kdb+ tickerplant. You can index into the stream and subscribe from a specific point. When RT passes a message to the subscriber, the message includes the payload and the position in the RT stream for that payload. It is useful to cache this position so that if your subscriber restarts, it can resubscribe from the position in the log file it last received data for.

When subscribing you can decide where in the stream to start receiving messages from:

  1. The first message available, i.e. the oldest message that has not yet been archived.

  2. A particular "position" it has been previously given by RT. This allows subscribers to restart from their last known position.

  3. A position of the message that was most recently made available to the subscriber. This allows subscribers to ignore all historic messages and only subscribe to new messages. Subscribers can cache this position and when restarting, resubscribe from this position.

The position is not a message index

The "position" is opaque and cannot be used as a message index.

message:([]msg:();pos:());
callback:{[data;pos] 
 upsert[`message] `msg`pos!(data;pos);
 }
pos:(::); // subscribe to the beginning of the stream
pos:52779779358720; // subscribe from a known position in the stream
pos:`latest; // subscribe to the latest position in the RT stream
.rt.sub `stream`position`callback!("streamid"; pos; callback);

You can use APIs that return position information about an RT stream.

// Return the latest position in the RT stream
q)latest_position:.rt.get_latest_position enlist[`stream]!enlist"streamid";
q).rt.get_latest_position `stream`cluster!("streamid";enlist ":localhost:5001")
774056190947645
q).rt.get_latest_position `stream`peers!("streamid";enlist ":localhost:5001")
774056190947645
q)
// Return the next position(expected next position in the stream to subscribe)
q)next_position:.rt.p.next_pos[]
q)
// Given a path to a set of RT logs, return the next position (expected next position in the stream to subscribe)
q)next_position:.rt.get_next_position["/rt/logs/"];

The .rt.get_latest_position function also supports a dictionary input.

The input dictionary must contain the stream parameter and optionally the cluster or peers parameter.

| key       | type   | required  | default       | description     |
|-----------|--------|-----------|---------------|-----------------|
| `stream`  | string | Mandatory | None          | RT stream.      |
| `cluster` | string | Optional  | Created by RT | KXI-RT cluster. |
| `peers`   | string | Optional  | Created by RT | KXI-RT cluster. |

Subscribe

The subscription API is .rt.sub. You can subscribe to the full stream of data or a subset of the data by using topic filtering.

  1. Subscribe to the full stream of data. This means you get all the data from the stream 'mystream':

    upd:{[data;pos]}
    .rt.sub`stream`path`position`callback!("mystream";"/rt/logs/";0;upd)
    

  2. Subscribe to one topic within the stream. This means you get only the data from the trade topic within the stream:

    upd:{[data;pos]}
    s:.rt.sub`stream`position`callback`filter!("mystream";0;upd;"trade")
    

Topic Filtering

The publisher section covered the message format which must be used to leverage topic filtering. When using topic filtering, log files are not replicated from the RT server to the client, so you do not need to include a path key in the parameters. You can also receive all the data from RT with the topic filtering method of subscribing, by including the filter key and leaving it blank. This topic filtering is done server side.

upd:{[data;pos]}
s:.rt.sub`stream`position`callback`filter!("mystream";0;upd;"trade") // subscribe to the 'trade' topic within the 'mystream' stream
s:.rt.sub`stream`position`callback`filter!("mystream";0;upd;"") // subscribe to all data within the 'mystream' stream

Subscriber session with topic filtering

When using topic filtering, it is important that the result returned from .rt.sub is stored in a variable. Otherwise the connection will close and no data will be received.

Subscription Inputs

Description of the input parameters for .rt.sub:

  • All subscription options must contain the position and callback parameters:

    | key        | type     | required  | default | description                  |
    |------------|----------|-----------|---------|------------------------------|
    | `position` | int      | Mandatory | None    | RT stream position.          |
    | `callback` | function | Mandatory | None    | RT stream callback function. |

  • There are different combinations of keys to include in the dictionary:

    1. Create the connection URLs from input parameters:

    In this case, the subscriber connects to $RT_TOPIC_PREFIX$stream-$n:$port, e.g. rt-mystream-0:5001

    | key          | type            | required  | default            | description                                          |
    |--------------|-----------------|-----------|--------------------|------------------------------------------------------|
    | stream       | string          | Mandatory | None               | RT stream name.                                      |
    | path         | string          | Optional  | `$RT_LOG_PATH`     | The path of RT log directory.                        |
    | port         | int             | Optional  | `5001`             | The port number the pull clients start on.           |
    | topic_prefix | string          | Optional  | `$RT_TOPIC_PREFIX` | RT topic prefix name.                                |
    | filter       | string          | Optional  | None               | The subset of data in the RT stream to subscribe to  |
    
    2. Use a pre-defined cluster as the cluster input parameter. Providing cluster overrides the logic that creates the connection URLs for the replicators by manually supplying the URLs.

    | key       | type            | required  | default          | description                                           |
    |-----------|-----------------|-----------|------------------|-------------------------------------------------------|
    | `stream`  | string          | Mandatory | None             | RT stream name.                                       |
    | `cluster` | list of strings | Optional  | created by RT    | The connection URL.                                   |
    | `path`    | string          | Optional  | `$RT_LOG_PATH`   | The path of RT log directory.                         |
    | `filter`  | string          | Optional  | None             | The subset of data in the RT stream to subscribe to.  |

    3. Use an available config_url given by its path. For more information on the config_url, see the authentication section:

    | key          | type      | required  | default        | description                                           |
    |--------------|-----------|-----------|----------------|-------------------------------------------------------|
    | `config_url` | json file | Mandatory | None           | The URL of the RT endpoint(s).                        |
    | `path`       | string    | Optional  | `$RT_LOG_PATH` | The path of RT log directory.                         |
    | `filter`     | string    | Optional  | None           | The subset of data in the RT stream to subscribe to.  |

    4. Use an absolute path (abs_path):

    | key        | type   | required  | default | description                                 |
    |------------|--------|-----------|---------|---------------------------------------------|
    | `abs_path` | string | Mandatory | None    | The absolute path of the RT log directory.  |

The path key can be relative or absolute and replicators are started; with the abs_path key the path must be absolute and no replicators are started. For the path key, an absolute path is used directly as the RT log directory, while a relative path is appended to $RT_LOG_PATH and the concatenated path is used as the RT log directory.

Examples of the available options to call .rt.sub with a dictionary:

  • Call by creation of the connection URLs from input parameters:

    # Subscribe to the 'trade' table within the RT stream 'data'
    param:`stream`position`callback`filter!("data";`latest;callback;"trade"); 
    s:.rt.sub param
    # RT log path: `:/tmp/stream/72c0024dbc5e.data
    param:`stream`port`position`callback!("data";5001;0;callback);
    .rt.sub param
    # RT log path: `:/s/72c0024dbc5e.data
    
  • Call by application of a config json file:

    param:`config_url`path`position`callback`filter!("file:///tmp/config.json";"stream/data";`latest;callback;"quote");
    .rt.sub param
    # RT log path: `:/s/stream/data/72c0024dbc5e.data
    
  • Call by a manually provided URL:

    cluster:enlist ":rt-data-0:5001";
    param:`stream`cluster`path`position`callback`filter!("data";cluster;"stream/data";`latest;callback;"trade");
    .rt.sub param
    # RT log path: `:/s/stream/data/72c0024dbc5e.data
    
  • Call by application of an absolute path:

    param:`abs_path`position`callback!("/s/72c0024dbc5e.data";0;callback)
    .rt.sub param
    # no replicators are started
    

Other events

There are other events in the message stream that the user can be notified of with an event callback. These inform the user of exceptions where data loss can occur.

| event          | details                                                                | next position                                        | implication                                                                                       |
|----------------|------------------------------------------------------------------------|------------------------------------------------------|----------------------------------------------------------------------------------------------------|
| `badtail`      | Message log file corruption is detected.                               | Skips to the next log file.                          | There is always data loss. No data is received until the log file being written to is rolled.      |
| `badmsg`       | A message cannot be decoded.                                           | Skips to the next message.                           | The affected message could not be deserialized by the q subscriber. There will be data loss of the affected message. |
| `reset`        | RT has been reset, deleting its log files and starting a new session.  | Skips to the new session.                            | There is potential data loss and message duplication.                                              |
| `skip-forward` | The current position points to a log file that has been archived.      | Skips to the start of the first unarchived log file. | There is always data loss.                                                                         |

These events are replayable just like messages; for example, resubscribing from an earlier position results in the same message or event sequence.

When one of these events occurs, the .rt.on_event function is called with the following arguments:

| name       | details                                             |
|------------|-----------------------------------------------------|
| `event`    | badtail, badmsg, reset or skip-forward.             |
| `streamid` | The same streamid value used in the .rt.sub call.   |
| `position` | 7h vector of two positions.                         |

The two values in the position vector depend on the event:

| event          | position vector values                                                                      |
|----------------|----------------------------------------------------------------------------------------------|
| `badtail`      | The position of the corrupted message and the starting position of the next log file.       |
| `badmsg`       | The position of the corrupted message and the next message.                                  |
| `reset`        | The end position of the last message and the position of the first message after a reset.   |
| `skip-forward` | The position in the archived log and the starting position of the next unarchived log file. |

The default implementation of .rt.on_event only prints a message to standard out, for example:

q).rt.on_event`event`dir`position!(`reset;"streamid";130 17592186044416)
Reset event in streamid, rolling position from 130 to 17592186044416

This default implementation can be redefined, or the user can supply a custom event handler with a dictionary:

q) .rt.sub `stream`position`callback!("streamid";0;`message`event!({[data;position] ...};{[event;position] ...}));

Unsubscribing

Topic Filtering

If you have passed a filter key when subscribing, you can unsubscribe by overwriting the variable that was returned:

upd:{[data;pos]}
s:.rt.sub`stream`position`callback`filter!("mystream";0;upd;"trade") // subscribe
s:() // unsubscribe

The subscriber can decide to stop receiving new messages. There are several ways of using the .rt.unsub function, which are available if a filter key was not used in the .rt.sub call (a sketch follows the list):

  • .rt.unsub[] can be called from a message or event callback and that stops the message stream for the subscription that the message or event was received on.

  • .rt.unsub[streamid] can be called at any point, whether inside a callback or not. This function requires the streamid as a parameter, which is the same ID provided during the subscription process using .rt.sub[streamid;pos;callback]. This function unsubscribes from the specified streamid.

  • .rt.unsub[param] can be called at any point, whether inside a callback or not. This function requires the param as a parameter, which is the same dictionary provided during the subscription process using .rt.sub[param]. This function unsubscribes from the specified param dictionary.
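
A minimal sketch of these options (the stream name, callback and stop condition are illustrative):

upd:{[data;pos] show data}
param:`stream`position`callback!("mystream";0;upd)
s:.rt.sub param
.rt.unsub["mystream"]  / unsubscribe by streamid
.rt.unsub[param]       / or unsubscribe with the original dictionary
// from inside a callback, .rt.unsub[] stops the stream the message arrived on:
cb:{[data;pos] show data; if[pos>=stopPos; .rt.unsub[]]}  / stopPos is hypothetical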

Pause and Resume

Topic Filtering

The pause and resume functionality is not available when a filter key is passed to .rt.sub.

Pause

The subscriber can also decide to pause the delivery of incoming new messages while the subscriber's local log files continue to grow, such that the delivery of messages to the subscriber can be resumed later.

There are two ways of doing this using the .rt.pause function:

  • .rt.pause[] can be called from a message or event callback and that pauses the message stream for the subscription the message or event was received on.

  • .rt.pause[param] can be called at any point, whether inside a callback or not. This function requires the param as a parameter, which is the same dictionary provided during the subscription process using .rt.sub[param]. This function pauses the delivery from the specified param dictionary.

Resume

The subscriber can resume a paused stream using the .rt.resume function:

  • .rt.resume[param] can be called at any point, whether inside a callback or not. This function requires the param as a parameter, which is the same dictionary provided during the subscription process using .rt.sub[param]. This function resumes the delivery from the specified param dictionary, as sketched below.
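
A minimal sketch of pausing and resuming a subscription (names are illustrative):

upd:{[data;pos] show data}
param:`stream`position`callback!("mystream";0;upd)
s:.rt.sub param
.rt.pause[param]   / delivery stops; local log files continue to grow
.rt.resume[param]  / delivery resumes where it was paused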

Client Side Filtering

Topic Filtering

The topic filtering functionality provides a way to do the filtering server side and is a less resource-intensive way of achieving filtering. The filtering in this section is done client side.

The rt.qpk provides an efficient way to filter messages that contain generic lists with one or two leading symbols, such as (upd;trade;data).

To apply client side message filtering, you must create a special message processing callback function.

For example, to subscribe to all messages on a stream:

.rt.sub `stream`position`callback!("streamid";0;callback);

To subscribe to a subset of the data on a stream, using only the trade and quote tables, use the following example:

cb:.rt.filter[{$[x=`.b;y in `trade`quote;0b]};callback];
.rt.sub `stream`position`callback!("streamid";0;cb);

The filter only delivers the messages to the subscriber that satisfy the criteria.

The two types of filtering methods can be defined as:

  • Filter with caching: to enable caching, the predicate must return a boolean.

  • Filter without caching: to disable caching, the predicate must return a long or int (0 or 1).

When any part of the filtering configuration changes (for example, modifications to the matching list t in the example below), you can clear the cache to ensure the updated filtering rules are applied.

t:enlist`quote;
// 1) enable caching - boolean predicate
f:.rt.filter[{$[x=`.b;y in t;0b]};callback];
.rt.sub `stream`position`callback!("streamid";0;f);
t,:`trade;
// 2) clear cache
.rt.filter[f;`clear_cache];
// 3) disable caching - integer predicate
f:.rt.filter[{$[(x=`.b)&y in t;1;0]};callback];
.rt.sub `stream`position`callback!("streamid";0;f);

Tracking

Topic Filtering

The tracking functionality is not available when a filter key is passed to .rt.sub.

The rt.qpk provides a type of callback function that only delivers the position and peek info (meta data) of each message without deserializing the whole message.

In addition to the position, the meta data is a dictionary with the following key members:

| member | type      | description                                                          |
|--------|-----------|----------------------------------------------------------------------|
| `ts`   | timestamp | The timestamp when the message was recorded.                         |
| `to`   | integer   | The timeout of how long the client waits for a response.             |
| `corr` | long      | The correlator of the kxi message.                                   |
| `orig` | string    | The origin name, a symbol representing who originated this message.  |
| `s0`   | string    | The first leading symbol for filtering.                              |
| `s1`   | string    | The second leading symbol for filtering.                             |
| `len`  | integer   | The length of the message.                                           |

To subscribe to tracking, define a tracker callback using the .rt.tracker function, where the x parameter is the meta data and y is the position:

q) tr:.rt.tracker{0N!(`msg;y;x)};
q) .rt.is_tracker[tr]
1b
q) d:`stream`position`callback!("mystream";`latest;tr);
q) .rt.sub d;
q) .rt.unsub d;

Garbage collection of subscriber stream log files

RT Topic Filtering

When using topic filtering, no log files are replicated to the client; the subscriber receives the data from the RT server. This means that garbage collection of subscriber log files does not apply.

Adopt the RT garbage collection policy

The RT Archiver manages the garbage collection of all merged stream logs inside RT based on a configurable policy.

These archival options can be defined as part of the kdb Insights Configuration.

By default, the subscriber adopts the garbage collection policy used by RT (opt-in). When log files are truncated in RT they are also truncated on all subscribers.

Subscriber prune ahead

As a subscriber, you may want to opt out of the default behavior and independently manage the garbage collection of your local log files.

RT garbage collection API

If you choose to manage the garbage collection of your local log files you must use the API .rt.prune. Removing RT log files through other means causes issues with the subscriber ingesting the RT stream.

To opt out of the default behaviour, set the following environment variable before calling .rt.sub:

  • $REPLICATOR_EXCHANGE_ARCHIVED=0

This allows the subscriber to prune ahead of RT's global garbage collection policy. Instead, the subscriber can truncate its own local log files as messages are consumed and no longer need to be replayed (say, on a restart), requiring fewer log files to be retained locally at any given time. RT log files are truncated using the following API:

.rt.prune[param;pos]

where:

  • param is the input dictionary as was passed to .rt.sub[param].

  • pos is the position returned from the subscriber callback.

The .rt.prune function truncates all log files which have been rolled over and where all the messages in the log file are before the specified position. You can find more details on truncation in the Archiver documentation. The .rt.prune call is asynchronous: it starts a thread that performs the truncation task and returns.
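
As an illustration, a subscriber that opts out and prunes its own logs might look like this sketch (lastSafePos is an illustrative global updated from the callback):

setenv[`REPLICATOR_EXCHANGE_ARCHIVED;"0"]  / opt out before subscribing
lastSafePos:0
param:`stream`position`callback!("mystream";0;{[data;pos] lastSafePos::pos})
s:.rt.sub param
// later, once everything up to lastSafePos has been consumed and persisted:
.rt.prune[param;lastSafePos]  / asynchronous: truncates rolled-over log files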

Lead subscriber

In another use case, one of the subscribers can observe when messages have been persisted downstream, either to a storage tier or another durable messaging system. Once this happens, those messages no longer need to be replayed to that subscriber or other subscribers.

A lead subscriber is required

This requires a deliberate design decision where one of the subscribers is configured as the lead.

In this scenario, the lead subscriber is responsible for driving the truncation of merged logs both in RT and the other subscribers.

The RT Archiver is still required

This works alongside the RT Archiver, which is still required to ensure RT itself doesn't run out of disk space if the lead subscriber fails. However, an early truncation decision by the lead subscriber takes precedence over the RT Archiver policy.

To do this, the lead subscriber must set the environment variable $REPLICATOR_EXCHANGE_ARCHIVED=1 before calling .rt.sub. It can then use the .rt.prune function, as described above, and its truncation decisions are then reflected in RT and the other subscribers (assuming they haven't opted out).
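
A sketch of the lead-subscriber setup, reusing the illustrative names from the pruning sketch above:

setenv[`REPLICATOR_EXCHANGE_ARCHIVED;"1"]  / mark this subscriber as the lead
s:.rt.sub param
.rt.prune[param;lastSafePos]  / truncation now propagates to RT and other subscribers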

Logging facilities

You can enable the logging feature of the rt.qpk by setting the environment variable RT_LOGLEVEL_CONSOLE to DEBUG or TRACE. With this setting, DEBUG-level RT logs are generated when .rt.pub, .rt.sub, or .rt.get_latest_position is called, and a one-line position log is created for .rt.sub with the latest position.

All logs contain the rt-q-client component name within the header info. Logs are in JSON format by default, but you can change them to text format by setting the RT_LOGFORMAT_CONSOLE environment variable to txt.
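
For example, to enable DEBUG logging in text format from a q session before using the rt.qpk APIs (a sketch; whether the variables are read at call time rather than load time is an assumption):

setenv[`RT_LOGLEVEL_CONSOLE;"DEBUG"]
setenv[`RT_LOGFORMAT_CONSOLE;"txt"]
callback:{[data;pos] show data}
s:.rt.sub `stream`position`callback!("mystream";`latest;callback)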

You can also obtain log messages emitted by the rt_helper application (an external binary library supporting all the replicator-related tasks) from the kxi_c_sdk_logs folder at the end of the path you set as the rt_helper_log_path member in the .rt.pub/.rt.sub input parameter dictionary. This path also appears in the RT logs.

 param:`stream`position`callback`rt_helper_log_path!("data";`latest;callback;"/s/mystream/logs");
 .rt.sub param
 # rt_helper logs: /s/mystream/logs/kxi_c_sdk_logs