Importing Data with Streaming Utilities
This topic describes the methods available for loading data into OmniSci from streaming sources.
Note | If there is a potential for duplicate entries and you prefer to avoid loading duplicate rows, see How can I avoid creating duplicate rows? on the Troubleshooting page. |
StreamImporter
StreamImporter is an updated version of the StreamInsert utility, used to stream reads from delimited files into OmniSci Core. The difference is that StreamImporter uses a binary columnar load path, which provides better performance than StreamInsert.
Note | StreamImporter is not supported in high-availability (HA) mode. |
You can ingest data from a data stream to an existing table in OmniSci using StreamImporter on the command line.
StreamImporter <table_name> <database_name> {-u|--user} <user_name> \
{-p|--passwd} <user_password> [{--host} <hostname>] \
[--port <mapd_core_port>] [--delim <delimiter>] [--batch <batch_size>] \
[{-t|--transform} <transformation> ...] [--retry_count <number_of_retries>] \
[--retry_wait <delay_in_seconds>] [--null <null_value_string>] \
[--line <line_delimiter>] [--print_error] [--print_transform]
Setting | Default | Description |
---|---|---|
<table_name> | n/a | Name of the target table in OmniSci |
<database_name> | n/a | Name of the target database in OmniSci |
-u | n/a | User name |
-p | n/a | User password |
--host | n/a | Name of OmniSci host |
--delim | comma (,) | Field delimiter, in single quotes |
--line | newline (\n) | Line delimiter, in single quotes |
--batch | 10000 | Number of records in a batch |
--retry_count | 10 | Number of attempts before job fails |
--retry_wait | 5 | Wait time in seconds after server connection failure |
--null | n/a | String that represents null values |
--port | 9091 | Port number for OmniSci Core on localhost |
-t|--transform | n/a | Regex transformation |
--print_error | False | Print error messages |
--print_transform | False | Print description of transform |
--help | n/a | List options |
Configure StreamImporter to use your target table. StreamImporter listens to a pre-defined data stream associated with your table. You must create the table before using the StreamImporter utility. The data format must be a record-level format supported by OmniSci.
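For example, a minimal sketch that creates a target table matching the tweets_small example below, piping SQL into the mapdql command-line client; the column names and types here are hypothetical placeholders that you should match to the fields in your stream:

echo "CREATE TABLE tweets_small (
  tweet_id BIGINT,
  tweet_time TIMESTAMP,
  tweet_text TEXT ENCODING DICT);" \
| ./mapdql mapd -u imauser -p imapassword --port 9091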
StreamImporter listens to the stream, validates records against the target schema, and ingests batches of your designated size into the target table. Rejected records use the existing reject-reporting mechanism. You can start, shut down, and configure StreamImporter independently of the OmniSci Core engine. If StreamImporter is running but the database shuts down, StreamImporter shuts down as well. Reads from the stream are non-destructive.
StreamImporter is not responsible for event ordering; a first-class streaming platform outside OmniSci (for example, Spark Streaming or Flink) should handle the stream processing. OmniSci ingests the end-state stream of post-processed events.
StreamImporter does not handle dynamic schema creation on first ingest; you must configure it with a specific target table (and its schema) as the basis.
There is a 1:1 correspondence between the target table and the data stream.
cat tweets.tsv | ./StreamImporter tweets_small mapd -u imauser -p imapassword --delim '\t' --batch 100000 --retry_count 360 --retry_wait 10 --null null --port 9999
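Because StreamImporter reads delimited records from standard input, any program that writes to standard out can feed it. For example, a sketch that streams a continuously growing log file; the file path and table name are hypothetical:

tail -f /var/log/site/visits.csv | ./StreamImporter site_visits mapd \
-u imauser -p imapassword --delim ',' --batch 10000 \
--retry_count 10 --retry_wait 5 --null null --port 9091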
KafkaImporter
You can ingest data from an existing Kafka producer to an existing table in OmniSci using KafkaImporter on the command line.
Note | KafkaImporter requires a functioning Kafka cluster. See the Kafka website and the Confluent schema registry documentation. |
KafkaImporter <table_name> <database_name> {-u|--user} <user_name> \
{-p|--passwd} <user_password> [{--host} <hostname>] \
[--port <mapd_core_port>] [--delim <delimiter>] [--batch <batch_size>] \
[{-t|--transform} <transformation> ...] [--retry_count <number_of_retries>] \
[--retry_wait <delay_in_seconds>] [--null <null_value_string>] \
[--line <line_delimiter>] [--brokers=<broker_name:broker_port>] \
--group-id=<kafka_group_id> --topic=<topic_name> [--print_error] [--print_transform]
Setting | Default | Description |
---|---|---|
<table_name> | n/a | Name of the target table in OmniSci |
<database_name> | n/a | Name of the target database in OmniSci |
-u | n/a | User name |
-p | n/a | User password |
--host | n/a | Name of OmniSci host |
--delim | comma (,) | Field delimiter, in single quotes |
--line | newline (\n) | Line delimiter, in single quotes |
--batch | 10000 | Number of records in a batch |
--retry_count | 10 | Number of attempts before job fails |
--retry_wait | 5 | Wait time in seconds after server connection failure |
--null | n/a | String that represents null values |
--port | 9091 | Port number for OmniSci Core on localhost |
-t|--transform | n/a | Regex transformation |
--print_error | False | Print error messages |
--print_transform | False | Print description of transform |
--help | n/a | List options |
--brokers | localhost:9092 | One or more brokers |
--group-id | n/a | Kafka group ID |
--topic | n/a | The Kafka topic to be ingested |
Configure KafkaImporter to use your target table. KafkaImporter listens to a pre-defined Kafka topic associated with your table. You must create the table before using the KafkaImporter utility. For example, you might have a table named customer_site_visit_events that listens to a topic named customer_site_visit_events_topic.
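If the topic does not exist yet, you can create it with Kafka's own tooling. For example, a sketch using the kafka-topics script from the Kafka bin directory, assuming a single local broker coordinated by a ZooKeeper instance on its default port:

kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic customer_site_visit_events_topic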
The data format must be a record-level format supported by OmniSci. KafkaImporter listens to the topic, validates records against the target schema, and ingests topic batches of your designated size into the target table. Rejected records use the existing reject-reporting mechanism.
You can start, shut down, and configure KafkaImporter independently of the OmniSci Core engine. If KafkaImporter is running but the database shuts down, KafkaImporter shuts down as well. Reads from the topic are non-destructive.
KafkaImporter is not responsible for event ordering; a first-class streaming platform outside OmniSci (for example, Spark Streaming or Flink) should handle the stream processing. OmniSci ingests the end-state stream of post-processed events.
KafkaImporter does not handle dynamic schema creation on first ingest; you must configure it with a specific target table (and its schema) as the basis.
There is a 1:1 correspondence between the target table and the topic.
cat tweets.tsv | ./KafkaImporter tweets_small mapd -u imauser -p imapassword --delim '\t' --batch 100000 --retry_count 360 --retry_wait 10 --null null --port 9999 --brokers=localhost:9092 --group-id=testImport1 --topic=tweet
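To verify the pipeline end to end, you can publish test records to the topic with Kafka's console producer instead of your production producer. A sketch, assuming the Kafka bin directory is on your PATH and the broker is local:

cat tweets.tsv | kafka-console-producer.sh --broker-list localhost:9092 --topic tweet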
StreamInsert
Stream data into OmniSci Core by attaching the StreamInsert program to the end of a data stream. The data stream can be another program printing to standard out, a Kafka endpoint, or any other real-time stream output. You can specify an appropriate batch size according to the expected stream rate and your insert frequency. The target table must exist before you attempt to stream data into it.
<data_stream> | StreamInsert <table_name> <database_name> \
{-u|--user} <user_name> {-p|--passwd} <user_password> [{--host} <hostname>] \
[--port <port_number>] [--delim <delimiter>] [--null <null_string>] \
[--line <line_delimiter>] [--batch <batch_size>] \
[{-t|--transform} <transformation> ...] [--retry_count <number_of_retries>] \
[--retry_wait <delay_in_seconds>] [--print_error] [--print_transform]
Setting | Default | Description |
---|---|---|
<table_name> | n/a | Name of the target table in OmniSci |
<database_name> | n/a | Name of the target database in OmniSci |
-u | n/a | User name |
-p | n/a | User password |
--host | n/a | Name of OmniSci host |
--delim | comma (,) | Field delimiter, in single quotes |
--line | newline (\n) | Line delimiter, in single quotes |
--batch | 10000 | Number of records in a batch |
--retry_count | 10 | Number of attempts before job fails |
--retry_wait | 5 | Wait time in seconds after server connection failure |
--null | n/a | String that represents null values |
--port | 9091 | Port number for OmniSci Core on localhost |
-t|--transform | n/a | Regex transformation |
--print_error | False | Print error messages |
--print_transform | False | Print description of transform |
--help | n/a | List options |
For more information on creating regex transformation statements, see RegEx Replace.
Example:
cat file.tsv | /path/to/mapd/SampleCode/StreamInsert stream_example \
mapd --host localhost --port 9091 -u imauser -p imapassword \
--delim '\t' --batch 1000
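As an example of attaching StreamInsert to a Kafka endpoint, you can pipe Kafka's console consumer into it. A sketch, assuming a local broker and a hypothetical topic named stream_example_topic:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream_example_topic \
| /path/to/mapd/SampleCode/StreamInsert stream_example mapd \
--host localhost --port 9091 -u imauser -p imapassword \
--delim ',' --batch 1000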