Loading Data with SQL

This topic describes several ways to load data into OmniSci using SQL commands and command-line utilities.

Note: If there is a potential for duplicate entries and you want to avoid loading duplicate rows, see Troubleshooting: How to Avoid Duplicate Rows.

COPY FROM

CSV/TSV Import

Use the following syntax for CSV and TSV files:

COPY <table> FROM '<file pattern>' [WITH (<property> = value, ...)];

The files matched by <file pattern> must be local on the server. The file pattern can contain wildcards if you want to load multiple files. In addition to CSV, TSV, and TXT files, you can import compressed files in TAR, ZIP, 7-ZIP, RAR, GZIP, BZIP2, or TGZ format.

You can also import client-side files with the \copy command in mapdql, but this is significantly slower. For large files, OmniSci recommends that you first copy the file to the server (for example, with scp) and then issue the COPY command.

Available properties in the optional WITH clause are described in the following table.

Parameter Description Default Value
array_delimiter A single-character string for the delimiter between input values contained within an array. Default: , (comma)
array_marker A two-character string consisting of the start and end characters surrounding an array. For example, data to be inserted into a table with a string array in the second column (for example, BOOLEAN, STRING[], INTEGER) can be written as true,{value1,value2,value3},3. Default: { } (curly brackets)
delimiter A single-character string for the delimiter between input fields; most commonly:
  • , for CSV files
  • \t for tab-delimited files
  Other delimiters include |, ~, ^, and ;.
  Note: OmniSci does not use file extensions to determine the delimiter.
  Default: ',' (CSV file)
escape A single-character string for escaping quotes. Default: ' (quote)
header Either 'true' or 'false', indicating whether the input file has a header line in Line 1 that should be skipped. Default: 'true'
line_delimiter A single-character string for terminating each line. Default: '\n'
lonlat Indicates if a POINT column is defined by coordinates in a CSV file as longitude and then latitude ('true'), or vice versa ('false'). Default: 'true'
max_reject Number of records that the COPY statement allows to be rejected before terminating the COPY command. Records can be rejected for a number of reasons, including invalid content in a field, or an incorrect number of columns. The details of the rejected records are reported in the ERROR log. COPY returns a message identifying how many records are rejected. The records that are not rejected are inserted into the table, even if the COPY stops because the max_reject count is reached.
  Note: If you run the COPY command from OmniSci Immerse, the COPY command does not return messages to Immerse once the SQL is verified. Immerse does not show messages about data loading, or about data-quality issues that result in max_reject triggers.
  Default: 100,000
nulls A string pattern indicating that a field is NULL. Default: an empty string, 'NA', or \N
plain_text Indicates that the input file is plain text so that it bypasses the libarchive decompression utility. CSV, TSV, and TXT are handled as plain text.
quote A single-character string for quoting a field. All characters inside quotes are imported “as is,” except for line delimiters. Default: " (double quote)
quoted Either 'true' or 'false', indicating whether the input file contains quoted fields. Default: 'true'
threads Number of threads for performing the data import. Default: number of CPU cores on the system
Note: By default, the CSV parser assumes one row per line. To import a file with multiple lines in a single field, specify threads = 1 in the WITH clause.
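The following Python sketch illustrates why a file with line breaks inside quoted fields does not have one row per line. It uses only the standard csv module and made-up sample data; it does not touch OmniSci and says nothing about how the OmniSci parser is implemented.

```python
import csv
import io

# Two logical records; the second has a line break inside its text field.
rows = [
    [1, "single-line tweet"],
    [2, "first line\nsecond line"],
]

# Write the records as fully quoted CSV into an in-memory buffer.
buffer = io.StringIO()
writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
writer.writerows(rows)
data = buffer.getvalue()

# Count line breaks in the raw text versus records seen by a CSV reader.
physical_lines = data.count("\n")
logical_records = len(list(csv.reader(io.StringIO(data))))

print(physical_lines, logical_records)  # 3 2
```

A file like this is a candidate for the threads = 1 setting described in the note, because the number of physical lines no longer matches the number of records.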

Examples:

COPY tweets from '/tmp/tweets.csv' WITH (nulls = 'NA');
COPY tweets from '/tmp/tweets.tsv' WITH (delimiter = '\t', quoted = 'false');
COPY tweets from '/tmp/*'          WITH (header='false');
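When COPY statements are generated by a script, a small helper can keep the WITH clause consistent. The sketch below is illustrative only: build_copy_statement is a hypothetical helper, it wraps every value in single quotes as the string-valued examples above do, and it performs no escaping or validation, so it assumes trusted input.

```python
def build_copy_statement(table, file_pattern, **options):
    """Return a COPY ... FROM statement string with an optional WITH clause."""
    statement = f"COPY {table} FROM '{file_pattern}'"
    if options:
        # Each value is wrapped in single quotes, as in the string-valued
        # examples in this section. No escaping is attempted (assumption:
        # the inputs are trusted and contain no quote characters).
        pairs = ", ".join(f"{name} = '{value}'" for name, value in options.items())
        statement += f" WITH ({pairs})"
    return statement + ";"


csv_stmt = build_copy_statement("tweets", "/tmp/tweets.csv", nulls="NA")
# "\\t" is a backslash followed by t, so the SQL text contains '\t'.
tsv_stmt = build_copy_statement("tweets", "/tmp/tweets.tsv", delimiter="\\t", quoted="false")

print(csv_stmt)
print(tsv_stmt)
```

The resulting strings can then be sent through whichever client you already use to run SQL against OmniSci.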

Geo Import

You can use COPY FROM to import geo files. Use the following syntax, depending on the file source.

Source Syntax
Local server
COPY <table> FROM '</filepath>' WITH (geo='true', ...);
Web site
COPY <table> FROM '<[http | https]://website/filepath>' WITH (geo='true', ...);
Amazon S3
COPY <table> FROM '<s3://bucket/filepath>' WITH (geo='true', s3_region='region', s3_access_key='accesskey', s3_secret_key='secretkey', ...);

The following WITH options are available for geo file imports from all sources:

  • geo_coords_type: Coordinate type used; must be geography.
  • geo_coords_encoding: Coordinates encoding; can be geoint(32) or none. Default: geoint(32)
  • geo_coords_srid: Coordinates spatial reference; must be 4326 (WGS84 longitude/latitude).

The following file types are supported:

  • ESRI Shapefile (.shp).
  • GeoJSON (.geojson, .json, .geojson.gz, .json.gz).
  • KML (.kml, .kmz).
  • File bundles:
    • .zip
    • .tar
    • .tar.gz
    • .tgz
    • NOTE: The first compatible file (.shp, .geojson, .kml) in the bundle is loaded, traversing subfolders until a compatible file is found. The rest of the contents in the bundle are ignored. If the bundle contains multiple filesets, unpack the bundle manually and specify the individual file for import.
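Because only the first compatible file in a bundle is loaded, it can help to inspect a bundle before importing it. The following sketch lists the geo files inside a .zip bundle using Python's standard zipfile module. The helper name compatible_members and the demo archive are hypothetical, and the order in which OmniSci traverses a bundle is not assumed here; the check only reports whether more than one candidate exists.

```python
import io
import zipfile

# Extensions taken from the supported file types listed above.
COMPATIBLE_SUFFIXES = (".shp", ".geojson", ".json", ".kml", ".kmz")


def compatible_members(bundle):
    """Return the names of geo files inside a .zip bundle (path or file object)."""
    with zipfile.ZipFile(bundle) as archive:
        return [
            name
            for name in archive.namelist()
            if name.lower().endswith(COMPATIBLE_SUFFIXES)
        ]


# Build a small in-memory bundle with two filesets for demonstration.
demo = io.BytesIO()
with zipfile.ZipFile(demo, "w") as archive:
    archive.writestr("readme.txt", "notes")
    archive.writestr("counties/counties.shp", b"")
    archive.writestr("roads/roads.geojson", "{}")

found = compatible_members(demo)
if len(found) > 1:
    print("Multiple geo files found; unpack and import them individually:", found)
```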

SQLImporter

SQLImporter is a Java utility run at the command line. It runs a SELECT statement on another database through JDBC and loads the result set into OmniSci Core.

java -cp [OmniSci JDBC driver]:[3rd party JDBC driver] \
com.mapd.utility.SQLImporter \
-u [OmniSci user] -p [OmniSci password] \
-db [OmniSci database name] --port [OmniSci Core port, default is 9091] -t [OmniSci table name] \
-su [source database user] -sp [source database password] \
-c "jdbc:[external database source]://server:port;DatabaseName=some_database" \
-ss "[select statement]"

Flags

-b,--bufferSize <arg>      Transfer buffer size
-c,--jdbcConnect <arg>     JDBC Connection string
-d,--driver <arg>          JDBC driver class
-db,--database <arg>       OmniSci Database
-f,--fragmentSize <arg>    Table fragment size
-i <arg>                   Path to initialization file.
-p,--passwd <arg>          OmniSci Password
--port <arg>               OmniSci Port
-r <arg>                   Row Load Limit
-s,--server <arg>          OmniSci Server
-sp,--sourcePasswd <arg>   Source Password
-ss,--sqlStmt <arg>        SQL Select statement
-su,--sourceUser <arg>     Source User
-t,--targetTable <arg>     OmniSci Target Table
-tr,--truncate             Drop and recreate the table, if it exists
-u,--user <arg>            OmniSci User

OmniSci recommends that you use a service account with read-only permissions when accessing data from a remote database.

If the table does not exist in OmniSci Core, SQLImporter creates it. If the target table in OmniSci Core does not match the SELECT statement metadata, SQLImporter fails.

If the truncate flag is used, SQLImporter truncates the table in OmniSci Core before transferring the data. If the truncate flag is not used, SQLImporter appends the results of the SQL statement to the target table in OmniSci Core.

The -i argument provides a path to an initialization file. Each line of the file is sent as a SQL statement to the remote database. You can use -i to set additional custom parameters before the data is loaded.

Note: The SQLImporter class name is case-sensitive. Incorrect case returns an error such as the following:
 Error: Could not find or load main class com.mapd.utility.SQLimporter
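If you launch SQLImporter from a script, building the argument list programmatically avoids shell-quoting problems with the JDBC connection string and the SELECT statement. The following is a minimal sketch that only assembles the arguments from the flags listed above; the function name, dictionary keys, and classpath are hypothetical placeholders, and nothing is executed.

```python
import shlex


def sqlimporter_command(classpath, target, source, select_stmt, truncate=False):
    """Assemble the argument list for a SQLImporter run (does not execute it)."""
    args = [
        "java", "-cp", classpath, "com.mapd.utility.SQLImporter",
        "-u", target["user"], "-p", target["password"],
        "-db", target["database"], "--port", str(target["port"]),
        "-t", target["table"],
        "-su", source["user"], "-sp", source["password"],
        "-c", source["jdbc_url"],
        "-ss", select_stmt,
    ]
    if truncate:
        args.append("-tr")  # truncate flag from the Flags list
    return args


command = sqlimporter_command(
    # Hypothetical jar locations; substitute your own driver paths.
    classpath="/path/to/omnisci-jdbc.jar:/path/to/source-driver.jar",
    target={"user": "mapd", "password": "Password", "database": "mapd",
            "port": 9091, "table": "test1"},
    source={"user": "myuser", "password": "mypassword",
            "jdbc_url": "jdbc:mysql://localhost"},
    select_stmt="select * from employees.employees",
)

# Print a shell-safe rendering for review before running it.
print(shlex.join(command))
```

The list can be passed unchanged to subprocess.run, which bypasses the shell, so the semicolons in a JDBC URL need no extra quoting.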

MySQL Example

java -cp mapd-1.0-SNAPSHOT-jar-with-dependencies.jar:mysql/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar \
com.mapd.utility.SQLImporter -t test1 -sp mypassword -su myuser \
-c jdbc:mysql://localhost -ss "select * from employees.employees" \
-u mapd -p Password -db mapd --port 9091

SQL Server Example

java -cp /path/to/mapd/bin/mapd-1.0-SNAPSHOT-jar-with-dependencies.jar:/path/to/sqljdbc4.jar \
com.mapd.utility.SQLImporter -d com.microsoft.sqlserver.jdbc.SQLServerDriver \
-t mapd_target_table -su source_user -sp source_pwd \
-c "jdbc:sqlserver://server:port;DatabaseName=some_database" -ss "select top 10 * from dbo.some_table" \
-u mapd -p Password -db mapd --port 9091

PostgreSQL Example

java -cp /opt/mapd/bin/mapd-1.0-SNAPSHOT-jar-with-dependencies.jar:/tmp/postgresql-42.2.5.jar \
com.mapd.utility.SQLImporter -u mapd -p Password -db mapd --port 9091 \
-t target_table -su postgres -sp Password \
-c "jdbc:postgresql://127.0.0.1/postgres" -ss "select * from public.source_table"

Google BigQuery Example

sudo java -cp /raidStorage/prod/mapd/bin/mapd-1.0-SNAPSHOT-jar-with-dependencies-BQ.jar:./GoogleBigQueryJDBC42.jar:\
./google-oauth-client-1.22.0.jar:./google-http-client-jackson2-1.22.0.jar:./google-http-client-1.22.0.jar:./google-api-client-1.22.0.jar:\
./google-api-services-bigquery-v2-rev355-1.22.0.jar \
com.mapd.utility.SQLImporter -u mapd -p Password -su mapd -sp Password -db database_name --port 8100 \
-t target_table -d com.simba.googlebigquery.jdbc42.Driver \
-c "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=project-id;OAuthType=0;\
OAuthServiceAcctEmail=email@domain.iam.gserviceaccount.com;OAuthPvtKeyPath=/home/simba/myproject.json;" -ss "select * from sample.foo"

StreamInsert

Stream data into OmniSci Core by attaching the StreamInsert program to the end of a data stream. The data stream can be another program printing to standard out, a Kafka endpoint, or any other real-time stream output. You can specify the appropriate batch size, according to the expected stream rates and your insert frequency. The target table must exist before you attempt to stream data into the table.

<data stream> | StreamInsert <table name> <database name> \
{-u|--user} <user> {-p|--passwd} <password> [{--host} <hostname>] \
[--port <port number>][--delim <delimiter>][--null <null string>] \
[--line <line delimiter>][--batch <batch size>][{-t|--transform} \
transformation ...][--retry_count <num_of_retries>] \
[--retry_wait <wait in secs>][--print_error][--print_transform]

Setting Default Description
<table_name> n/a Name of the target table in OmniSci
<database_name> n/a Name of the target database in OmniSci
-u n/a User name
-p n/a User password
--host n/a Name of OmniSci host
--delim comma (,) Field delimiter, in single quotes
--line newline (\n) Line delimiter, in single quotes
--batch 10000 Number of records in a batch
--retry_count 10 Number of attempts before job fails
--retry_wait 5 Wait time in seconds after server connection failure
--null n/a String that represents null values
--port 9091 Port number for OmniSci Core on localhost
-t|--transform n/a Regex transformation
--print_error False Print error messages
--print_transform False Print description of transform.
--help n/a List options

For more information on creating regex transformation statements, see RegEx Replace.

Example:

cat file.tsv | /path/to/mapd/SampleCode/StreamInsert stream_example \
mapd --host localhost --port 9091 -u imauser -p imapassword \
--delim '\t' --batch 1000
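The data stream can be any program that prints delimited records to standard output. The following Python sketch shows one possible producer. The function names and sample rows are hypothetical, and the sketch assumes field values never contain the delimiter or a line break, because it performs no quoting.

```python
import sys


def format_record(fields, delimiter="\t"):
    """Join one record's fields into a single delimited line."""
    return delimiter.join(str(field) for field in fields) + "\n"


def emit(records, out=sys.stdout):
    """Write records to the stream one line at a time, flushing after each."""
    for fields in records:
        out.write(format_record(fields))
        out.flush()  # push each record downstream without waiting for the buffer


if __name__ == "__main__":
    # Hypothetical sample rows; a real producer would read from a live source.
    emit([(1, "alpha", 0.5), (2, "beta", 1.25)])
```

Saved as, for example, producer.py (a hypothetical file name), the script would take the place of cat file.tsv in the example above: python producer.py | /path/to/mapd/SampleCode/StreamInsert stream_example mapd ... --delim '\t'.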

Importing AWS S3 Files

You can use the SQL COPY FROM statement to import files stored on AWS S3 into an OmniSci table, in much the same way you would with local files. In the WITH clause, specify the S3 credentials and the region of the bucket being accessed.

COPY <table> FROM '<S3_file_URL>' WITH (s3_access_key = '<access_key>', s3_secret_key = '<secret_key>', s3_region = '<region>');

Access key, secret key, and region are required. For information about AWS S3 credentials, see https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys.

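The following examples show failed and successful attempts to load data into the table trips from AWS S3. The first attempt omits the credentials, the second omits the region, and the third supplies all three required values.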

mapdql> COPY trips FROM 's3://mapd-s3-no-access/trip_data_9.gz';
Exception: failed to list objects of s3 url 's3://mapd-s3-no-access/trip_data_9.gz': AccessDenied: Access Denied
mapdql> COPY trips FROM 's3://mapd-s3-no-access/trip_data_9.gz' with (s3_access_key='xxxxxxxxxx',s3_secret_key='yyyyyyyyy');
Exception: failed to list objects of s3 url 's3://mapd-s3-no-access/trip_data_9.gz': AuthorizationHeaderMalformed: Unable to parse ExceptionName: AuthorizationHeaderMalformed Message: The authorization header is malformed; the region 'us-east-1' is wrong; expecting 'us-west-1'
mapdql> COPY trips FROM 's3://mapd-testdata/trip.compressed/trip_data_9.csv' with (s3_access_key='xxxxxxxx',s3_secret_key='yyyyyyyy',s3_region='us-west-1');
Result
Loaded: 100 recs, Rejected: 0 recs in 0.361000 secs

The following example imports all the files in the trip.compressed directory.

mapdql> copy trips from 's3://mapd-testdata/trip.compressed/' with (s3_access_key='xxxxxxxx',s3_secret_key='yyyyyyyy',s3_region='us-west-1');
Result
Loaded: 105200 recs, Rejected: 0 recs in 1.890000 secs
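Since the access key, secret key, and region are all required, a script that generates the COPY statement can check for them up front instead of waiting for the server to reject the request. The sketch below is one possible approach: the helper name is hypothetical, and reading the values from the environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION is an assumption borrowed from common AWS tooling, not something OmniSci requires.

```python
import os


def s3_copy_statement(table, s3_url, env=os.environ):
    """Build a COPY statement for an S3 source from environment settings."""
    # Variable names are an assumption borrowed from common AWS tooling.
    settings = {
        "s3_access_key": env.get("AWS_ACCESS_KEY_ID"),
        "s3_secret_key": env.get("AWS_SECRET_ACCESS_KEY"),
        "s3_region": env.get("AWS_DEFAULT_REGION"),
    }
    missing = [name for name, value in settings.items() if not value]
    if missing:
        # Access key, secret key, and region are all required.
        raise ValueError("missing S3 settings: " + ", ".join(missing))
    pairs = ",".join(f"{name}='{value}'" for name, value in settings.items())
    return f"COPY {table} FROM '{s3_url}' WITH ({pairs});"


# Demonstration with placeholder credentials instead of the real environment.
demo_env = {
    "AWS_ACCESS_KEY_ID": "xxxxxxxx",
    "AWS_SECRET_ACCESS_KEY": "yyyyyyyy",
    "AWS_DEFAULT_REGION": "us-west-1",
}
statement = s3_copy_statement("trips", "s3://mapd-testdata/trip.compressed/", demo_env)
print(statement)
```

Keeping the keys out of the script itself also avoids committing credentials to source control.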

trips Table

The table trips is created with the following statement:

mapdql> \d trips
        CREATE TABLE trips (
        medallion TEXT ENCODING DICT(32),
        hack_license TEXT ENCODING DICT(32),
        vendor_id TEXT ENCODING DICT(32),
        rate_code_id SMALLINT,
        store_and_fwd_flag TEXT ENCODING DICT(32),
        pickup_datetime TIMESTAMP,
        dropoff_datetime TIMESTAMP,
        passenger_count SMALLINT,
        trip_time_in_secs INTEGER,
        trip_distance DECIMAL(14,2),
        pickup_longitude DECIMAL(14,2),
        pickup_latitude DECIMAL(14,2),
        dropoff_longitude DECIMAL(14,2),
        dropoff_latitude DECIMAL(14,2))
WITH (FRAGMENT_SIZE = 75000000);

KafkaImporter

You can ingest data from an existing Kafka producer to an existing table in OmniSci using KafkaImporter on the command line.

NOTE: KafkaImporter requires a functioning Kafka cluster. See the Kafka website and the Confluent schema registry documentation.

KafkaImporter <table_name> <database_name> {-u|--user} <user_name> \
{-p|--passwd} <user_password> [{--host} <hostname>] \
[--port <mapd_core_port>] [--delim <delimiter>] [--batch <batch_size>] \
[{-t|--transform} transformation ...] [--retry_count <number_of_retries>] \
[--retry_wait <delay_in_seconds>] --null <null_value_string> \
[--line <line delimiter>] --brokers=<broker_name:broker_port> \
--group-id=<kafka_group_id> --topic=<topic_type> [--print_error][--print_transform]

Setting Default Description
<table_name> n/a Name of the target table in OmniSci
<database_name> n/a Name of the target database in OmniSci
-u n/a User name
-p n/a User password
--host n/a Name of OmniSci host
--delim comma (,) Field delimiter, in single quotes
--line newline (\n) Line delimiter, in single quotes
--batch 10000 Number of records in a batch
--retry_count 10 Number of attempts before job fails
--retry_wait 5 Wait time in seconds after server connection failure
--null n/a String that represents null values
--port 9091 Port number for OmniSci Core on localhost
-t|--transform n/a Regex transformation
--print_error False Print error messages
--print_transform False Print description of transform
--help n/a List options
--brokers localhost:9092 One or more brokers
--group-id n/a Kafka group ID
--topic n/a The Kafka topic to be ingested

Configure KafkaImporter to use your target table. KafkaImporter listens to a pre-defined Kafka topic associated with your table. You must create the table before using the KafkaImporter utility. For example, you might have a table named customer_site_visit_events that listens to a topic named customer_site_visit_events_topic.

The data format must be a record-level format supported by OmniSci.

KafkaImporter listens to the topic, validates records against the target schema, and ingests topic batches of your designated size to the target table. Rejected records use the existing reject reporting mechanism. You can start, shut down, and configure KafkaImporter independent of the OmniSci Core engine. If KafkaImporter is running but the database shuts down, KafkaImporter shuts down as well. Reads from the topic are non-destructive.

KafkaImporter is not responsible for event ordering; a first-class streaming platform outside OmniSci (for example, Spark Streaming or Flink) should handle the stream processing. OmniSci ingests the end-state stream of post-processed events.

KafkaImporter does not handle dynamic schema creation on first ingest, but must be configured with a specific target table (and its schema) as the basis. There is a 1:1 correspondence between target table and topic.

cat tweets.tsv | ./KafkaImporter tweets_small mapd \
-u imauser \
-p imapassword \
--delim '\t' \
--batch 100000 \
--retry_count 360 \
--retry_wait 10 \
--null null \
--port 9999 \
--brokers=localhost:9092 \
--group-id=testImport1 \
--topic=tweet

StreamImporter

StreamImporter is an updated version of the StreamInsert utility used for streaming reads from delimited files into OmniSci Core. The difference is that StreamImporter uses a binary columnar load path, providing improved performance compared to the StreamInsert utility.

Note: StreamImporter is not supported in high-availability (HA) mode.

You can ingest data from a data stream to an existing table in OmniSci using StreamImporter on the command line.

StreamImporter <table_name> <database_name> {-u|--user} <user_name> \
{-p|--passwd} <user_password> [{--host} <hostname>] \
[--port <mapd_core_port>] [--delim <delimiter>] [--batch <batch_size>] \
[{-t|--transform} transformation ...] [--retry_count <number_of_retries>] \
[--retry_wait <delay_in_seconds>] --null <null_value_string> \
[--quoted <boolean>] [--line <line delimiter>] [--print_error][--print_transform]

Setting Default Description
<table_name> n/a Name of the target table in OmniSci
<database_name> n/a Name of the target database in OmniSci
-u n/a User name
-p n/a User password
--host n/a Name of OmniSci host
--delim comma (,) Field delimiter, in single quotes
--line newline (\n) Line delimiter, in single quotes
--batch 10000 Number of records in a batch
--retry_count 10 Number of attempts before job fails
--retry_wait 5 Wait time in seconds after server connection failure
--null n/a String that represents null values
--port 9091 Port number for OmniSci Core on localhost
-t|--transform n/a Regex transformation
--print_error False Print error messages
--print_transform False Print description of transform.
--quoted 'true' Either 'true' or 'false', indicating whether the input file contains quoted fields.
--help n/a List options

Configure StreamImporter to use your target table. StreamImporter listens to a pre-defined data stream associated with your table. You must create the table before using the StreamImporter utility.

The data format must be a record-level format supported by OmniSci.

StreamImporter listens to the stream, validates records against the target schema, and ingests batches of your designated size to the target table. Rejected records use the existing reject reporting mechanism. You can start, shut down, and configure StreamImporter independent of the OmniSci Core engine. If StreamImporter is running but the database shuts down, StreamImporter shuts down as well. Reads from the stream are non-destructive.

StreamImporter is not responsible for event ordering; a first-class streaming platform outside OmniSci (for example, Spark Streaming or Flink) should handle the stream processing. OmniSci ingests the end-state stream of post-processed events.

StreamImporter does not handle dynamic schema creation on first ingest, but must be configured with a specific target table (and its schema) as the basis.

There is a 1:1 correspondence between target table and a stream record.

cat tweets.tsv | ./StreamImporter tweets_small mapd \
-u imauser \
-p imapassword \
--delim '\t' \
--batch 100000 \
--retry_count 360 \
--retry_wait 10 \
--null null \
--port 9999

Troubleshooting: How to Avoid Duplicate Rows

To detect duplicates before loading data into OmniSci Core, you can perform the following steps. For this example, the files are labeled A, B, C ... Z, and uCol is the column whose values are expected to be unique.

  1. Load file A into table MYTABLE.

  2. Run the following query.

    select uCol, count(*) as dups from MYTABLE group by uCol having count(*) > 1;

    Here uCol is the column expected to be unique. There should be no rows returned; if rows are returned, file A contains duplicate values in that column.

  3. Load file B into table TEMPTABLE.

  4. Run the following query.

    select count(t1.uCol) as dups from MYTABLE t1 join TEMPTABLE t2 on t1.uCol = t2.uCol;

    The dups count should be 0 if file B contains no rows that are already in MYTABLE. If the count is not 0, select the matching rows to identify the duplicates, and fix file B.

  5. Load the fixed B file into MYTABLE.

  6. Drop table TEMPTABLE.

  7. Repeat steps 3-6 for each remaining file in the set, so that every file is checked before its data is loaded into the real MYTABLE instance.
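The staging-table check can be tried out on a small scale before running it against real data. The following sketch uses Python's built-in sqlite3 module purely as a stand-in for OmniSci, with made-up rows and a column named uCol as the key. The two queries are illustrative: one uses GROUP BY with HAVING to find keys repeated within a single table, and the other joins the main table to the staging table to find keys that already exist.

```python
import sqlite3

# SQLite is only a stand-in here; the same queries would be run in OmniSci.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE MYTABLE (uCol INTEGER, payload TEXT);
    CREATE TABLE TEMPTABLE (uCol INTEGER, payload TEXT);
    INSERT INTO MYTABLE VALUES (1, 'a'), (2, 'b'), (3, 'c');      -- file A
    INSERT INTO TEMPTABLE VALUES (3, 'c2'), (4, 'd'), (4, 'd2');  -- file B
""")

# Keys repeated within the staging table itself.
within = conn.execute(
    "SELECT uCol, COUNT(*) FROM TEMPTABLE GROUP BY uCol HAVING COUNT(*) > 1"
).fetchall()

# Keys in the staging table that already exist in the main table.
across = conn.execute(
    "SELECT t2.uCol FROM MYTABLE t1 JOIN TEMPTABLE t2 ON t1.uCol = t2.uCol"
).fetchall()

print(within)  # [(4, 2)]
print(across)  # [(3,)]
```

In this sample, key 4 appears twice in file B and key 3 is already present in MYTABLE, so file B would need to be fixed before it is loaded.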

Importing Data from HDFS with Sqoop

You can import a CSV or Parquet file residing in HDFS (Hadoop Distributed File System) into OmniSci Core.

Copy the OmniSci JDBC driver into the Apache Sqoop library, normally found at /usr/lib/sqoop/lib/.

Example

The following is a straightforward import command. For more information on options and parameters for using Apache Sqoop, see the user guide at sqoop.apache.org.

sqoop-export --table iAmATable \
--export-dir /user/cloudera/ \
--connect "jdbc:mapd:000.000.000.0:9091:mapd" \
--driver com.mapd.jdbc.MapDDriver \
--username imauser \
--password imapassword \
--direct \
--batch

The --connect parameter is the address of a valid JDBC port on your OmniSci instance.