PostgreSQL Ingestion

This page demonstrates how to set up a simple PostgreSQL database and query it from the kdb Insights Stream Processor.

PostgreSQL is an open-source relational database system.

This example is split into an install section and a query section. If you already have a running PostgreSQL database, skip to the query section.

Install a PostgreSQL database

This example uses scripts provided by Bitnami to install a PostgreSQL database.

Two installation options are covered below: Docker and Kubernetes.

For Docker, the following initdb.sql is provided as an example to set up a database with mock data.

SQL

CREATE DATABASE finance;

\c finance

CREATE TABLE stocks (
    id int,
    sym varchar(10),
    market varchar(10),
    name varchar(255),
    cap varchar(20)
);

INSERT INTO stocks VALUES (1, 'AAPL', 'NASDAQ', 'Apple Inc.', '$2.47T');
INSERT INTO stocks VALUES (2, 'MSFT', 'NASDAQ', 'Microsoft', '$2.32T');

To launch the PostgreSQL database in Docker, add a PostgreSQL service to a docker-compose.yaml file:

yaml

version: "3.3"
services:
  psql:
    image: docker.io/bitnami/postgresql:latest
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: iamsecure
    restart: always
    volumes:
      - ./initdb.sql:/docker-entrypoint-initdb.d/initdb.sql

Finally, run docker-compose up to launch the database.
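
As a rough sketch of how a client might reach this database once it is running, the snippet below assembles a libpq-style connection URI from the values in the docker-compose.yaml above. The helper name pg_uri and the localhost host are assumptions made for illustration. A password containing characters such as @ or / would also need percent-encoding, which this sketch does not attempt.

```shell
#!/usr/bin/env bash
# pg_uri is a hypothetical helper, not part of PostgreSQL or Docker.
# It joins its five arguments into a postgresql:// connection URI.
pg_uri() {
    local user="$1" password="$2" host="$3" port="$4" database="$5"
    printf 'postgresql://%s:%s@%s:%s/%s\n' "$user" "$password" "$host" "$port" "$database"
}

# Values taken from the docker-compose.yaml and initdb.sql shown above;
# "localhost" assumes the client runs on the machine that published port 5432.
uri=$(pg_uri postgres iamsecure localhost 5432 finance)
echo "$uri"

# With the container running and a psql client installed locally,
# the mock data could then be checked with:
#   psql "$uri" -c 'SELECT * FROM stocks;'
```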

For Kubernetes, begin by installing a PostgreSQL database into the current Kubernetes cluster.

bash

helm install postgresql bitnami/postgresql
                    

This launches a PostgreSQL pod in the current Kubernetes context.

bash

kubectl get pods | grep postgresql
                    

text

NAME                      READY   STATUS    RESTARTS   AGE
postgresql-postgresql-0   1/1     Running   0          4h16m
                    

The PostgreSQL installation generates a random password. Extract the password from the cluster and store it in an environment variable called PGPASSWORD for later use.

bash

export PGPASSWORD=$(kubectl get secret postgresql -o jsonpath="{.data.postgresql-password}" | base64 --decode)
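
Kubernetes stores secret values base64-encoded, which is why the command above pipes the jsonpath output through base64 --decode. The decode step can be tried offline with the minimal sketch below. The value example-password is made up and merely stands in for the generated password.

```shell
#!/usr/bin/env bash
# Offline imitation of the decode step; no cluster is contacted.
# "example-password" is a made-up value, not a real credential.
encoded=$(printf '%s' 'example-password' | base64)

# This is the same decode that is applied to the kubectl output above.
decoded=$(printf '%s' "$encoded" | base64 --decode)

echo "encoded: $encoded"
echo "decoded: $decoded"
```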
                    

Adding mock data

Now that the database is running, add some mock data by interacting with a PostgreSQL client. Launch an interactive client by running the following.

bash

kubectl run postgresql-client --rm --tty -i --restart='Never' \
    --image docker.io/bitnami/postgresql:latest \
    --env="PGPASSWORD=$PGPASSWORD" \
    --command -- psql --host postgresql -U postgres -d postgres -p 5432
                    

To add a mock stocks table with some mock data, run each of the following commands in the client shell from above.

SQL

CREATE DATABASE finance;
CREATE USER testdbuser WITH PASSWORD 'testpass';
GRANT ALL PRIVILEGES ON DATABASE finance TO testdbuser;
\c finance
CREATE TABLE stocks (id int, sym varchar(10), market varchar(10), name varchar(255), cap varchar(20));
INSERT INTO stocks VALUES (1, 'AAPL', 'NASDAQ', 'Apple Inc.', '$2.47T');
INSERT INTO stocks VALUES (2, 'MSFT', 'NASDAQ', 'Microsoft', '$2.32T');
                    

The end result should be a table containing the mock data:

SQL

SELECT * FROM stocks;
                    

text

 id | sym  | market |    name    |  cap
----+------+--------+------------+--------
  1 | AAPL | NASDAQ | Apple Inc. | $2.47T
  2 | MSFT | NASDAQ | Microsoft  | $2.32T
                    

For next steps, continue to the guide on issuing a PostgreSQL query from the Stream Processor.

Data examples

Another example using car sample data follows:

SQL

CREATE TABLE cars (id int, Name varchar(250), "Miles_per_Gallon" smallint, "Cylinders" smallint, "Displacement" smallint, "Horsepower" smallint, "Weight_in_lbs" smallint NOT NULL, "Acceleration" smallint, "Year" date NOT NULL, "Origin" character varying(60));
INSERT INTO cars VALUES (1, 'chevrolet chevelle malibu', 18, 8, 307, 130, 3504, 12, '1970-01-01', 'USA');
INSERT INTO cars VALUES (2, 'volkswagen 1131 deluxe sedan', 26, 4, 97, 46, 1835, 21, '1970-01-01', 'Europe');
                    

The final table data should be as follows:

SQL

SELECT * FROM cars;
                    

text

 id |             name             | Miles_per_Gallon | Cylinders | Displacement | Horsepower | Weight_in_lbs | Acceleration |    Year    | Origin
----+------------------------------+------------------+-----------+--------------+------------+---------------+--------------+------------+--------
  1 | chevrolet chevelle malibu    |               18 |         8 |          307 |        130 |          3504 |           12 | 1970-01-01 | USA
  2 | volkswagen 1131 deluxe sedan |               26 |         4 |           97 |         46 |          1835 |           21 | 1970-01-01 | Europe

PostgreSQL queries

This section provides an overview of integrating the Stream Processor with a PostgreSQL database.

Note

This example uses a mock database, which can be set up by following the PostgreSQL setup guide.

The Stream Processor provides a reader interface for issuing queries on a PostgreSQL database. The PostgreSQL Reader API can be used as a data source in a pipeline.

The following spec.q runs a SELECT query against the "finance" database and writes the result to the console. The connection details of the database are configured during deployment.

q

.qsp.run
    .qsp.read.fromPostgres["SELECT * FROM stocks"; "finance"]
    .qsp.write.toConsole[]
                  

Note

The example below requires a PostgreSQL database to be running in the cluster, as set up in the Kubernetes section of this tutorial.

bash

jobname=$(curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" --arg pass "$PGPASSWORD" \
    '{
        name     : "psql",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            KXI_SP_POSTGRES_SERVER   : "postgresql",
            KXI_SP_POSTGRES_PORT     : "5432",
            KXI_SP_POSTGRES_DATABASE : "finance",
            KXI_SP_POSTGRES_USERNAME : "postgres",
            KXI_SP_POSTGRES_PASSWORD : $pass
        }
    }' | jq -asR .)" | jq -r .id)

                    

Once deployed, check the console output of the deployed spwork pod to see the result of the query.
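
The deployment command above reads two local inputs: the spec.q file and the PGPASSWORD environment variable. If either is missing, the request is still sent but with empty content. A small pre-flight check along the following lines could catch that early. The function name preflight is hypothetical and is not part of the Stream Processor tooling.

```shell
#!/usr/bin/env bash
# preflight is a hypothetical helper: it checks only the two local inputs
# that the deployment command reads, and does not contact the cluster.
preflight() {
    local spec_file="$1"
    if [ -z "${PGPASSWORD:-}" ]; then
        echo "PGPASSWORD is not set" >&2
        return 1
    fi
    if [ ! -s "$spec_file" ]; then
        echo "$spec_file is missing or empty" >&2
        return 1
    fi
    echo "inputs look ready: $spec_file"
}

# Example use before running the deployment command.
if preflight spec.q; then
    echo "proceed with the deployment command"
else
    echo "fix the problem reported above before deploying"
fi
```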