Importing Data with Streaming Utilities

This topic describes the available methods for loading data into OmniSci from streaming sources.

Note: If there is a potential for duplicate entries and you prefer to avoid loading duplicate rows, see How can I avoid creating duplicate rows? on the Troubleshooting page.

StreamImporter

StreamImporter is an updated version of the StreamInsert utility used for streaming reads from delimited files into OmniSci Core. The difference is that StreamImporter uses a binary columnar load path, providing improved performance compared to the StreamInsert utility.

Note

StreamImporter is not supported in high-availability (HA) mode.

You can ingest data from a data stream to an existing table in OmniSci using StreamImporter on the command line.

StreamImporter <table_name> <database_name> {-u|--user} <user_name> \
{-p|--passwd} <user_password> [{--host} <hostname>] \
[--port <mapd_core_port>] [--delim <delimiter>] [--batch <batch_size>] \
[{-t|--transform} transformation ...] [--retry_count <number_of_retries>] \
[--retry_wait <delay_in_seconds>] --null <null_value_string> \
[--line <line_delimiter>] [--print_error] [--print_transform]
Setting             Default          Description
<table_name>        n/a              Name of the target table in OmniSci
<database_name>     n/a              Name of the target database in OmniSci
-u                  n/a              User name
-p                  n/a              User password
--host              n/a              Name of OmniSci host
--delim             comma (,)        Field delimiter, in single quotes
--line              newline (\n)     Line delimiter, in single quotes
--batch             10000            Number of records in a batch
--retry_count       10               Number of attempts before job fails
--retry_wait        5                Wait time in seconds after server connection failure
--null              n/a              String that represents null values
--port              9091             Port number for OmniSci Core on localhost
-t|--transform      n/a              Regex transformation
--print_error       False            Print error messages
--print_transform   False            Print description of transform
--help              n/a              List options

Configure StreamImporter to use your target table. StreamImporter listens to a pre-defined data stream associated with your table. You must create the table before using the StreamImporter utility.
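For example, you might create the target table up front with omnisql (mapdql in older installations). The following is only a sketch: the column layout is an assumption and must match the fields in your stream records, and the connection details should match your server.

echo "CREATE TABLE tweets_small (
  tweet_id BIGINT,
  tweet_time TIMESTAMP,
  lat FLOAT,
  lon FLOAT,
  tweet_text TEXT ENCODING DICT);" | omnisql mapd -u imauser -p imapassword --port 9091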

The data format must be a record-level format supported by OmniSci.

StreamImporter listens to the stream, validates records against the target schema, and ingests batches of your designated size to the target table. Rejected records use the existing reject reporting mechanism. You can start, shut down, and configure StreamImporter independently of the OmniSci Core engine. If StreamImporter is running but the database shuts down, StreamImporter shuts down as well. Reads from the stream are non-destructive.

StreamImporter is not responsible for event ordering; a first-class streaming platform outside OmniSci (for example, Spark Streaming or Flink) should handle the stream processing. OmniSci ingests the end-state stream of post-processed events.

StreamImporter does not handle dynamic schema creation on first ingest, but must be configured with a specific target table (and its schema) as the basis.

There is a 1:1 correspondence between the target table and the input stream.

cat tweets.tsv | ./StreamImporter tweets_small mapd \
-u imauser \
-p imapassword \
--delim '\t' \
--batch 100000 \
--retry_count 360 \
--retry_wait 10 \
--null null \
--port 9999
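The source does not have to be a static file. Because reads from the stream are non-destructive, you can attach StreamImporter to a continuously growing file or to any process that writes delimited records to standard out. A minimal sketch, assuming a hypothetical log file and target table:

tail -F /var/log/app/site_events.csv | ./StreamImporter site_events mapd \
-u imauser -p imapassword --batch 10000 --null null --port 9091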

KafkaImporter

You can ingest data from an existing Kafka producer to an existing table in OmniSci using KafkaImporter on the command line.

NOTE: KafkaImporter requires a functioning Kafka cluster. See the Kafka website and the Confluent schema registry documentation.

KafkaImporter <table_name> <database_name> {-u|--user} <user_name> \
{-p|--passwd} <user_password> [{--host} <hostname>] \
[--port <mapd_core_port>] [--delim <delimiter>] [--batch <batch_size>] \
[{-t|--transform} transformation ...] [--retry_count <number_of_retries>] \
[--retry_wait <delay_in_seconds>] --null <null_value_string> \
[--line <line_delimiter>] --brokers=<broker_name:broker_port> \
--group-id=<kafka_group_id> --topic=<topic_type> [--print_error] [--print_transform]
Setting             Default          Description
<table_name>        n/a              Name of the target table in OmniSci
<database_name>     n/a              Name of the target database in OmniSci
-u                  n/a              User name
-p                  n/a              User password
--host              n/a              Name of OmniSci host
--delim             comma (,)        Field delimiter, in single quotes
--line              newline (\n)     Line delimiter, in single quotes
--batch             10000            Number of records in a batch
--retry_count       10               Number of attempts before job fails
--retry_wait        5                Wait time in seconds after server connection failure
--null              n/a              String that represents null values
--port              9091             Port number for OmniSci Core on localhost
-t|--transform      n/a              Regex transformation
--print_error       False            Print error messages
--print_transform   False            Print description of transform
--help              n/a              List options
--brokers           localhost:9092   One or more brokers
--group-id          n/a              Kafka group ID
--topic             n/a              The Kafka topic to be ingested

Configure KafkaImporter to use your target table. KafkaImporter listens to a pre-defined Kafka topic associated with your table. You must create the table before using the KafkaImporter utility. For example, you might have a table named customer_site_visit_events that listens to a topic named customer_site_visit_events_topic.
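Because KafkaImporter listens to a pre-defined topic, the topic generally needs to exist on your cluster before you start the importer. A minimal sketch using the standard Kafka CLI, assuming a single local broker (older Kafka releases use --zookeeper <host>:2181 instead of --bootstrap-server):

kafka-topics.sh --create --topic customer_site_visit_events_topic \
  --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1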

The data format must be a record-level format supported by OmniSci.

KafkaImporter listens to the topic, validates records against the target schema, and ingests topic batches of your designated size to the target table. Rejected records use the existing reject reporting mechanism. You can start, shut down, and configure KafkaImporter independently of the OmniSci Core engine. If KafkaImporter is running but the database shuts down, KafkaImporter shuts down as well. Reads from the topic are non-destructive.
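Because you can start and stop KafkaImporter independently of the engine, one simple way to keep it attached to a topic is to run it as a detached background process. The following is only a sketch; the table, topic, group ID, broker address, and credentials are placeholders.

nohup ./KafkaImporter customer_site_visit_events mapd \
  -u imauser -p imapassword --port 9091 \
  --brokers=localhost:9092 --group-id=site_visits \
  --topic=customer_site_visit_events_topic > kafka_importer.log 2>&1 &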

KafkaImporter is not responsible for event ordering; a first-class streaming platform outside OmniSci (for example, Spark Streaming or Flink) should handle the stream processing. OmniSci ingests the end-state stream of post-processed events.

KafkaImporter does not handle dynamic schema creation on first ingest, but must be configured with a specific target table (and its schema) as the basis. There is a 1:1 correspondence between target table and topic.

cat tweets.tsv | ./KafkaImporter tweets_small mapd \
-u imauser \
-p imapassword \
--delim '\t' \
--batch 100000 \
--retry_count 360 \
--retry_wait 10 \
--null null \
--port 9999 \
--brokers=localhost:9092 \
--group-id=testImport1 \
--topic=tweet
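To publish data to the topic used in the example above, you could feed the same file through the standard Kafka console producer. This is a sketch that assumes a local broker; older Kafka releases use --broker-list instead of --bootstrap-server.

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic tweet < tweets.tsv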

StreamInsert

Stream data into OmniSci Core by attaching the StreamInsert program to the end of a data stream. The data stream can be another program printing to standard out, a Kafka endpoint, or any other real-time stream output. You can specify an appropriate batch size according to the expected stream rate and your insert frequency. The target table must exist before you attempt to stream data into the table.

<data stream> | StreamInsert <table name> <database name> \
{-u|--user} <user> {-p|--passwd} <password> [{--host} <hostname>] \
[--port <port number>] [--delim <delimiter>] [--null <null string>] \
[--line <line delimiter>] [--batch <batch size>] \
[{-t|--transform} transformation ...] [--retry_count <num_of_retries>] \
[--retry_wait <wait in secs>] [--print_error] [--print_transform]
Setting             Default          Description
<table_name>        n/a              Name of the target table in OmniSci
<database_name>     n/a              Name of the target database in OmniSci
-u                  n/a              User name
-p                  n/a              User password
--host              n/a              Name of OmniSci host
--delim             comma (,)        Field delimiter, in single quotes
--line              newline (\n)     Line delimiter, in single quotes
--batch             10000            Number of records in a batch
--retry_count       10               Number of attempts before job fails
--retry_wait        5                Wait time in seconds after server connection failure
--null              n/a              String that represents null values
--port              9091             Port number for OmniSci Core on localhost
-t|--transform      n/a              Regex transformation
--print_error       False            Print error messages
--print_transform   False            Print description of transform
--help              n/a              List options

For more information on creating regex transformation statements, see RegEx Replace.

Example:

cat file.tsv | /path/to/mapd/SampleCode/StreamInsert stream_example \
mapd --host localhost --port 9091 -u imauser -p imapassword \
--delim '\t' --batch 1000
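
As noted above, the data stream can also be another program printing to standard out rather than a static file. A minimal sketch, where gen_events.sh is a hypothetical script that emits one tab-delimited record per line:

./gen_events.sh | /path/to/mapd/SampleCode/StreamInsert stream_example \
mapd --host localhost --port 9091 -u imauser -p imapassword \
--delim '\t' --batch 1000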