Configure the ICE Order Book Accelerator

This page explains how to configure data ingestion in the ICE Order Book Accelerator.

The ICE Order Book Accelerator comes pre-packaged with a feed values YAML file, ice-feed-values.yaml, which tells the ICE feed handler which tokens and data to subscribe to. To ingest different or additional data, edit this file accordingly.

To demonstrate, this page walks through a change to ice-feed-values.yaml and the matching schema and pipeline updates, so that the required data is ingested.

A typical ice-feed-values.yaml includes:

YAML

# Default values for rt-ice-pub.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
autoscaling:
  enabled: false
# TODO: Add your image pull secret here.
imagePullSecrets:
- name: docker-pull
serviceAccount:
  # Specifies whether a service account should be created
  create: true
  # Annotations to add to the service account
  annotations: {}
  # The name of the service account to use.
  # If not set and create is true, a name is generated using the fullname template
  name: ""
  #
  # Specifies whether to auto-mount a service account
  #
  autoMount: true
  #
# @param resourceAnnotations - Annotations applied to the top level resource StatefulSet/Deployment
#
resourceAnnotations: {}
## @section persistence Configure PVC
## @param enabled Enable or disable persistent storage, claimed via PVCs
## @param useLocalValues Override Global accessMode and storageClass
## @param storageClass Storage Class to apply to PVC
## Unset uses the cluster default storage class.
## @param storageSize Volume requested size
## @param accessModes List of desired access modes for PVC
persistence:
  enabled: true
  useLocalValues: false
  storageClass: ""
  storageSize: "10G"
  accessModes:
    - ReadWriteOnce
# TODO: Ensure stream.sinkName matches your desired RT-North
stream:
  sinkName: rt-fsi-app-ice-orderbook-icerealtime-north
rt:
  logLevel: INFO
  logPath: /tmprt
  volCapacity: 10G
ice:
  loggingFile: "/var/log/kxfeed_ice.log"
  apiLoggingFile: "/var/log/ice_api.log"
  multithreaded: false
  # TODO: Insert Connection details as provided by ICE
  primaryConnection: "<IP_Address_Provided_By_ICE_1>:<Port_Num_Provided_By_ICE_1>"
  backupConnection: "<IP_Address_Provided_By_ICE_2>:<Port_Num_Provided_By_ICE_2>"
  # Example Subscription List: L1 and L2 Futures with Wildcard + IBM & IBN Equities.
  # TODO: Update with your subscription list
  subscriptionList: "SUBSCRIBE,ENUM_SRC_ID:268,SYMBOL_TICKER:F:DX\\Z24; SUBSCRIBE,ENUM_SRC_ID:268,SYMBOL_TICKER:F:DX\\H25; SUBSCRIBE,ENUM_SRC_ID:268,SYMBOL_TICKER:F:DX\\M25; SUBSCRIBE,ENUM_SRC_ID:268,SYMBOL_TICKER:F:DX\\U25; SUBSCRIBE,ENUM_SRC_ID:938,SYMBOL_TICKER:E:700; SUBSCRIBE,ENUM_SRC_ID:938,SYMBOL_TICKER:E:300; SUBSCRIBE,ENUM_SRC_ID:938,SYMBOL_TICKER:E:9988; SUBSCRIBE,ENUM_SRC_ID:938,SYMBOL_TICKER:E:3690; SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:LLOY; SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:VOD; SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:IAG; SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:BARC;"

  logMsg: false
  token2typeLookup: "/tmp/kxfeed_ice/token2Type.json"
  # Trade and Quote Tokens used by the FSI schemas / ICE overlay
  tradeTableTokens: "6,8,9,16,22,23,28,55,111,447,448,460,461,2500,4024"
  quoteTableTokens: "6,10,11,12,13,16,55,58,59,60,61,308,439,478,479,1165,1166"
  # Token which decides if a payload is the trade message
  tradeMessageIdentifiers: "8"
  enableFiltertokens: true
  refreshTable: true
  restServer: false
  sourceTimeZone: "628:EST5EDT,M3.2.0,M11.1.0;270:EST5EDT,M3.2.0,M11.1.0;886:EST5EDT,M3.2.0,M11.1.0;558:EST5EDT,M3.2.0,M11.1.0;564:CST6CDT,M3.2.0,M11.1.0;1330:EST5EDT,M3.2.0,M11.1.0;1331:EST5EDT,M3.2.0,M11.1.0;1327:EST5EDT,M3.2.0,M11.1.0;1328:EST5EDT,M3.2.0,M11.1.0"
  dbSchemaFile: "/tmp/kxfeed_ice/schema.xml"
  dbConfigFile: "/tmp/kxfeed_ice/kxfeed_config.json"
  secrets:
    name: my-ice-secrets
image:
  repository: portal.dl.kx.com
  component: kxfeed_ice
  tag: 1.0.0
  pullPolicy: IfNotPresent
podSecurityContext:
  # 'nobody' user
  fsGroup: 65534
  runAsUser: 65534
  runAsNonRoot: true
securityContext:
  readOnlyRootFilesystem: false
  # runAsNonRoot: true
  # 'nobody' user
  # runAsUser: 65534
  allowPrivilegeEscalation: false
arguments: ["/usr/local/bin/updRTConfig.sh"]
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
#   cpu: 100m
#   memory: 128Mi
# requests:
#   cpu: 100m
#   memory: 128Mi
encryption:
  # NOTE: Set enabled to false if running on an Insights environment that does NOT have encryption enabled.
  enabled: true

This file determines which tokens the feed handler ingests and passes through the pipelines into the databases. It also sets persistence, resource, and logging parameters.
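
To make a subscriptionList easier to review before deployment, the following Python sketch splits the string into its individual entries. It assumes the "COMMAND,KEY:VALUE,KEY:VALUE;" layout seen in the sample file above. The function name parse_subscription_list is hypothetical and is not part of the accelerator.

```python
# Sketch: split a subscriptionList string into its individual subscriptions.
# Assumes the "COMMAND,KEY:VALUE,KEY:VALUE;" layout seen in the sample file.
# parse_subscription_list is a hypothetical helper, not an accelerator API.

def parse_subscription_list(raw: str) -> list[dict[str, str]]:
    """Return one dict per ';'-separated entry in the list."""
    subscriptions = []
    for entry in raw.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # skip the empty piece after the trailing ';'
        command, *fields = entry.split(",")
        record = {"command": command}
        for field in fields:
            # Split on the first ':' only, because tickers such as
            # "E:VOD" contain a colon themselves.
            key, _, value = field.partition(":")
            record[key] = value
        subscriptions.append(record)
    return subscriptions


# Two entries copied from the sample subscriptionList.
sample = (
    "SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:LLOY; "
    "SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:VOD;"
)
subs = parse_subscription_list(sample)
for sub in subs:
    print(sub)
```

Listing the entries one per line makes it easier to spot a missing source ID or a mistyped ticker than reading the single long string.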

As shown above, the tradeTableTokens field includes token 16, which represents the activityTime as recorded by ICE.
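
As a quick sanity check, the sketch below confirms that a given token is present in a token list. It is a minimal Python illustration; parse_tokens and ACTIVITY_TIME_TOKEN are hypothetical names, and the list is copied from the sample tradeTableTokens value.

```python
# Sketch: confirm that a token ID is present in a comma-separated token list.
# parse_tokens is a hypothetical helper, not part of the feed handler.

def parse_tokens(raw: str) -> list[int]:
    """Convert "6,8,9" into [6, 8, 9], ignoring empty pieces."""
    return [int(tok) for tok in raw.split(",") if tok.strip()]


# Value copied from tradeTableTokens in the sample file.
trade_table_tokens = "6,8,9,16,22,23,28,55,111,447,448,460,461,2500,4024"
ACTIVITY_TIME_TOKEN = 16  # activityTime, as described in the text

tokens = parse_tokens(trade_table_tokens)
has_activity_time = ACTIVITY_TIME_TOKEN in tokens
print(has_activity_time)
```

Parsing the list into integers avoids the false positives of a plain substring search, where "16" would also match tokens such as 1165 or 1166.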

Now that we have instructed the feed handler to ingest the additional token, we modify our schema so that the table has a column to accept the new data.

To do this, we use an overlay to add a field to the existing schema. For details on using overlays, refer to the Overlays & Patches section.

The IceRealTime.yaml contains data similar to the following:

yaml

kind: Package
apiVersion: pakx/v1
metadata:
  name: target
spec:
  pipelines:
  - name: icerealtimel1
    spec: src/icerealtimel1-pipeline-spec.q
    source: icerealtime-north
  - name: icerealtimel2
    spec: src/icerealtimel2-pipeline-spec.q
    source: icerealtime-north
  tables:
    schemas:
    - name: Trade
      columns:
      - name: srcID
        type: int
      - name: permissions
        type: int
      - name: entitlementCode
        type: long
      - name: cumulativeTradeVol
        type: long
      - name: cumulativeTradeSessionVol
        type: long
      - name: exchTime
        type: timestamp
      - name: tradeCondPrice
        type: float
      - name: tradeCondSize
        type: long
      - name: tradeTotalValue
        type: long
      - name: tradeID
        type: long
      - name: tradeCond_1
        type: long
      - name: exchangeID
        type: symbol
  databases:
  - name: fsi-core-db
    shards:
    - name: fsi-core-db-shard
      sequencers:
        icerealtime-north:
          external: true
          k8sPolicy:
            resources:
              limits:
                cpu: 500m
                memory: 512Mi
              requests:
                cpu: 100m
                memory: 256Mi
              tmpDirSize: 5Mi
            serviceAccountConfigure:
              create: true
          maxDiskUsagePercent: 90
          size: 3
          topicConfig:
            subTopic: icerealtime-north-ext
            topicPrefix: rt-
          topicConfigDir: /config/topics/
          volume:
            mountPath: /s/
            size: 20Gi
            subPaths:
              cp: state
              in: in
              out: out

In this file, we add a column to the YAML definition of the Trade table. This allows us to ingest the new data and store it in the table.
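
Each column entry must carry both a name and a type under the same list item. The following Python sketch shows one way to flag incomplete entries. It assumes the columns have already been loaded into a list of dictionaries, as a YAML parser would return them; find_incomplete_columns is a hypothetical helper and the sample list is deliberately malformed.

```python
# Sketch: flag column entries that are missing a name or a type.
# The list mirrors what a YAML parser would return for `columns:`.
# find_incomplete_columns is a hypothetical helper for illustration.

def find_incomplete_columns(columns: list[dict]) -> list[tuple[int, str]]:
    """Return (index, problem) pairs for entries lacking 'name' or 'type'."""
    problems = []
    for index, column in enumerate(columns):
        for key in ("name", "type"):
            if key not in column:
                problems.append((index, f"missing '{key}'"))
    return problems


# A stray '-' in front of 'type' splits one column into two list items,
# which is what the second and third entries below reproduce.
columns = [
    {"name": "cumulativeTradeSessionVol", "type": "long"},
    {"name": "activityTime"},
    {"type": "timestamp"},
    {"name": "exchTime", "type": "timestamp"},
]
problems = find_incomplete_columns(columns)
print(problems)
```

A check like this catches indentation slips early, before the package is pushed and the schema is rejected or silently altered.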

The updated YAML is now:

YAML

kind: Package
apiVersion: pakx/v1
metadata:
  name: target
spec:
  pipelines:
  - name: icerealtimel1
    spec: src/icerealtimel1-pipeline-spec.q
    source: icerealtime-north
  - name: icerealtimel2
    spec: src/icerealtimel2-pipeline-spec.q
    source: icerealtime-north
  tables:
    schemas:
    - name: Trade
      columns:
      - name: srcID
        type: int
      - name: permissions
        type: int
      - name: entitlementCode
        type: long
      - name: cumulativeTradeVol
        type: long
      - name: cumulativeTradeSessionVol
        type: long
      - name: activityTime
        type: timestamp
      - name: exchTime
        type: timestamp
      - name: tradeCondPrice
        type: float
      - name: tradeCondSize
        type: long
      - name: tradeTotalValue
        type: long
      - name: tradeID
        type: long
      - name: tradeCond_1
        type: long
      - name: exchangeID
        type: symbol
  databases:
  - name: fsi-core-db
    shards:
    - name: fsi-core-db-shard
      sequencers:
        icerealtime-north:
          external: true
          k8sPolicy:
            resources:
              limits:
                cpu: 500m
                memory: 512Mi
              requests:
                cpu: 100m
                memory: 256Mi
              tmpDirSize: 5Mi
            serviceAccountConfigure:
              create: true
          maxDiskUsagePercent: 90
          size: 3
          topicConfig:
            subTopic: icerealtime-north-ext
            topicPrefix: rt-
          topicConfigDir: /config/topics/
          volume:
            mountPath: /s/
            size: 20Gi
            subPaths:
              cp: state
              in: in
              out: out

Finally, we update the pipeline specification so that the pipeline maps the newly ingested activityTime data to the exchTime timestamp column.
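
The intended effect of this change can be modelled outside q. The Python sketch below treats the column map as a list of (ICE column, FSI column) pairs and applies it to one record. It illustrates the outcome only and does not reproduce how the q pipeline evaluates the map. The function apply_column_map and all record values are made up for the example.

```python
# Sketch: the intended effect of the column-map change, modelled in Python.
# Each pair is (ICE column, FSI column). This is an illustration only and
# does not reproduce how the q pipeline evaluates the map.

def apply_column_map(record: dict, pairs: list[tuple[str, str]]) -> dict:
    """Build an FSI-shaped row by copying each ICE column to its FSI column."""
    return {fsi_col: record[ice_col] for ice_col, fsi_col in pairs}


# Made-up values for a single incoming trade record.
ice_record = {
    "activityTime": "2024-11-04T14:30:00.123",
    "exchTime": "2024-11-04T14:30:00.100",
    "tradedExchange": "XLON",
}

before = [
    ("activityTime", "activityTime"),
    ("exchTime", "exchTime"),
    ("tradedExchange", "exchangeID"),
]
after = [
    ("activityTime", "activityTime"),
    ("activityTime", "exchTime"),  # exchTime is now filled from activityTime
    ("tradedExchange", "exchangeID"),
]

row_before = apply_column_map(ice_record, before)
row_after = apply_column_map(ice_record, after)
print(row_before["exchTime"], row_after["exchTime"])
```

With the updated pairs, the exchTime column of the stored row carries the activityTime value, while every other column is unchanged.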

Before our change, the pipeline specification is similar to the below:

q

@[.kxi.packages.load;"fsi-lib";{x}];
.kxi.packages.file.load["src/fsi.utils.q"];
.kxi.packages.file.load["src/dl.q"];
fsiApps:.util.getFsiAppsByMaxVersion[];
.util.loadFileFromApp["iceRealtime.l1PipelineFuncs.q";]each fsiApps;


//######################### CONFIGURATION #######################
// Assembly name
.fsi.assemblyName:.spenv.assembly[];
.fsi.rtNorth:.fsi.assemblyName,"-icerealtime-north";

// Note: Set .ice.debug:1b if you wish to enable debug vars
.ice.debug:0b;

// get Trade Schema from assembly
TradeSch:.qsp.getSchema[`Trade];
typeList[where 10=typeList:exec datatype from TradeSch]:0h;
.ice.fsiSchema.Trade:flip (exec name from TradeSch)!typeList$\:();

// Map ICE cols to the FSI schema
// Trade
.ice.iceToFsiColMap.Trade:(!) . flip (
    (`srcID                     ; `srcID);
    (`permissions               ; `permissions);
    (`entitlementCode           ; `entitlementCode);
    (`cumulativeTradeVol        ; `cumulativeTradeVol);
    (`cumulativeTradeSessionVol ; `cumulativeTradeSessionVol);
    (`activityTime              ; `activityTime);
    (`exchTime                  ; `exchTime);
    (`tradeCondPrice            ; `tradeCondPrice);
    (`tradeCondSize             ; `tradeCondSize);
    (`tradeTotalValue           ; `tradeTotalValue);
    (`tradeID                   ; `tradeID);
    (`tradeCond_1               ; `tradeCond_1);
    (`tradedExchange            ; `exchangeID)
    );

// All columns as they come from ICE FH
// We are taking the `msgType` from ICE FH, but it is not streamed yet
// The default eventTimestamp is `receivedTime` from FH
.ice.cols.Trade:`msgType`instrumentID`eventTimestamp`srcID`permissions`entitlementCode`price`volume`activityTime`cumulativeTradeVol`cumulativeTradeSessionVol`sequenceNumber`exchTime`conditions`tradeCondPrice`tradeCondSize`tradeTotalValue`tradeID`tradeCond_1`tradedExchange;
.ice.cols.Quote:`msgType`instrumentID`eventTimestamp`srcID`permissions`entitlementCode`askPrice`askSize`bidPrice`bidSize`activityTime`exchTime`bidDepthPrice`bidDepthSize`askDepthPrice`askDepthSize`orderID`tobMarker`bidPartCode`askPartCode`bidDepthCond`askDepthCond;
//######################### CONFIGURATION #######################

.ice.upd:{[t;d]
    d:.ice.fsiSchema[t] upsert ?[enlist .ice.cols[t]!d;();0b;.ice.iceToFsiColMap[t]];
    d
    };

.ice.updTrade:.ice.upd[`Trade];

source: .qsp.read.fromStream[]

tradeStream: source
     .qsp.filter[{[md;data]`trade~md`table};.qsp.use``params!(::;`metadata`data)]
     .qsp.map[{if[.ice.debug;.debug.trade:x];x}]
     .qsp.map[.ice.updTrade]
     .qsp.write.toDatabase[`Trade; .fsi.assemblyName]

quoteStream: source
    .qsp.filter[{[md;data]`quote~md`table};.qsp.use``params!(::;`metadata`data)]
    .qsp.map[{if[.ice.debug;.debug.quote:x];x}]
    .qsp.map[.ice.updQuote]
    .qsp.write.toDatabase[`Quote; .fsi.assemblyName]

.qsp.run(quoteStream; tradeStream)

Following our change, the .ice.iceToFsiColMap.Trade mapping sends `activityTime to `exchTime, overwriting the previous `exchTime mapping. The file now looks like this:

q

@[.kxi.packages.load;"fsi-lib";{x}];
.kxi.packages.file.load["src/fsi.utils.q"];
.kxi.packages.file.load["src/dl.q"];
fsiApps:.util.getFsiAppsByMaxVersion[];
.util.loadFileFromApp["iceRealtime.l1PipelineFuncs.q";]each fsiApps;


//######################### CONFIGURATION #######################
// Assembly name
.fsi.assemblyName:.spenv.assembly[];
.fsi.rtNorth:.fsi.assemblyName,"-icerealtime-north";

// Note: Set .ice.debug:1b if you wish to enable debug vars
.ice.debug:0b;

// get Trade Schema from assembly
TradeSch:.qsp.getSchema[`Trade];
typeList[where 10=typeList:exec datatype from TradeSch]:0h;
.ice.fsiSchema.Trade:flip (exec name from TradeSch)!typeList$\:();

// Map ICE cols to the FSI schema
// Trade
.ice.iceToFsiColMap.Trade:(!) . flip (
    (`srcID                     ; `srcID);
    (`permissions               ; `permissions);
    (`entitlementCode           ; `entitlementCode);
    (`cumulativeTradeVol        ; `cumulativeTradeVol);
    (`cumulativeTradeSessionVol ; `cumulativeTradeSessionVol);
    (`activityTime              ; `activityTime);
    (`activityTime              ; `exchTime);
    (`tradeCondPrice            ; `tradeCondPrice);
    (`tradeCondSize             ; `tradeCondSize);
    (`tradeTotalValue           ; `tradeTotalValue);
    (`tradeID                   ; `tradeID);
    (`tradeCond_1               ; `tradeCond_1);
    (`tradedExchange            ; `exchangeID)
    );

// All columns as they come from ICE FH
// We are taking the `msgType` from ICE FH, but it is not streamed yet
// The default eventTimestamp is `receivedTime` from FH
.ice.cols.Trade:`msgType`instrumentID`eventTimestamp`srcID`permissions`entitlementCode`price`volume`activityTime`cumulativeTradeVol`cumulativeTradeSessionVol`sequenceNumber`exchTime`conditions`tradeCondPrice`tradeCondSize`tradeTotalValue`tradeID`tradeCond_1`tradedExchange;
.ice.cols.Quote:`msgType`instrumentID`eventTimestamp`srcID`permissions`entitlementCode`askPrice`askSize`bidPrice`bidSize`activityTime`exchTime`bidDepthPrice`bidDepthSize`askDepthPrice`askDepthSize`orderID`tobMarker`bidPartCode`askPartCode`bidDepthCond`askDepthCond;
//######################### CONFIGURATION #######################

.ice.upd:{[t;d]
    d:.ice.fsiSchema[t] upsert ?[enlist .ice.cols[t]!d;();0b;.ice.iceToFsiColMap[t]];
    d
    };

.ice.updTrade:.ice.upd[`Trade];

source: .qsp.read.fromStream[]

tradeStream: source
     .qsp.filter[{[md;data]`trade~md`table};.qsp.use``params!(::;`metadata`data)]
     .qsp.map[{if[.ice.debug;.debug.trade:x];x}]
     .qsp.map[.ice.updTrade]
     .qsp.write.toDatabase[`Trade; .fsi.assemblyName]

quoteStream: source
    .qsp.filter[{[md;data]`quote~md`table};.qsp.use``params!(::;`metadata`data)]
    .qsp.map[{if[.ice.debug;.debug.quote:x];x}]
    .qsp.map[.ice.updQuote]
    .qsp.write.toDatabase[`Quote; .fsi.assemblyName]

.qsp.run(quoteStream; tradeStream)

The steps are the same for the other packages in the FSI Accelerator.

Commands to unpack and re-pack a package can be found under package packing.

Commands to push a package can be found in the packaging documentation.

Changing configuration through a custom package

Alternatively, in many cases configuration can be adjusted by including variables in a custom package. Where this is possible, it is highlighted in the applicable section of documentation. Instructions to create and load a custom package can be found in the packaging documentation.