Encoders

This page details methods for encoding data into external formats in the Stream Processor.

Arrow

(Beta Feature) Serializes data into Arrow format.

Beta Features

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true. See Beta Feature Usage Terms.

.qsp.encode.arrow[]
.qsp.encode.arrow[.qsp.use (!) . flip (
 (`schema ; schema);
 (`matchCols ; matchCols);
 (`payloadType ; payloadType))]

Parameters:

options:

| name | type | description | default |
| --- | --- | --- | --- |
| schema | table or dict | If a schema is provided, it is used when serializing every batch of data, improving performance. If not provided, the schema is inferred at runtime for each batch of data. | :: |
| matchCols | bool | Only used if a schema is provided and table data is passed in. If set to true, tables can be passed in with any column ordering, and the columns are reordered to match the provided schema. If false, no reordering is done, improving performance. | 1b |
| payloadType | symbol | Indicates the message payload that will be encoded (one of auto, table, or arrays). | `auto |

For all common arguments, refer to configuring operators.

This operator encodes kdb+ table or array data into an Arrow stream.

Only data made up of the inferred list types is supported. All vector columns/arrays are supported; general list columns/arrays are supported only when they contain strings or bytes.

Encode a kdb+ table to an Arrow stream, where the schema and payload type are inferred:

input:([]c1: til 5; c2: 10.1 10.2 10.3 10.4 10.5; c3: (enlist "a";"bbb";"ccc";"ddd";enlist "e"))
.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.arrow[]
 .qsp.write.toConsole[]
publish input
0xffffffffd80000001000000000000a000c0006000500..

Encode data to an Arrow stream, where the schema is specified and the payload type is arrays:

schema: ([] c1: "j"$(); c2: "f"$(); c3: "h"$())
payLoadType: `arrays
input: (1 2 3j; 10.1 10.2 10.3f; 10 20 30h)
.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.arrow[.qsp.use (!) . flip (
 (`schema; schema);
 (`payloadType; payLoadType)
 )]
 .qsp.write.toConsole[]
publish input
0xffffffffe00000001000000000000a000c0006000500..
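The matchCols option only matters when a schema is supplied and tables are published. A minimal sketch, assuming the default matchCols of 1b, in which a table arriving with its columns in a different order is reordered to match the schema before encoding:

```q
schema: ([] c1: "j"$(); c2: "f"$())
.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.arrow[.qsp.use (!) . flip (
 (`schema ; schema);
 (`matchCols ; 1b)
 )]
 .qsp.write.toConsole[]
// Columns arrive as c2,c1 but are reordered to c1,c2 to match the schema
publish ([] c2: 1.1 2.2; c1: 1 2)
```

With matchCols set to 0b the reordering step is skipped, which is faster when the publisher already guarantees the schema's column ordering.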

sp.encode.arrow()

Parameters:

options:

| name | type | description | default |
| --- | --- | --- | --- |
| schema | table or dict | A table from which the schema is extracted and used when serializing each batch of data. If not provided, the schema is inferred at runtime for each batch. | :: |
| match_cols | bool | When set to True and a schema is provided, tables passed in have their columns rearranged to match the column ordering of the schema. No reordering is done otherwise. | 1b |
| payload_type | symbol | A symbol indicating the message payload that will be encoded (one of auto, table, or arrays). | `auto |

Returns: An arrow encoder, which can be joined to other operators or pipelines.

Examples:

>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> arrays = [
np.array([1, 2, 3], dtype=np.longlong),
np.array([10.1, 10.2, 10.3], dtype=np.float32),
np.array([10, 20, 30], dtype=np.int16)]
>>> tempTable = pd.DataFrame.from_records(arrays, columns=['x', 'x1', 'x2'])
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.arrow()
| sp.write.to_variable('out'))
>>> kx.q('publish', tempTable)
>>> kx.q('out')
0xffffffffc80000001000000000000a000c000600050008000a000000000104000c000000080008000000040...

CSV

Converts data into CSV format.

.qsp.encode.csv[]
.qsp.encode.csv[delimiter]
.qsp.encode.csv[delimiter; .qsp.use enlist[`header]!enlist header]

Parameters:

| name | type | description | default |
| --- | --- | --- | --- |
| delimiter | character | Field separator for the records in the encoded data. | "," |

options:

| name | type | description | default |
| --- | --- | --- | --- |
| header | symbol | Whether encoded data starts with a header row: one of none, first, or always. | first |

For all common arguments, refer to configuring operators.

This operator encodes tables into CSV data: strings of delimiter-separated values.

Encode to CSV from a table:

// Generate a random table of data.
n: 10000
t: ([]
 date: n?.z.d + neg til 10;
 price: (n?1000)div 100;
 item: n?`3;
 description: {rand[100]?.Q.an}each til n;
 quantity: n?10000)
.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.csv["|"]
 .qsp.write.toConsole[]
publish t
2022.11.02D09:08:38.553535126 | "date|price|item|description|quantity"
2022.11.02D09:08:38.553535126 | "2022-11-01|2|mkp|F5e|9406"
2022.11.02D09:08:38.553535126 | "2022-10-31|1|igb|1lYkM405Ugfdk_VZLS6XW9zgtkqyNzgoB603WpmS9xV6Hx3Mtjd|4648"
2022.11.02D09:08:38.553535126 | "2022-10-28|9|kij|41EYGoQFLXlPVw50EWPvnkjSyB9HaB7qYtRcu69n|2261"
2022.11.02D09:08:38.553535126 | "2022-10-30|5|dih|DDBVC3uUbhpwdgEQsIt|8552"
2022.11.02D09:08:38.553535126 | "2022-11-02|4|dpb|zXYM60LQRdCQliV1HTtkZmgLrSNLqcKqj0teyswl61vmFkiC1MyatnwgGiAhtXGmX5QBVrOjFHQUt9s|8941"
2022.11.02D09:08:38.553535126 | "2022-10-26|1|hfa|HZ6fSzhC8rt_Zqd18|316"
..

Encoding to CSV is also supported from a dictionary whose values all have the same count:

show d: flip 3?t
date | 2022.10.30 2022.10.30 2022.10.29
price | 4 4 1
item | gih gih gdi
description| "tFO1lLCuD" "tFO1lLCuD" "E3XKfjSRYr4MJW3RYtbinQTB3vjVIWlXbvRhUllaoIbviWXBYEb4OH2CsOvWaJd6Cqiyndg"
quantity | 3815 3815 4770
publish d
2022.11.03D11:26:45.226049104 | "2022-10-30|4|gih|tFO1lLCuD|3815"
2022.11.03D11:26:45.226049104 | "2022-10-30|4|gih|tFO1lLCuD|3815"
2022.11.03D11:26:45.226049104 | "2022-10-29|1|gdi|E3XKfjSRYr4MJW3RYtbinQTB3vjVIWlXbvRhUllaoIbviWXBYEb4OH2CsOvWaJd6Cqiyndg|4770"
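The header option is passed through .qsp.use, following the signature shown above. A minimal sketch, assuming the header value `none suppresses the header row entirely (using the table t from the earlier example):

```q
.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.csv["|"; .qsp.use enlist[`header]!enlist `none]
 .qsp.write.toConsole[]
// With `none, no "date|price|item|..." header row precedes the records
publish t
```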

sp.encode.csv()
sp.encode.csv('|')

Parameters:

| name | type | description | default |
| --- | --- | --- | --- |
| delimiter | character | A field separator for the records in the encoded data, defaults to comma. | "," |

options:

| name | type | description | default |
| --- | --- | --- | --- |
| header | symbol | Whether encoded data starts with a header row. | first |

Returns: A csv encoder, which can be joined to other operators or pipelines.

Examples:

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.csv('|')
| sp.write.to_variable('out'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
name|id|quantity
a|1|4
b|2|5
c|3|6

JSON

Serializes data into JSON format.

.qsp.encode.json[]
.qsp.encode.json[.qsp.use enlist[`split]!enlist split]

Parameters:

options:

| name | type | description | default |
| --- | --- | --- | --- |
| split | boolean | By default, batches are encoded as single JSON objects. Split encodes each value in a given batch separately. When the input is a table, this encodes each row as its own JSON object. | 0b |

For all common arguments, refer to configuring operators.

This operator encodes a table as a JSON array of dictionaries.

.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.json[]
 .qsp.write.toVariable[`output]
publish ([] date: .z.d; time: .z.t; price: 3?100f; quantity: 3?100f)
// Format the output to help with viewing
-1 "},\n {" sv "},{" vs output;
[{"date":"2022-02-10","time":"17:46:36.545","price":48.29742,"quantity":73.15636},
 {"date":"2022-02-10","time":"17:46:36.545","price":51.24386,"quantity":3.691923},
 {"date":"2022-02-10","time":"17:46:36.545","price":51.5731,"quantity":97.58372}]

Encode a table row by row in JSON:

.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.json[.qsp.use``split!11b]
 .qsp.write.toVariable[`output]
publish ([] date: .z.d; time: .z.t; price: 3?100f; quantity: 3?100f)
// View the output
-1 output;
{"date":"2022-02-10","time":"17:46:36.545","price":48.29742,"quantity":73.15636}
{"date":"2022-02-10","time":"17:46:36.545","price":51.24386,"quantity":3.691923}
{"date":"2022-02-10","time":"17:46:36.545","price":51.5731,"quantity":97.58372}

Parameters:

options:

| name | type | description | default |
| --- | --- | --- | --- |
| split | boolean | Whether a batch should be encoded as a single JSON object, or split into a separate JSON object for each value in the batch. When the input is a table, this encodes each row as a JSON object with the column names as keys mapping to the corresponding row values. | 0b |

Returns: A json encoder, which can be joined to other operators or pipelines.

Examples:

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.json()
| sp.write.to_variable('out'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
{"name": "a", "id": 1, "quantity": 4}
{"name": "b", "id": 2, "quantity": 5}
{"name": "c", "id": 3, "quantity": 6}

Protocol Buffer

Serializes data into Protocol Buffers.

.qsp.encode.protobuf[message; file]
.qsp.encode.protobuf[message; file; .qsp.use (!) . flip (
 (`format ; format);
 (`payloadType; payloadType))]

Parameters:

| name | type | description | default |
| --- | --- | --- | --- |
| message | string or symbol | The name of the Protocol Buffer message type to encode. | Required |
| file | symbol | The path to a .proto file containing the message type definition. | Required |

options:

| name | type | description | default |
| --- | --- | --- | --- |
| format | string | A string definition of the Protocol Buffer message format to encode. | "" |
| payloadType | symbol | Indicates the message payload that will be encoded (one of auto, table, dict, array, or arrays). | auto |

For all common arguments, refer to configuring operators.

This operator encodes dictionaries or arrays of values into Protocol Buffer messages.

Note

Import paths

To import your .proto file, the folder containing it is added as an import path. That folder is then scanned when importing future .proto files, so avoid having .proto files with the same filename in any import path you use.

Encode Protobuf messages using a Person.proto file:

// Person.proto
syntax="proto3";
message Person {
 string name = 1;
 int32 id = 2;
 string email = 3;
}
.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.protobuf[`Person;"Person.proto"]
 .qsp.write.toConsole[];
publish `name`id`email!("name"; 101i; "email@email.com")
2021.11.10D12:01:49.089788900 | "\n\004name\020e\032\017email@email.com"

Encode Protobuf messages using format from arrays:

format: "syntax=\"proto3\";
 message Person {
    string name = 1;
    int32 id = 2;
    string email = 3;
 }";
.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.protobuf[.qsp.use `message`format!(`Person;format)]
 .qsp.write.toConsole[];
publish ("name"; 101i; "email@email.com")
2021.11.10D12:01:49.089788900 | "\n\004name\020e\032\017email@email.com"
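The payloadType option can pin the payload interpretation instead of relying on auto detection. A minimal sketch, assuming the Person.proto file from the first example and that `dict forces each published message to be treated as a dictionary of field values:

```q
.qsp.run
 .qsp.read.fromCallback[`publish]
 .qsp.encode.protobuf[`Person; "Person.proto"; .qsp.use enlist[`payloadType]!enlist `dict]
 .qsp.write.toConsole[];
// Each published message must now be a dictionary matching the Person fields
publish `name`id`email!("name"; 101i; "email@email.com")
```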

sp.encode.protobuf("MessageA",format=format_string,payload_type=PayloadType.auto)

Parameters:

| name | type | description | default |
| --- | --- | --- | --- |
| message | string or symbol | The name of the Protocol Buffer message type to encode. | Required |
| path | symbol | The path to a .proto file containing the message type definition. | Required |

options:

| name | type | description | default |
| --- | --- | --- | --- |
| format | string | A string definition of the Protocol Buffer message format to encode. | "" |
| payload_type | symbol | A PayloadType enum value, or the string equivalent. | auto |

Returns: A protobuf encoder, which can be joined to other operators or pipelines.

Examples:

Person.proto file

syntax="proto3";
message Person {
  string name = 1;
  int32 id = 2;
  string email = 3;
}
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> from enum import Enum
>>> from kxi.sp.encode import PayloadType
>>> proto_file = open('/tmp/Person.proto', 'w')
>>> proto_file.write(
'syntax="proto3";message Person { string name = 1;int32 id = 2;string email = 3;}')
>>> proto_file.close()
>>> format_string = ('syntax="proto3";message PersonA '
'{ string name = 1;int32 id = 2;string email = 3;}')
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.protobuf("PersonA",format=format_string,payload_type=PayloadType.auto)
| sp.write.to_variable('out'))
>>> # Define the name and email as byte strings, pykx by default converts strings into symbols
>>> data = {
'name': b'name',
'id': kx.IntAtom(101),
'email': b'email@email.com',
}
>>> kx.q('publish', data)
>>> kx.q('out')
"\\n\\004name\\020e\\032\\017email@email.com"