Configure the KX Flow Accelerator

This page demonstrates the configuration in the KX Flow Accelerator.

The KX Flow Accelerator comes pre-packaged with pipelines which use a flowbridge to ingest data from realtime feeds.

To ensure that you can follow this demonstration, refer to the quickstart guide.

We update the pipeline specification to modify our mapping to show how to change one ingested column name to conform with the underlying FSI table definition. Here we focus on the mapCols table which is ensuring that any ingested column names are renamed appropriately to match that of our schema.

In this case, we use the fxflowmarketdata pipeline to demonstrate how a user can modify the pipeline to convert column names. This pipeline specification file can be found in the /src directory of the fsi-app-ice-fi package.

Before our changes, the pipeline specification is similar to the below:

q

Copy
@[.kxi.packages.load;"fsi-lib";{x}];
.kxi.packages.file.load["src/fsi.utils.q"];
fsiApps:.util.getFsiAppsByMaxVersion[];
.util.loadFileFromApp["fxTransformFuncs.q";] each fsiApps;

//Configuration - filepath and region configurable
.fsi.filePath:`$":s3://<YOUR FILEPATH>/<YOUR FILENAME>";
.fsi.region:"us-east-2";

additionalCols:(!) . flip (
    (`instrumentType;enlist `);
    (`assetClass;enlist `FX);
    (`product;enlist `SPOT);
    (`srcSys;enlist `)
    );

mapCols:(!) . flip (
    (`eventTimestamp;`eventTimestamp);
    (`instrumentID;`instrumentID);
    (`ccyPair;`ccyPair);
    (`askPrice;`askPrice);
    (`bidPrice;`bidPrice);
    (`baseCcy;`baseCcy)
    )

SCHEMA: flip $[;()] each (!) . flip (
   (`date;         "d");
   (`sym;          "s");
   (`time;         "p");
   (`srcTime;      "p");
   (`recvTime;     "p");
   (`src;          "s");
   (`tenor;        "s");
   (`asset;        "s");
   (`deliverable;  "b");
   (`level;        "s");
   (`ccy;          "s");
   (`valueDate;    "d");
   (`ask;          "f");
   (`bid;          "f");
   (`askPoints;    "f");
   (`bidPoints;    "f");
   (`askQuoteID;   "");
   (`bidQuoteID;   "");
   (`askSize;      "f");
   (`bidSize;      "f");
   (`askCond;      "c");
   (`bidCond;      "c");
   (`askMinSize;   "f");
   (`bidMinSize;   "f");
   (`askExpiryTime;"p");
   (`bidExpiryTime;"p")
   )

.qsp.run
  .qsp.read.fromAmazonS3[.fsi.filePath;.qsp.use `mode`region!(`binary;.fsi.region)]
  .qsp.decode.csv[SCHEMA;","]
  .qsp.map[{update pubTime:time, askQuoteID:(raze each askQuoteID), bidQuoteID:(raze each bidQuoteID) from x}]
  .qsp.map[.fsi.enrichFlowData[additionalCols;]]
  .qsp.map[.fsi.mapFlowDataToFsiSchema[mapCols;]]
  .qsp.transform.schema[`FXQuote]
  .qsp.write.toDatabase[`FXQuote;`$.spenv.assembly[];.qsp.use `directWrite`mountName!(1b;`hdb)]

Following our changes, the file looks like this:

Note how we have changed the mapCols table to reflect the column names of the underlaying Quote table.

q

Copy
@[.kxi.packages.load;"fsi-lib";{x}];
.kxi.packages.file.load["src/fsi.utils.q"];
fsiApps:.util.getFsiAppsByMaxVersion[];
.util.loadFileFromApp["fxTransformFuncs.q";] each fsiApps;

//Configuration - filepath and region configurable
.fsi.filePath:`$":s3://<YOUR FILEPATH>/<YOUR FILENAME>";
.fsi.region:"us-east-2";

additionalCols:(!) . flip (
    (`instrumentType;enlist `);
    (`assetClass;enlist `FX);
    (`product;enlist `SPOT);
    (`srcSys;enlist `)
    );

mapCols:(!) . flip (
    (`time;`eventTimestamp);
    (`sym;`instrumentID);
    (`asset;`ccyPair);
    (`ask;`askPrice);
    (`bid;`bidPrice);
    (`ccy;`baseCcy)
    )

SCHEMA: flip $[;()] each (!) . flip (
   (`date;         "d");
   (`sym;          "s");
   (`time;         "p");
   (`srcTime;      "p");
   (`recvTime;     "p");
   (`src;          "s");
   (`tenor;        "s");
   (`asset;        "s");
   (`deliverable;  "b");
   (`level;        "s");
   (`ccy;          "s");
   (`valueDate;    "d");
   (`ask;          "f");
   (`bid;          "f");
   (`askPoints;    "f");
   (`bidPoints;    "f");
   (`askQuoteID;   "");
   (`bidQuoteID;   "");
   (`askSize;      "f");
   (`bidSize;      "f");
   (`askCond;      "c");
   (`bidCond;      "c");
   (`askMinSize;   "f");
   (`bidMinSize;   "f");
   (`askExpiryTime;"p");
   (`bidExpiryTime;"p")
   )

.qsp.run
  .qsp.read.fromAmazonS3[.fsi.filePath;.qsp.use `mode`region!(`binary;.fsi.region)]
  .qsp.decode.csv[SCHEMA;","]
  .qsp.map[{update pubTime:time, askQuoteID:(raze each askQuoteID), bidQuoteID:(raze each bidQuoteID) from x}]
  .qsp.map[.fsi.enrichFlowData[additionalCols;]]
  .qsp.map[.fsi.mapFlowDataToFsiSchema[mapCols;]]
  .qsp.transform.schema[`FXQuote]
  .qsp.write.toDatabase[`FXQuote;`$.spenv.assembly[];.qsp.use `directWrite`mountName!(1b;`hdb)]

Importantly there are distinct differences with this walkthrough when compared to that of the FSI ICE Orderbook and FSI Fixed Income Screener. In the later we needed to modify the feed handler to ingest additional tokens, however, this is not the case with this accelerator.

Commands to unpack and re-package a package can be found under package create.

Commands to push a package can be found in the packaging documentation.

Changing configuration through a custom package

Alternatively, in many cases configuration can be adjusted by including variables in a custom package. Where this is possible, it is highlighted in the applicable section of documentation. Instructions to create and load a custom package can be found in the packaging documentation.