Configure the ICE Order Book Accelerator
This page explains the configuration in the Order Book Accelerator.
The ICE Orderbook comes pre-packed with a feed values YAML file which instructs the ICE feed handler which tokens and data to subscribe to. Should you need to ingest different or new data, modify the ice-feed-values.yaml file with any required data.
To demonstrate the modification of the configuration of the orderbook ingest, we can modify the ice-feed-values.yaml
to ensure that the correct feeds are being drawn.
A typical feed-values.yaml
includes:
YAML
# Default values for rt-ice-pub.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
autoscaling:
enabled: false
# TODO: Add you image pull secret here.
imagePullSecrets:
- name: docker-pull
serviceAccount:
# Specifies whether a service account should be created
create: true
# Annotations to add to the service account
annotations: {}
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
name: ""
#
# Specifies whether to auto-mount a service account
#
autoMount: true
#
# @param resourceAnnotations - Annotations applied to the top level resource StatefulSet/Deployment
#
resourceAnnotations: {}
## @section persistence Configure PVC
## @param enabled Enable or disable persistent storage, claimed via PVCs
## @param useLocalValues Override Global accessMode and storageClass
## @param storageClass Storage Class to apply to PVC
## Unset uses the cluster default storage class.
## @param storageSize Volume requested size
## @param accessModes List of desired access modes for PVC
persistence:
enabled: true
useLocalValues: false
storageClass: ""
storageSize: "10G"
accessModes:
- ReadWriteOnce
# TODO: Ensure stream.name matches your desired RT-North
stream:
sinkName: rt-fsi-app-ice-orderbook-icerealtime-north
rt:
logLevel: INFO
logPath: /tmprt
volCapacity: 10G
ice:
loggingFile: "/var/log/kxfeed_ice.log"
apiLoggingFile: "/var/log/ice_api.log"
multithreaded: false
# TODO: Insert Connection details as provided by ICE
primaryConnection: "<IP_Address_Provided_By_ICE_1>:<Port_Num_Provided_By_ICE_1>"
backupConnection: "<IP_Address_Provided_By_ICE_2>:<Port_Num_Provided_By_ICE_2>"
# Example Subscription List: L1 and L2 Futures with Wildcard + IBM & IBN Equities.
# TODO: Update with your subscription list
subscriptionList: "SUBSCRIBE,ENUM_SRC_ID:268,SYMBOL_TICKER:F:DX\\Z24; SUBSCRIBE,ENUM_SRC_ID:268,SYMBOL_TICKER:F:DX\\H25; SUBSCRIBE,ENUM_SRC_ID:268,SYMBOL_TICKER:F:DX\\M25; SUBSCRIBE,ENUM_SRC_ID:268,SYMBOL_TICKER:F:DX\\U25; SUBSCRIBE,ENUM_SRC_ID:938,SYMBOL_TICKER:E:700; SUBSCRIBE,ENUM_SRC_ID:938,SYMBOL_TICKER:E:300; SUBSCRIBE,ENUM_SRC_ID:938,SYMBOL_TICKER:E:9988; SUBSCRIBE,ENUM_SRC_ID:938,SYMBOL_TICKER:E:3690; SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:LLOY; SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:VOD; SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:IAG; SUBSCRIBE,ENUM_SRC_ID:628,SYMBOL_TICKER:E:BARC;"
logMsg: false
token2typeLookup: "/tmp/kxfeed_ice/token2Type.json"
# Trade and Quote Tokens used by the FSI schemas / ICE overlay
tradeTableTokens: "6,8,9,16,22,23,28,55,111,447,448,460,461,2500,4024"
quoteTableTokens: "6,10,11,12,13,16,55,58,59,60,61,308,439,478,479,1165,1166"
# Token which decides if a payload is the trade message
tradeMessageIdentifiers: "8"
enableFiltertokens: true
refreshTable: true
restServer: false
sourceTimeZone: "628:EST5EDT,M3.2.0,M11.1.0;270:EST5EDT,M3.2.0,M11.1.0;886:EST5EDT,M3.2.0,M11.1.0;558:EST5EDT,M3.2.0,M11.1.0;564:CST6CDT,M3.2.0,M11.1.0;1330:EST5EDT,M3.2.0,M11.1.0;1331:EST5EDT,M3.2.0,M11.1.0;1327:EST5EDT,M3.2.0,M11.1.0;1328:EST5EDT,M3.2.0,M11.1.0"
dbSchemaFile: "/tmp/kxfeed_ice/schema.xml"
dbConfigFile: "/tmp/kxfeed_ice/kxfeed_config.json"
secrets:
name: my-ice-secrets
image:
repository: portal.dl.kx.com
component: kxfeed_ice
tag: 1.0.0
pullPolicy: IfNotPresent
podSecurityContext:
# 'nobody' user
fsGroup: 65534
runAsUser: 65534
runAsNonRoot: true
securityContext:
readOnlyRootFilesystem: false
# runAsNonRoot: true
# 'nobody' user
# runAsUser: 65534
allowPrivilegeEscalation: false
arguments: ["/usr/local/bin/updRTConfig.sh"]
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
encryption:
# NOTE: Set enabled false if running on an Insights environment that does NOT have encryption enabled.
enabled: true
This file determines what tokens are ingested by the feed through the pipelines into the databases. Additionally, the file also determines persistence, resource, and logging parameters.
As can be seen we are ingesting token number 16 in the tradeTableTokens field. This token represents the activityTime
as recorded by ICE.
Now that we have instructed the Feed handler to ingest the additional token, we modify our schema to ensure that the there is a column to accept the new data within the table.
To do this we use an overlay to add a field to the existing schema. To use an overlay refer to the Overlays & Patches section.
The IceRealTime.yaml
contains data similar to the following:
yaml
kind: Package
apiVersion: pakx/v1
metadata:
name: target
spec:
pipelines:
- name: icerealtimel1
spec: src/icerealtimel1-pipeline-spec.q
source: icerealtime-north
- name: icerealtimel2
spec: src/icerealtimel2-pipeline-spec.q
source: icerealtime-north
tables:
schemas:
- name: Trade
columns:
- name: srcID
type: int
- name: permissions
type: int
- name: entitlementCode
type: long
- name: cumulativeTradeVol
type: long
- name: cumulativeTradeSessionVol
type: long
- name: exchTime
type: timestamp
- name: tradeCondPrice
type: float
- name: tradeCondSize
type: long
- name: tradeTotalValue
type: long
- name: tradeID
type: long
- name: tradeCond_1
type: long
- name: exchangeID
type: symbol
databases:
- name: fsi-core-db
shards:
- name: fsi-core-db-shard
sequencers:
icerealtime-north:
external: true
k8sPolicy:
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 100m
memory: 256Mi
tmpDirSize: 5Mi
serviceAccountConfigure:
create: true
maxDiskUsagePercent: 90
size: 3
topicConfig:
subTopic: icerealtime-north-ext
topicPrefix: rt-
topicConfigDir: /config/topics/
volume:
mountPath: /s/
size: 20Gi
subPaths:
cp: state
in: in
out: out
In this file, we add an additional column to the yaml definition of the the Trade table. This allows us to ingest the new data and store it within the table.
The updated yaml is now:
YAML
kind: Package
apiVersion: pakx/v1
metadata:
name: target
spec:
pipelines:
- name: icerealtimel1
spec: src/icerealtimel1-pipeline-spec.q
source: icerealtime-north
- name: icerealtimel2
spec: src/icerealtimel2-pipeline-spec.q
source: icerealtime-north
tables:
schemas:
- name: Trade
columns:
- name: srcID
type: int
- name: permissions
type: int
- name: entitlementCode
type: long
- name: cumulativeTradeVol
type: long
- name: cumulativeTradeSessionVol
type: long
- name: activityTime
- type: timestamp
- name: exchTime
type: timestamp
- name: tradeCondPrice
type: float
- name: tradeCondSize
type: long
- name: tradeTotalValue
type: long
- name: tradeID
type: long
- name: tradeCond_1
type: long
- name: exchangeID
type: symbol
databases:
- name: fsi-core-db
shards:
- name: fsi-core-db-shard
sequencers:
icerealtime-north:
external: true
k8sPolicy:
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 100m
memory: 256Mi
tmpDirSize: 5Mi
serviceAccountConfigure:
create: true
maxDiskUsagePercent: 90
size: 3
topicConfig:
subTopic: icerealtime-north-ext
topicPrefix: rt-
topicConfigDir: /config/topics/
volume:
mountPath: /s/
size: 20Gi
subPaths:
cp: state
in: in
out: out
Finally, we update the pipeline specification to ensure that the pipeline is correctly mapping our new ingested activityTime data to the exchTime timestamp.
Before our change, the pipeline specification is similar to the below:
q
@[.kxi.packages.load;"fsi-lib";{x}];
.kxi.packages.file.load["src/fsi.utils.q"];
.kxi.packages.file.load["src/dl.q"];
fsiApps:.util.getFsiAppsByMaxVersion[];
.util.loadFileFromApp["iceRealtime.l1PipelineFuncs.q";]each fsiApps;
//######################### CONFIGURATION #######################
// Assembly name
.fsi.assemblyName:.spenv.assembly[];
.fsi.rtNorth:.fsi.assemblyName,"-icerealtime-north";
// Note: Set .ice.debug:1b if your wish to enable debug vars
.ice.debug:0b;
// get Trade Schema from assembly
TradeSch:.qsp.getSchema[`Trade];
typeList[where 10=typeList:exec datatype from TradeSch]:0h;
.ice.fsiSchema.Trade:flip (exec name from TradeSch)!typeList$\:();
// Map ICE cols to the FSI schema
// Trade
.ice.iceToFsiColMap.Trade:(!) . flip (
(`srcID ; `srcID);
(`permissions ; `permissions);
(`entitlementCode ; `entitlementCode);
(`cumulativeTradeVol ; `cumulativeTradeVol);
(`cumulativeTradeSessionVol ; `cumulativeTradeSessionVol);
(`activityTime ; `activityTime)
(`exchTime ; `exchTime);
(`tradeCondPrice ; `tradeCondPrice);
(`tradeCondSize ; `tradeCondSize);
(`tradeTotalValue ; `tradeTotalValue);
(`tradeID ; `tradeID);
(`tradeCond_1 ; `tradeCond_1);
(`tradedExchange ; `exchangeID)
);
// All columns as they come from ICE FH
// We are taking the `msgType` from ICE FH but is not streamed yet
// The default eventTimestamp is `receivedTime` from FH
.ice.cols.Trade:`msgType`instrumentID`eventTimestamp`srcID`permissions`entitlementCode`price`volume`activityTime`cumulativeTradeVol`cumulativeTradeSessionVol`sequenceNumber`exchTime`conditions`tradeCondPrice`tradeCondSize`tradeTotalValue`tradeID`tradeCond_1`tradedExchange;
.ice.cols.Quote:`msgType`instrumentID`eventTimestamp`srcID`permissions`entitlementCode`askPrice`askSize`bidPrice`bidSize`activityTime`exchTime`bidDepthPrice`bidDepthSize`askDepthPrice`askDepthSize`orderID`tobMarker`bidPartCode`askPartCode`bidDepthCond`askDepthCond;
//######################### CONFIGURATION #######################
.ice.upd:{[t;d]
d:.ice.fsiSchema[t] upsert ?[enlist .ice.cols[t]!d;();0b;.ice.iceToFsiColMap[t]];
d
};
.ice.updTrade:.ice.upd[`Trade];
source: .qsp.read.fromStream[]
tradeStream: source
.qsp.filter[{[md;data]`trade~md`table};.qsp.use``params!(::;`metadata`data)]
.qsp.map[{if[.ice.debug;.debug.trade:x];x}]
.qsp.map[.ice.updTrade]
.qsp.write.toDatabase[`Trade; .fsi.assemblyName]
quoteStream: source
.qsp.filter[{[md;data]`quote~md`table};.qsp.use``params!(::;`metadata`data)]
.qsp.map[{if[.ice.debug;.debug.quote:x];x}]
.qsp.map[.ice.updQuote]
.qsp.write.toDatabase[`Quote; .fsi.assemblyName]
.qsp.run(quoteStream; tradeStream)
Following our change, the file looks like this:
Note how we have changed the .ice.icetoFsiColMap
table to overwrite `exchTime.
q
@[.kxi.packages.load;"fsi-lib";{x}];
.kxi.packages.file.load["src/fsi.utils.q"];
.kxi.packages.file.load["src/dl.q"];
fsiApps:.util.getFsiAppsByMaxVersion[];
.util.loadFileFromApp["iceRealtime.l1PipelineFuncs.q";]each fsiApps;
//######################### CONFIGURATION #######################
// Assembly name
.fsi.assemblyName:.spenv.assembly[];
.fsi.rtNorth:.fsi.assemblyName,"-icerealtime-north";
// Note: Set .ice.debug:1b if your wish to enable debug vars
.ice.debug:0b;
// get Trade Schema from assembly
TradeSch:.qsp.getSchema[`Trade];
typeList[where 10=typeList:exec datatype from TradeSch]:0h;
.ice.fsiSchema.Trade:flip (exec name from TradeSch)!typeList$\:();
// Map ICE cols to the FSI schema
// Trade
.ice.iceToFsiColMap.Trade:(!) . flip (
(`srcID ; `srcID);
(`permissions ; `permissions);
(`entitlementCode ; `entitlementCode);
(`cumulativeTradeVol ; `cumulativeTradeVol);
(`cumulativeTradeSessionVol ; `cumulativeTradeSessionVol);
(`activityTime ; `activityTime)
(`activityTime ; `exchTime);
(`tradeCondPrice ; `tradeCondPrice);
(`tradeCondSize ; `tradeCondSize);
(`tradeTotalValue ; `tradeTotalValue);
(`tradeID ; `tradeID);
(`tradeCond_1 ; `tradeCond_1);
(`tradedExchange ; `exchangeID)
);
// All columns as they come from ICE FH
// We are taking the `msgType` from ICE FH but is not streamed yet
// The default eventTimestamp is `receivedTime` from FH
.ice.cols.Trade:`msgType`instrumentID`eventTimestamp`srcID`permissions`entitlementCode`price`volume`activityTime`cumulativeTradeVol`cumulativeTradeSessionVol`sequenceNumber`exchTime`conditions`tradeCondPrice`tradeCondSize`tradeTotalValue`tradeID`tradeCond_1`tradedExchange;
.ice.cols.Quote:`msgType`instrumentID`eventTimestamp`srcID`permissions`entitlementCode`askPrice`askSize`bidPrice`bidSize`activityTime`exchTime`bidDepthPrice`bidDepthSize`askDepthPrice`askDepthSize`orderID`tobMarker`bidPartCode`askPartCode`bidDepthCond`askDepthCond;
//######################### CONFIGURATION #######################
.ice.upd:{[t;d]
d:.ice.fsiSchema[t] upsert ?[enlist .ice.cols[t]!d;();0b;.ice.iceToFsiColMap[t]];
d
};
.ice.updTrade:.ice.upd[`Trade];
source: .qsp.read.fromStream[]
tradeStream: source
.qsp.filter[{[md;data]`trade~md`table};.qsp.use``params!(::;`metadata`data)]
.qsp.map[{if[.ice.debug;.debug.trade:x];x}]
.qsp.map[.ice.updTrade]
.qsp.write.toDatabase[`Trade; .fsi.assemblyName]
quoteStream: source
.qsp.filter[{[md;data]`quote~md`table};.qsp.use``params!(::;`metadata`data)]
.qsp.map[{if[.ice.debug;.debug.quote:x];x}]
.qsp.map[.ice.updQuote]
.qsp.write.toDatabase[`Quote; .fsi.assemblyName]
.qsp.run(quoteStream; tradeStream)
The steps would be the same for the other packages in the FSI Accelerator.
Commands to unpack and re-pack a package can be found under package packing.
Commands to push a package can be found in the packaging documentation.
Changing configuration through a custom package
Alternatively, in many cases configuration can be adjusted by including variables in a custom package. Where this is possible, it is highlighted in the applicable section of documentation. Instructions to create and load a custom package can be found in the packaging documentation.