Pipeline Administration

This page describes the steps to customize existing pipelines, add new pipelines, or remove pipelines that are no longer needed.

In this example, we address a fsi-app-kx-flow accelerator package.

Adding a New Pipeline

You can see how to add a new pipeline to fsi-app-kx-flow in Ingesting Data.

Customizing an Existing Pipeline

Below outlines the steps to customize a pipeline in the fsi-app-kx-flow package. The customized pipeline in this example is fxmarketdata.

Unpack fsi-app-kx-flow

Unpack the fsi-app-kx-flow package

Shell

Copy
> kxi package unpack fsi-app-kx-flow-1.0.0.kxi
> ls
fsi-app-kx-flow  fsi-app-kx-flow-1.0.0.kxi

Customize the Quote Schema

Customize the Quote schema within the tables directory to add in the following field:

Copy
Qualifier   | string

See the steps for how to customize the schema in Overlaying a Schema with Custom Columns.

Add New Columns to the Pipeline Spec

Add new columns in the fxmarketdata pipeline spec fxmarketdata-pipeline-spec.q by updating it's column mapping and schema definition. For more details on configuring pipeline specs, refer to Pipeline Configuration.

Shell

Copy
> cd fsi-app-kx-flow
> ls
databases  deployment_config  feed  manifest.yaml  patches  pipelines  router  src
>
>
> #BEFORE
> cat src/fxmarketdata-pipeline-spec.q | awk 'NR==12, NR==19'
mapCols:(!) . flip (
    (`time;`eventTimestamp);
    (`sym;`instrumentID);
    (`asset;`ccyPair);
    (`ask;`askPrice);
    (`bid;`bidPrice);
    (`ccy;`baseCcy);
    )
>
>
> vi src/fxmarketdata-pipeline-spec.q
>
> #AFTER
> cat src/fxmarketdata-pipeline-spec.q | awk 'NR==12, NR==20'
mapCols:(!) . flip (
    (`time;`eventTimestamp);
    (`sym;`instrumentID);
    (`asset;`ccyPair);
    (`ask;`askPrice);
    (`bid;`bidPrice);
    (`ccy;`baseCcy);
    (`qualifier;`Qualifier)
    )
>
> #BEFORE
> cat src/fxmarketdata-pipeline-spec.q | awk 'NR==20, NR==48'
SCHEMA: flip $[;()] each (!) . flip (
   (`date;         "d");
   (`sym;          "s");
   (`time;         "p");
   (`srcTime;      "p");
   (`recvTime;     "p");
   (`src;          "s");
   (`tenor;        "s");
   (`asset;        "s");
   (`deliverable;  "b");
   (`level;        "s");
   (`ccy;          "s");
   (`valueDate;    "d");
   (`ask;          "f");
   (`bid;          "f");
   (`askPoints;    "f");
   (`bidPoints;    "f");
   (`askQuoteID;   "");
   (`bidQuoteID;   "");
   (`askSize;      "f");
   (`bidSize;      "f");
   (`askCond;      "c");
   (`bidCond;      "c");
   (`askMinSize;   "f");
   (`bidMinSize;   "f");
   (`askExpiryTime;"p");
   (`bidExpiryTime;"p")
   );
