Ingest Data from Parquet Files
This page demonstrates how to ingest Apache Parquet files into kdb Insights Enterprise using Pipelines in the Web Interface.
Parquet is an efficient columnar data storage format. This tutorial shows how to ingest Apache Parquet files stored in a Microsoft Azure storage bucket into the kdb Insights Enterprise database. The Parquet files used here contain the New York City taxi ride data for February 2022, provided by the NYC Taxi and Limousine Commission.
If you want to ingest your own data stored in Parquet format, you can use the pipeline created in this tutorial as a template.
Prerequisites
- kdb Insights Enterprise is installed.
- A copy of the February 2022 New York City taxi Parquet files stored in an accessible Microsoft Azure storage bucket.
- A Kubernetes secret to access the Microsoft Azure storage bucket containing the Parquet files. The Parquet Reader q API documentation includes details on how to set up this secret.

Note
Suggested secret name
The rest of this tutorial assumes the secret name is pqtcreds.
This walkthrough guides you through the following steps:
- Create and deploy the database
- Create and deploy a Parquet reader pipeline
- Query the data
Create and deploy the database
Note
How to determine your database schema
If you are unsure about the data your Parquet files contain, you may need to query these files to investigate the available fields before defining your schema. To help with this analysis, use the available Parquet Reader q API.
- On the Overview page, in the Quick Actions panel under Databases, choose Create New.
Note
A detailed walkthrough of how to create and deploy a database is available here.
- Name the database taxidb.
- Open the Schema Settings tab and click the code editor icon on the right-hand side.
- Paste the following JSON schema into the code editor (an illustrative q sketch of the table this schema defines follows these steps):
[
{
"name": "taxitable",
"type": "partitioned",
"primaryKeys": [],
"prtnCol": "lpep_dropoff_datetime",
"sortColsDisk": [
"VendorID"
],
"sortColsMem": [
"VendorID"
],
"sortColsOrd": [
"VendorID"
],
"columns": [
{
"type": "long",
"attrDisk": "parted",
"attrOrd": "parted",
"name": "VendorID",
"attrMem": "",
"foreign": ""
},
{
"type": "timestamp",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "lpep_pickup_datetime",
"foreign": ""
},
{
"type": "timestamp",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "lpep_dropoff_datetime",
"foreign": ""
},
{
"type": "string",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "store_and_fwd_flag",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "RatecodeID",
"foreign": ""
},
{
"type": "long",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "PULocationID",
"foreign": ""
},
{
"type": "long",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "DOLocationID",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "passenger_count",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "trip_distance",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "fare_amount",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "extra",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "mta_tax",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "tip_amount",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "tolls_amount",
"foreign": ""
},
{
"type": "",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "ehail_fee",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "improvement_surcharge",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "total_amount",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "payment_type",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "trip_type",
"foreign": ""
},
{
"type": "float",
"attrDisk": "",
"attrMem": "",
"attrOrd": "",
"name": "congestion_surcharge",
"foreign": ""
}
]
}
]
- Apply the JSON.
- Save the database.
- Deploy the database.
When the database status changes to Active, it is ready to use.
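For reference, the JSON schema above corresponds to a q table of roughly the following shape. This is only an illustrative sketch of how the JSON column types map to q column types (the untyped ehail_fee column is shown as a general empty list); you do not need to run it as part of the walkthrough.

```q
/ Empty q table mirroring the taxitable schema defined above.
/ The database partitions this table on lpep_dropoff_datetime and
/ applies the parted attribute to VendorID on disk.
taxitable:([]
  VendorID:`long$();
  lpep_pickup_datetime:`timestamp$();
  lpep_dropoff_datetime:`timestamp$();
  store_and_fwd_flag:();              / string column
  RatecodeID:`float$();
  PULocationID:`long$();
  DOLocationID:`long$();
  passenger_count:`float$();
  trip_distance:`float$();
  fare_amount:`float$();
  extra:`float$();
  mta_tax:`float$();
  tip_amount:`float$();
  tolls_amount:`float$();
  ehail_fee:();                       / untyped in the schema above
  improvement_surcharge:`float$();
  total_amount:`float$();
  payment_type:`float$();
  trip_type:`float$();
  congestion_surcharge:`float$())
```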
Create and deploy a Parquet reader pipeline
To create the pipeline:
- Hover over Pipelines in the menu on the left pane and click the + symbol to add a pipeline.
- Name this pipeline taxipipeline.
- To add a Parquet reader node:
  - Type parquet in the search box.
  - Drag and drop the Parquet node into the pipeline pane.
The screen updates to show the new Parquet node.
- Click the Parquet node in the pipeline pane to open the Configure Parquet Node screen, where you can enter the values described in the following table.

| variable | value | details |
| --- | --- | --- |
| Path Type | Azure | Select from the dropdown. |
| Parquet URL | URL of your storage bucket | For example, ms://<bucket>/trip_data/taxi=green/year=2022/month=02/green_tripdata*.parquet. Note the * in the URL: * matches any string in a directory or file name. Glob patterns provide a way to specify a path that can match one or more files (a q sketch showing how this pattern matches file names follows these steps). |
| Decode Modality | Table | The format to which the Parquet file is decoded; select from the dropdown. |
| Use Watching | No | Watch for new storage bucket objects. This is unchecked here because the pipeline is intended to pick up a fixed set of Parquet files. |
| Use Authentication | Yes | Use a Kubernetes secret with the cloud provider. |
| Kubernetes Secret | pqtcreds | Name of the Kubernetes secret. |
| Use Certificates | No | |
- To add a Database Writer node to the pipeline:
  - Type writers in the search box.
  - Drag and drop the kdb Insights Database node into the pipeline pane.
- Connect the Parquet reader node to the database writer node by clicking and dragging the dot that represents the data output terminal on the Parquet node to the dot that represents the data input terminal on the kdb Insights Database node.
The screen updates to show the two nodes connected.
- Configure the Database Writer to point to the database and table created previously. Use the values described in the following table to complete the Configure kdb Insights Database Node screen.

| variable | value | details |
| --- | --- | --- |
| Database | taxidb | The kdb Insights database to write the data to. |
| Table | taxitable | The table in the selected database to write the data to. |
| Write Direct to HDB | Yes | When enabled, data is written directly to the database. |
| Deduplicate Stream | No | Deduplicates the data; useful when running multiple publishers that generate the same data. |
| Set Timeout Value | No | |
| Overwrite | Yes | Overwrites the content of each date partition with the new batch ingest table data. |
Information
Refer to the kdb Insights Database writer for more details on configuration of this node.
- Configure the pipeline memory. You must allocate sufficient memory to your pipeline to ensure it can ingest the Parquet files.
Information
Parquet files can have varying levels of compression. When a file is read by the pipeline, the data held in memory is uncompressed, so the level of compression determines the memory allocation required. If you are unsure of the memory required, allocate an amount that is 8 times the size of the largest Parquet file you plan to ingest; for example, a 1 GB Parquet file calls for at least 8 GB of pipeline memory.
  - Click the Pipeline Settings tab.
  - Scroll down to locate the memory settings and set values that meet this requirement.
- Start the pipeline by clicking Save & Deploy.
- The pipeline can be torn down once all the Parquet files are processed.
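Because the Parquet URL above relies on a glob pattern, it can be worth sanity-checking the pattern before you deploy. q's like operator uses the same * wildcard, so a quick local check against some sample object names (the file names below are invented for illustration) shows what the reader would and would not pick up:

```q
/ Hypothetical object names under trip_data/taxi=green/year=2022/month=02/
files:("green_tripdata_2022-02.parquet";
       "green_tripdata_2022-02_part2.parquet";
       "yellow_tripdata_2022-02.parquet")
/ * matches any run of characters, as in the reader's glob pattern
files like "green_tripdata*.parquet"    / returns 110b: only the green files match
```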
Query the data
- Create a new Query window.
- Adjust the time range to any date-time range in February 2022, and click Get Data.
- The data is displayed in the console.
The steps above show how easy it is to populate a database with a fixed set of parquet files using a Parquet Reader node.
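If you would rather spot-check the ingested data with q instead of the Get Data button, a qSQL query along the following lines returns a per-vendor summary for February 2022. This is a sketch only; it assumes you are running it somewhere with query access to the taxitable table (for example, a q process connected to the database), which is not covered by this walkthrough.

```q
/ Per-vendor trip count and averages for February 2022
select trips:count i, avgFare:avg fare_amount, avgDistance:avg trip_distance
  by VendorID
  from taxitable
  where lpep_dropoff_datetime within (2022.02.01D00:00:00;2022.03.01D00:00:00)
```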
Daily delivery of Parquet files from vendor
We can extend the walkthrough further to facilitate ingestion of parquet files as they are delivered to an object store bucket. One use case for this would be to ingest the data files from the previous day once they are copied into the bucket at a particular time each morning.
You can turn on the Watching feature on your Parquet Reader node to ensure new files that match the file pattern are ingested as they are delivered to the bucket.
This example continues to use the New York City taxi ride data. The assumption is that a vendor is pushing parquet files to your object storage location daily starting from March 1st 2022.
Configuration
- Create a copy of the pipeline above by clicking Duplicate.
- Update the following values in the Parquet Reader node:

| variable | value | details |
| --- | --- | --- |
| Parquet URL | URL of the storage bucket | For example, ms://<bucket>/trip_data/taxi=green/year=2022/month=03/green_tripdata*.parquet. Note the * in the URL: * matches any string in a directory or file name. Glob patterns provide a way to specify a path that can match one or more files. |
| Use Watching | Yes | Watch for new storage bucket objects. This is checked here because the pipeline is intended to poll for new Parquet files. |
Note
The URL chosen means that any files for March 2022 are ingested as they arrive in the bucket. The pipeline could be left running for a month.
- Update the following variable in the Database Writer node:

| variable | value | details |
| --- | --- | --- |
| Write Direct to HDB | No | This option is not supported when using the File Watcher. |
This pipeline shows how you can add new data to the database as it arrives in the storage bucket.
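To confirm that each day's delivery has actually landed in the database, a query like the following (again a sketch, run from wherever you have q access to the table) returns the most recent dropoff timestamp and the row count ingested so far for March 2022:

```q
/ Latest ingested dropoff time and March 2022 row count
select lastDropoff:max lpep_dropoff_datetime, totalRows:count i
  from taxitable
  where lpep_dropoff_datetime within (2022.03.01D00:00:00;2022.04.01D00:00:00)
```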
Further reading
- Web interface overview explaining the functionality of the web interface
- Create & manage databases to store data
- Create & manage pipelines to ingest data
- Create & manage queries to interrogate data
- Create & manage views to visualize data