Pipeline Administration
This page describes the steps to customize existing pipelines, add new pipelines, or remove pipelines that are no longer needed.
In this example, we address a fsi-app-kx-flow
accelerator package.
Adding a New Pipeline
You can see how to add a new pipeline to fsi-app-kx-flow
in Ingesting Data.
Customizing an Existing Pipeline
Below outlines the steps to customize a pipeline in the fsi-app-kx-flow
package. The customized pipeline in this example is fxmarketdata.
Unpack fsi-app-kx-flow
Unpack the fsi-app-kx-flow package
Shell
> kxi package unpack fsi-app-kx-flow-1.0.0.kxi
> ls
fsi-app-kx-flow fsi-app-kx-flow-1.0.0.kxi
Customize the Quote Schema
Customize the Quote schema within the tables directory to add in the following field:
Qualifier | string
See the steps for how to customize the schema in Overlaying a Schema with Custom Columns.
Add New Columns to the Pipeline Spec
Add new columns in the fxmarketdata
pipeline spec fxmarketdata-pipeline-spec.q
by updating it's column mapping and schema definition. For more details on configuring pipeline specs, refer to Pipeline Configuration.
Shell
> cd fsi-app-kx-flow
> ls
databases deployment_config feed manifest.yaml patches pipelines router src
>
>
> #BEFORE
> cat src/fxmarketdata-pipeline-spec.q | awk 'NR==12, NR==19'
mapCols:(!) . flip (
(`time;`eventTimestamp);
(`sym;`instrumentID);
(`asset;`ccyPair);
(`ask;`askPrice);
(`bid;`bidPrice);
(`ccy;`baseCcy);
)
>
>
> vi src/fxmarketdata-pipeline-spec.q
>
> #AFTER
> cat src/fxmarketdata-pipeline-spec.q | awk 'NR==12, NR==20'
mapCols:(!) . flip (
(`time;`eventTimestamp);
(`sym;`instrumentID);
(`asset;`ccyPair);
(`ask;`askPrice);
(`bid;`bidPrice);
(`ccy;`baseCcy);
(`qualifier;`Qualifier)
)
>
> #BEFORE
> cat src/fxmarketdata-pipeline-spec.q | awk 'NR==20, NR==48'
SCHEMA: flip $[;()] each (!) . flip (
(`date; "d");
(`sym; "s");
(`time; "p");
(`srcTime; "p");
(`recvTime; "p");
(`src; "s");
(`tenor; "s");
(`asset; "s");
(`deliverable; "b");
(`level; "s");
(`ccy; "s");
(`valueDate; "d");
(`ask; "f");
(`bid; "f");
(`askPoints; "f");
(`bidPoints; "f");
(`askQuoteID; "");
(`bidQuoteID; "");
(`askSize; "f");
(`bidSize; "f");
(`askCond; "c");
(`bidCond; "c");
(`askMinSize; "f");
(`bidMinSize; "f");
(`askExpiryTime;"p");
(`bidExpiryTime;"p")
);
>
>
> vi src/fxmarketdata-pipeline-spec.q
>
> #AFTER
> cat src/fxmarketdata-pipeline-spec.q| awk 'NR==20, NR==49'
SCHEMA: flip $[;()] each (!) . flip (
(`qualifier; "s");
(`date; "d");
(`sym; "s");
(`time; "p");
(`srcTime; "p");
(`recvTime; "p");
(`src; "s");
(`tenor; "s");
(`asset; "s");
(`deliverable; "b");
(`level; "s");
(`ccy; "s");
(`valueDate; "d");
(`ask; "f");
(`bid; "f");
(`askPoints; "f");
(`bidPoints; "f");
(`askQuoteID; "");
(`bidQuoteID; "");
(`askSize; "f");
(`bidSize; "f");
(`askCond; "c");
(`bidCond; "c");
(`askMinSize; "f");
(`bidMinSize; "f");
(`askExpiryTime;"p");
(`bidExpiryTime;"p")
);
>
Note
In the pipeline schema definition, new overlay columns must be added at the start.
Re-pack
Re-pack the fsi-app-kx-flow
package. It's recommended that the package version is modified so that developers and users are aware of its updated content.
Shell
> kxi package packit fsi-app-kx-flow --tag --version 1.0.1
Refreshing package before packing...
Writing fsi-app-kx-flow/manifest.yaml
Creating package from fsi-app-kx-flow
Package created: /tmp/artifacts/fsi-app-kx-flow-1.0.1.kxi
fsi-app-kx-flow-1.0.1.kxi
>
Deploy
Finally, deploy the fsi-app-kx-flow
package with the new changes. Refer to How to Deploy the FSI Accelerator for these steps.
Removing Pipelines
The accelerator packages come with a suite of pipelines. Depending on your use-case, some of these pipelines may not be required. In such cases, these pipelines can be removed from the assembly by doing the following, here we are using the ice-orderbook package:
Unpack fsi-app-ice-orderbook assembly
Unpack the assembly package
Shell
> kxi package unpack fsi-app-ice-orderbook-1.0.0.kxi
> ls
fsi-app-ice-orderbook fsi-app-ice-orderbook-1.0.0.kxi
Remove the Pipelines
Remove unneeded pipelines from the fsi-app-ice-orderbook
package using kxi package rm
. In the example below, all pipelines other than orderbook are removed:
Shell
> ls
fsi-ice-orderbook-1.0.0 fsi-app-ice-orderbook-1.0.0.kxi fsi-lib-1.0.0.kxi
>
>
> kxi package info fsi-app-ice-orderbook
==PATH==
/tmp/artifacts/fsi-app-ice-orderbook
==OBJECT==
Package
==Manifest==
uuid: 5cf1e56b-0129-4309-b534-784916a20077
name: fsi-app-ice-orderbook
version: 1.0.0
metadata:
pakxtime: '2024-09-09T04:44:37.941486'
entrypoints:
aggregator: src/fsi.default.q
data-access: src/fsi.default.q
default: src/fsi.default.q
resource-coordinator: src/fsi.default.q
system:
_pakx_version: 1.10.0
tables:
CanCor:
file: tables/CanCor.yaml
Contract:
file: tables/Contract.yaml
CorpActions:
file: tables/CorpActions.yaml
DividendRecord:
file: tables/DividendRecord.yaml
Exchange:
file: tables/Exchange.yaml
ExchangeHolidayCal:
file: tables/ExchangeHolidayCal.yaml
Instrument:
file: tables/Instrument.yaml
L2Quote:
file: tables/L2Quote.yaml
OpenClose:
file: tables/OpenClose.yaml
Quote:
file: tables/Quote.yaml
RelativeContract:
file: tables/RelativeContract.yaml
Trade:
file: tables/Trade.yaml
fsi_bar_Quote_dayStats:
file: tables/fsi_bar_Quote_dayStats.yaml
fsi_bar_Quote_minStats:
file: tables/fsi_bar_Quote_minStats.yaml
fsi_bar_Trade_dayStats:
file: tables/fsi_bar_Trade_dayStats.yaml
fsi_bar_Trade_minStats:
file: tables/fsi_bar_Trade_minStats.yaml
Depth:
file: tables/Depth.yaml
OrderbookSnapshots:
file: tables/OrderbookSnapshots.yaml
OrderbookUpdates:
file: tables/OrderbookUpdates.yaml
databases:
fsi-core-db:
dir: databases/fsi-core-db
shards:
- fsi-core-db-shard
tables: []
pipelines:
icecorereference:
file: pipelines/icecorereference.yaml
icecrossreference:
file: pipelines/icecrossreference.yaml
icerealtimel1:
file: pipelines/icerealtimel1.yaml
icerealtimel2:
file: pipelines/icerealtimel2.yaml
orderbook:
file: pipelines/orderbook.yaml
router: router/router.yaml
views:
Realtime-Orderbook:
file: views/Realtime-Orderbook.yaml
deployment_config: deployment_config/deployment_config.yaml
patches:
- uuid: 66d75b55-3bab-44c0-878d-a26ffb09721c
name: FxFlowOverlay
- uuid: 6c0dbf60-363b-4688-aa18-b73aff5537fa
name: futdaybars
- uuid: 1320affe-449c-405a-bb7f-b2ff14dcd731
name: FxOverlay
>
>
> kxi package rm --from fsi-app-ice-orderbook pipeline --name icecorereference;kxi package rm --from fsi-app-ice-orderbook pipeline --name icecrossreference;kxi package rm --from fsi-app-ice-orderbook pipeline --name icerealtimel1;kxi package rm --from fsi-app-ice-orderbook pipeline --name icerealtimel2
Writing fsi-app-ice-orderbook/manifest.yaml
Writing fsi-app-ice-orderbook/manifest.yaml
Writing fsi-app-ice-orderbook/manifest.yaml
Writing fsi-app-ice-orderbook/manifest.yaml
>
>
> kxi package info fsi-app-ice-orderbook
==PATH==
/tmp/artifacts/fsi-app-ice-orderbook
==OBJECT==
Package
==Manifest==
uuid: 5cf1e56b-0129-4309-b534-784916a20077
name: fsi-app-ice-orderbook
version: 1.0.0
metadata:
pakxtime: '2024-09-09T04:44:37.941486'
entrypoints:
aggregator: src/fsi.default.q
data-access: src/fsi.default.q
default: src/fsi.default.q
resource-coordinator: src/fsi.default.q
system:
_pakx_version: 1.10.0
tables:
CanCor:
file: tables/CanCor.yaml
Contract:
file: tables/Contract.yaml
CorpActions:
file: tables/CorpActions.yaml
DividendRecord:
file: tables/DividendRecord.yaml
Exchange:
file: tables/Exchange.yaml
ExchangeHolidayCal:
file: tables/ExchangeHolidayCal.yaml
Instrument:
file: tables/Instrument.yaml
L2Quote:
file: tables/L2Quote.yaml
OpenClose:
file: tables/OpenClose.yaml
Quote:
file: tables/Quote.yaml
RelativeContract:
file: tables/RelativeContract.yaml
Trade:
file: tables/Trade.yaml
fsi_bar_Quote_dayStats:
file: tables/fsi_bar_Quote_dayStats.yaml
fsi_bar_Quote_minStats:
file: tables/fsi_bar_Quote_minStats.yaml
fsi_bar_Trade_dayStats:
file: tables/fsi_bar_Trade_dayStats.yaml
fsi_bar_Trade_minStats:
file: tables/fsi_bar_Trade_minStats.yaml
Depth:
file: tables/Depth.yaml
OrderbookSnapshots:
file: tables/OrderbookSnapshots.yaml
OrderbookUpdates:
file: tables/OrderbookUpdates.yaml
databases:
fsi-core-db:
dir: databases/fsi-core-db
shards:
- fsi-core-db-shard
tables: []
pipelines:
orderbook:
file: pipelines/orderbook.yaml
router: router/router.yaml
views:
Realtime-Orderbook:
file: views/Realtime-Orderbook.yaml
deployment_config: deployment_config/deployment_config.yaml
patches:
- uuid: 66d75b55-3bab-44c0-878d-a26ffb09721c
name: FxFlowOverlay
- uuid: 6c0dbf60-363b-4688-aa18-b73aff5537fa
name: futdaybars
- uuid: 1320affe-449c-405a-bb7f-b2ff14dcd731
name: FxOverlay
>
Re-pack
Re-pack the fsi-app-ice-orderbook
package. It's recommended that the package version is modified to indicate its updated nature.
Shell
> kxi package packit fsi-app-ice-orderbook --tag --version 1.0.1
Refreshing package before packing...
Writing fsi-app-ice-orderbook/manifest.yaml
Creating package from fsi-app-ice-orderbook
Package created: /tmp/artifacts/fsi-app-ice-orderbook-1.0.1.kxi
fsi-app-ice-orderbook-1.0.1.kxi
>
Deploy
Finally, deploy the fsi-app-ice-orderbook
package with the new changes. Refer to How to Deploy the FSI Accelerator for these steps.