>
>
> vi src/fxmarketdata-pipeline-spec.q
>
> #AFTER
> cat src/fxmarketdata-pipeline-spec.q| awk 'NR==20, NR==49'
SCHEMA: flip $[;()] each (!) . flip (
   (`qualifier;    "s");
   (`date;         "d");
   (`sym;          "s");
   (`time;         "p");
   (`srcTime;      "p");
   (`recvTime;     "p");
   (`src;          "s");
   (`tenor;        "s");
   (`asset;        "s");
   (`deliverable;  "b");
   (`level;        "s");
   (`ccy;          "s");
   (`valueDate;    "d");
   (`ask;          "f");
   (`bid;          "f");
   (`askPoints;    "f");
   (`bidPoints;    "f");
   (`askQuoteID;   "");
   (`bidQuoteID;   "");
   (`askSize;      "f");
   (`bidSize;      "f");
   (`askCond;      "c");
   (`bidCond;      "c");
   (`askMinSize;   "f");
   (`bidMinSize;   "f");
   (`askExpiryTime;"p");
   (`bidExpiryTime;"p")
   );
>

Note

In the pipeline schema definition, new overlay columns must be added at the start.

Re-pack

Re-pack the fsi-app-kx-flow package. It's recommended that the package version is modified so that developers and users are aware of its updated content.

Shell

Copy
> kxi package packit fsi-app-kx-flow --tag --version 1.0.1
Refreshing package before packing...
Writing fsi-app-kx-flow/manifest.yaml
Creating package from fsi-app-kx-flow
Package created: /tmp/artifacts/fsi-app-kx-flow-1.0.1.kxi
fsi-app-kx-flow-1.0.1.kxi
>

Deploy

Finally, deploy the fsi-app-kx-flow package with the new changes. Refer to How to Deploy the FSI Accelerator for these steps.

Removing Pipelines

The accelerator packages come with a suite of pipelines. Depending on your use-case, some of these pipelines may not be required. In such cases, these pipelines can be removed from the assembly by doing the following, here we are using the ice-orderbook package:

Unpack fsi-app-ice-orderbook assembly

Unpack the assembly package

Shell

Copy
> kxi package unpack fsi-app-ice-orderbook-1.0.0.kxi
> ls
fsi-app-ice-orderbook  fsi-app-ice-orderbook-1.0.0.kxi

Remove the Pipelines

Remove unneeded pipelines from the fsi-app-ice-orderbook package using kxi package rm. In the example below, all pipelines other than orderbook are removed:

Shell

Copy
> ls
fsi-ice-orderbook-1.0.0  fsi-app-ice-orderbook-1.0.0.kxi  fsi-lib-1.0.0.kxi
>
>
> kxi package info fsi-app-ice-orderbook
==PATH==
/tmp/artifacts/fsi-app-ice-orderbook

==OBJECT==
Package

==Manifest==
uuid: 5cf1e56b-0129-4309-b534-784916a20077
name: fsi-app-ice-orderbook
version: 1.0.0
metadata:
  pakxtime: '2024-09-09T04:44:37.941486'
entrypoints:
  aggregator: src/fsi.default.q
  data-access: src/fsi.default.q
  default: src/fsi.default.q
  resource-coordinator: src/fsi.default.q
system:
  _pakx_version: 1.10.0
tables:
  CanCor:
    file: tables/CanCor.yaml
  Contract:
    file: tables/Contract.yaml
  CorpActions:
    file: tables/CorpActions.yaml
  DividendRecord:
    file: tables/DividendRecord.yaml
  Exchange:
    file: tables/Exchange.yaml
  ExchangeHolidayCal:
    file: tables/ExchangeHolidayCal.yaml
  Instrument:
    file: tables/Instrument.yaml
  L2Quote:
    file: tables/L2Quote.yaml
  OpenClose:
    file: tables/OpenClose.yaml
  Quote:
    file: tables/Quote.yaml
  RelativeContract:
    file: tables/RelativeContract.yaml
  Trade:
    file: tables/Trade.yaml
  fsi_bar_Quote_dayStats:
    file: tables/fsi_bar_Quote_dayStats.yaml
  fsi_bar_Quote_minStats:
    file: tables/fsi_bar_Quote_minStats.yaml
  fsi_bar_Trade_dayStats:
    file: tables/fsi_bar_Trade_dayStats.yaml
  fsi_bar_Trade_minStats:
    file: tables/fsi_bar_Trade_minStats.yaml
  Depth:
    file: tables/Depth.yaml
  OrderbookSnapshots:
    file: tables/OrderbookSnapshots.yaml
  OrderbookUpdates:
    file: tables/OrderbookUpdates.yaml
databases:
  fsi-core-db:
    dir: databases/fsi-core-db
    shards:
    - fsi-core-db-shard
    tables: []
pipelines:
  icecorereference:
    file: pipelines/icecorereference.yaml
  icecrossreference:
    file: pipelines/icecrossreference.yaml
  icerealtimel1:
    file: pipelines/icerealtimel1.yaml
  icerealtimel2:
    file: pipelines/icerealtimel2.yaml
  orderbook:
    file: pipelines/orderbook.yaml
router: router/router.yaml
views:
  Realtime-Orderbook:
    file: views/Realtime-Orderbook.yaml
deployment_config: deployment_config/deployment_config.yaml
patches:
- uuid: 66d75b55-3bab-44c0-878d-a26ffb09721c
  name: FxFlowOverlay
- uuid: 6c0dbf60-363b-4688-aa18-b73aff5537fa
  name: futdaybars
- uuid: 1320affe-449c-405a-bb7f-b2ff14dcd731
  name: FxOverlay
>
>
> kxi package rm --from fsi-app-ice-orderbook pipeline --name icecorereference;kxi package rm --from fsi-app-ice-orderbook pipeline --name icecrossreference;kxi package rm --from fsi-app-ice-orderbook pipeline --name icerealtimel1;kxi package rm --from fsi-app-ice-orderbook pipeline --name icerealtimel2
Writing fsi-app-ice-orderbook/manifest.yaml
Writing fsi-app-ice-orderbook/manifest.yaml
Writing fsi-app-ice-orderbook/manifest.yaml
Writing fsi-app-ice-orderbook/manifest.yaml
>
>
> kxi package info fsi-app-ice-orderbook
==PATH==
/tmp/artifacts/fsi-app-ice-orderbook

==OBJECT==
Package

==Manifest==
uuid: 5cf1e56b-0129-4309-b534-784916a20077
name: fsi-app-ice-orderbook
version: 1.0.0
metadata:
  pakxtime: '2024-09-09T04:44:37.941486'
entrypoints:
  aggregator: src/fsi.default.q
  data-access: src/fsi.default.q
  default: src/fsi.default.q
  resource-coordinator: src/fsi.default.q
system:
  _pakx_version: 1.10.0
tables:
  CanCor:
    file: tables/CanCor.yaml
  Contract:
    file: tables/Contract.yaml
  CorpActions:
    file: tables/CorpActions.yaml
  DividendRecord:
    file: tables/DividendRecord.yaml
  Exchange:
    file: tables/Exchange.yaml
  ExchangeHolidayCal:
    file: tables/ExchangeHolidayCal.yaml
  Instrument:
    file: tables/Instrument.yaml
  L2Quote:
    file: tables/L2Quote.yaml
  OpenClose:
    file: tables/OpenClose.yaml
  Quote:
    file: tables/Quote.yaml
  RelativeContract:
    file: tables/RelativeContract.yaml
  Trade:
    file: tables/Trade.yaml
  fsi_bar_Quote_dayStats:
    file: tables/fsi_bar_Quote_dayStats.yaml
  fsi_bar_Quote_minStats:
    file: tables/fsi_bar_Quote_minStats.yaml
  fsi_bar_Trade_dayStats:
    file: tables/fsi_bar_Trade_dayStats.yaml
  fsi_bar_Trade_minStats:
    file: tables/fsi_bar_Trade_minStats.yaml
  Depth:
    file: tables/Depth.yaml
  OrderbookSnapshots:
    file: tables/OrderbookSnapshots.yaml
  OrderbookUpdates:
    file: tables/OrderbookUpdates.yaml
databases:
  fsi-core-db:
    dir: databases/fsi-core-db
    shards:
    - fsi-core-db-shard
    tables: []
pipelines:
  orderbook:
    file: pipelines/orderbook.yaml
router: router/router.yaml
views:
  Realtime-Orderbook:
    file: views/Realtime-Orderbook.yaml
deployment_config: deployment_config/deployment_config.yaml
patches:
- uuid: 66d75b55-3bab-44c0-878d-a26ffb09721c
  name: FxFlowOverlay
- uuid: 6c0dbf60-363b-4688-aa18-b73aff5537fa
  name: futdaybars
- uuid: 1320affe-449c-405a-bb7f-b2ff14dcd731
  name: FxOverlay

>

Re-pack

Re-pack the fsi-app-ice-orderbook package. It's recommended that the package version is modified to indicate its updated nature.

Shell

Copy
> kxi package packit fsi-app-ice-orderbook --tag --version 1.0.1
Refreshing package before packing...
Writing fsi-app-ice-orderbook/manifest.yaml
Creating package from fsi-app-ice-orderbook
Package created: /tmp/artifacts/fsi-app-ice-orderbook-1.0.1.kxi
fsi-app-ice-orderbook-1.0.1.kxi
>

Deploy

Finally, deploy the fsi-app-ice-orderbook package with the new changes. Refer to How to Deploy the FSI Accelerator for these steps.