Loading Data with SQL
COPY FROM
CSV/TSV Import
Use the following syntax for CSV and TSV files:
COPY <table> FROM '<file pattern>' [WITH (<property> = value, ...)];
<file pattern> must be local on the server. The file pattern can contain wildcards if you want to load multiple files. In addition to CSV, TSV, and TXT files, you can import compressed files in TAR, ZIP, 7-ZIP, RAR, GZIP, BZIP2, or TGZ format.
You can import client-side files (\copy command in mapdql), but it is significantly slower. For large files, MapD recommends that you first scp the file to the server, and then issue the COPY command.
<property> in the optional WITH clause can be:
- delimiter: A single-character string for the delimiter between input fields. Default: "," (a CSV file).
- nulls: A string pattern indicating a field is NULL. Default: an empty string or \N.
- header: Either 'true' or 'false', indicating whether the input file has a header line in Line 1 that should be skipped. Default: 'true'.
- escape: A single-character string for escaping quotes. Default: the quote character.
- quoted: 'true' or 'false', indicating whether the input file contains quoted fields. Default: 'true'.
- quote: A single-character string for quoting a field. Default: double quote ("). All characters inside quotes are imported "as is," except for line delimiters.
- line_delimiter: A single-character string for terminating each line. Default: "\n".
- array_marker: A two-character string consisting of the start and end characters surrounding an array. Default: { }. For example, data to be inserted into a table with a string array in the second column (for example, BOOLEAN, STRING[], INTEGER) can be written as true,{value1,value2,value3},3.
- array_delimiter: A single-character string for the delimiter between input values contained within an array. Default: ",".
- threads: Number of threads for performing the data import. Default: the number of CPU cores on the system.
- max_reject: Number of records that the COPY statement allows to be rejected before terminating the COPY command. Records can be rejected for a number of reasons, including invalid content in a field, or an incorrect number of columns. The details of the rejected records are reported in the ERROR log. COPY returns a message identifying how many records are rejected. The records that are not rejected are inserted into the table, even if the COPY stops due to the max_reject count being reached. Default: 100,000.
- plain_text: Indicates that the input file is a plain text file, so that the libarchive decompression utility is bypassed. CSV, TSV, and TXT are always handled as plain text by default.
- lonlat: The default behavior is to consume two columns as longitude (x) and then latitude (y) when populating a POINT column. If the order of the coordinates in the CSV file is the opposite, you can load the data using the WITH option lonlat='false'. Default: true.
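To make the array_marker and array_delimiter conventions concrete, here is a minimal Python sketch that splits one input line into fields while keeping a marked array together. It only illustrates the file layout and is not MapD's parser; the helper name split_row is hypothetical, and quoting and escaping are deliberately ignored.

```python
def split_row(line, delimiter=",", array_marker="{}", array_delimiter=","):
    """Split one line into fields; array fields become Python lists.

    Hypothetical helper for illustration: ignores quoting and escaping.
    """
    start, end = array_marker[0], array_marker[1]
    fields, current, depth = [], [], 0
    for ch in line:
        if ch == start:
            depth += 1
            current.append(ch)
        elif ch == end:
            depth -= 1
            current.append(ch)
        elif ch == delimiter and depth == 0:
            # A field delimiter only counts outside the array markers.
            fields.append("".join(current))
            current = []
        else:
            current.append(ch)
    fields.append("".join(current))

    parsed = []
    for field in fields:
        if len(field) >= 2 and field[0] == start and field[-1] == end:
            inner = field[1:-1]
            parsed.append(inner.split(array_delimiter) if inner else [])
        else:
            parsed.append(field)
    return parsed


print(split_row("true,{value1,value2,value3},3"))
# ['true', ['value1', 'value2', 'value3'], '3']
```

The sketch shows why the field delimiter and the array delimiter can both be a comma: the markers tell the reader which commas belong to the array.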
Note | By default, the CSV parser assumes one row per line. To import a file with multiple lines in a single field, specify |
Examples:
COPY tweets from '/tmp/tweets.csv' WITH (nulls = 'NA');
COPY tweets from '/tmp/tweets.tsv' WITH (delimiter = '\t', quoted = 'false');
COPY tweets from '/tmp/*' WITH (header='false');
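If you generate COPY statements from a script, a small helper keeps the WITH clause consistent. The following is a minimal Python sketch, not part of MapD; the function name copy_statement is hypothetical. It quotes every option value as a string, which matches the string-valued options in the examples; treat the handling of numeric options such as threads or max_reject as an assumption to check against your server.

```python
def copy_statement(table, file_pattern, **options):
    """Return a COPY ... FROM statement with an optional WITH clause.

    Hypothetical helper: it does not validate option names and does not
    escape quotes inside values.
    """
    statement = f"COPY {table} FROM '{file_pattern}'"
    if options:
        pairs = ", ".join(f"{name} = '{value}'" for name, value in options.items())
        statement += f" WITH ({pairs})"
    return statement + ";"


print(copy_statement("tweets", "/tmp/tweets.csv", nulls="NA"))
print(copy_statement("tweets", "/tmp/tweets.tsv", delimiter="\\t", quoted="false"))
print(copy_statement("tweets", "/tmp/*", header="false"))
```

The generated text can be passed to mapdql or to any client that accepts SQL statements.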
Geo Import
You can use COPY FROM to import geo files. Use the following syntax, depending on the file source.
Source | Syntax |
---|---|
Local server | COPY <table> FROM '</filepath>' WITH (geo='true', ...); |
Web site | COPY <table> FROM '<[http \| https]://website/filepath>' WITH (geo='true', ...); |
Amazon S3 | COPY <table> FROM '<s3://bucket/filepath>' WITH (geo='true', s3_region='region', s3_access_key='accesskey', s3_secret_key='secretkey', ... ); |
The following WITH options are available for geo file imports from all sources:
- geo_coords_type: Coordinate type used; must be geography.
- geo_coords_encoding: Coordinates encoding; can be geoint(32) or none. Default: geoint(32).
- geo_coords_srid: Coordinates spatial reference; must be 4326 (WGS84 longitude/latitude).
The following file types are supported:
- ESRI Shapefile (.shp).
- GeoJSON (.geojson, .json, .geojson.gz, .json.gz).
- KML (.kml, .kmz).
- File bundles:
  - .zip
  - .tar
  - .tar.gz
  - .tgz
NOTE: The first compatible file (.shp, .geojson, .kml) in the bundle is loaded, traversing subfolders until a compatible file is found. The rest of the contents in the bundle are ignored. If the bundle contains multiple filesets, unpack the file manually and specify it for import.
SQLImporter
java -cp [MapD JDBC driver]:[3rd party JDBC driver] \
  com.mapd.utility.SQLImporter -t [MapD table name] -su [external source user] \
  -sp [external source password] \
  -c "jdbc:[external source]://server:port;DatabaseName=some_database" \
  -ss "[select statement]"
usage: SQLImporter
-b,--bufferSize <arg> Transfer buffer size
-c,--jdbcConnect <arg> JDBC Connection string
-d,--driver <arg> JDBC driver class
-db,--database <arg> MapD Database
-f,--fragmentSize <arg> Table fragment size
-i <arg> Path to initialization file.
-p,--passwd <arg> MapD Password
--port <arg> MapD Port
-r <arg> Row Load Limit
-s,--server <arg> MapD Server
-sp,--sourcePasswd <arg> Source Password
-ss,--sqlStmt <arg> SQL Select statement
-su,--sourceUser <arg> Source User
-t,--targetTable <arg> MapD Target Table
-tr,--truncate Drop and recreate the table, if it exists
-u,--user <arg> MapD User
SQL Importer executes a select statement on another database via JDBC and brings the result set into MapD Core.
If the table does not exist, SQL Importer creates the table in MapD Core.
If the truncate flag is set, it truncates the contents of the table.
If the table exists and truncate is not set, data import fails if the table does not match the SELECT statement metadata.
MapD recommends that you use a service account with read-only permissions when accessing data from a remote database.
The -i argument provides a path to an initialization file. Each line of the file is sent as a SQL statement to the remote server from which the data is copied. This can be used to set additional custom parameters before the data is loaded.
MySQL Example:
java -cp mapd-1.0-SNAPSHOT-jar-with-dependencies.jar:mysql/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar \
  com.mapd.utility.SQLImporter -t test1 -sp mypassword -su myuser \
  -c jdbc:mysql://localhost -ss "select * from employees.employees"
SQLServer Example:
java -cp /path/to/mapd/bin/mapd-1.0-SNAPSHOT-jar-with-dependencies.jar:/path/to/sqljdbc4.jar \
  com.mapd.utility.SQLImporter -d com.microsoft.sqlserver.jdbc.SQLServerDriver \
  -t mapd_target_table -su source_user -sp source_pwd \
  -c "jdbc:sqlserver://server:port;DatabaseName=some_database" \
  -ss "select top 10 * from dbo.some_table"
PostgreSQL Example:
java -cp /p/to/mapd/bin/mapd-1.0-SNAPSHOT-jar-with-dependencies.jar:/p/to/postgresql-9.4.1208.jre6.jar \
  com.mapd.utility.SQLImporter -t mapd_target_table -su source_user -sp source_pwd \
  -c "jdbc:postgresql://server/database" \
  -ss "select * from some_table where transaction_date > '2014-01-01'"
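When SQLImporter runs from a scheduler or script, it can help to assemble the arguments as a list, so that the classpath and the quoted SELECT statement survive intact. The following Python sketch uses only flags from the usage listing; the function name sqlimporter_command and its parameter names are hypothetical.

```python
import shlex


def sqlimporter_command(classpath, target_table, jdbc_connect, select_stmt,
                        source_user, source_passwd, driver=None, truncate=False):
    """Assemble a SQLImporter argument list from the documented flags."""
    args = ["java", "-cp", ":".join(classpath), "com.mapd.utility.SQLImporter"]
    if driver:
        args += ["-d", driver]
    args += ["-t", target_table, "-su", source_user, "-sp", source_passwd,
             "-c", jdbc_connect, "-ss", select_stmt]
    if truncate:
        args.append("-tr")
    return args


cmd = sqlimporter_command(
    classpath=["/path/to/mapd/bin/mapd-1.0-SNAPSHOT-jar-with-dependencies.jar",
               "/path/to/sqljdbc4.jar"],
    target_table="mapd_target_table",
    jdbc_connect="jdbc:sqlserver://server:port;DatabaseName=some_database",
    select_stmt="select top 10 * from dbo.some_table",
    source_user="source_user",
    source_passwd="source_pwd",
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)
# shlex.join quotes each argument so the line can be pasted into a shell.
print(shlex.join(cmd))
```

To execute the import instead of printing it, pass the list to subprocess.run(cmd, check=True); passing a list avoids a second round of shell quoting.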
StreamInsert
Stream data into MapD Core by attaching the StreamInsert program to the end of a data stream. The data stream can be another program printing to standard out, a Kafka endpoint, or any other real-time stream output. You can specify the appropriate batch size, according to the expected stream rates and your insert frequency. The target table must exist before you attempt to stream data into the table.
<data stream> | StreamInsert <table name> <database name> \
{-u|--user} <user> {-p|--passwd} <password> [{--host} <hostname>] \
[--port <port number>][--delim <delimiter>][--null <null string>] \
[--line <line delimiter>][--batch <batch size>][{-t|--transform} \
transformation ...][--retry_count <num_of_retries>] \
[--retry_wait <wait in secs>][--print_error][--print_transform]
Setting | Default | Description |
---|---|---|
<table_name> | n/a | Name of the target table in MapD |
<database_name> | n/a | Name of the target database in MapD |
-u | n/a | User name |
-p | n/a | User password |
--host | n/a | Name of MapD host |
--delim | comma (,) | Field delimiter, in single quotes |
--line | newline (\n) | Line delimiter, in single quotes |
--batch | 10000 | Number of records in a batch |
--retry_count | 10 | Number of attempts before job fails. See Tuning Your Stream Import Utility. |
--retry_wait | 5 | Wait time in seconds after server connection failure |
--null | n/a | String that represents null values |
--port | 9091 | Port number for MapD Core on localhost |
-t\|--transform | n/a | Regex transformation |
--print_error | False | Print error messages |
--print_transform | False | Print description of transform |
--help | n/a | List options |
For more information on creating regex transformation statements, see RegEx Replace.
Example:
cat file.tsv | /path/to/mapd/SampleCode/StreamInsert stream_example \
mapd --host localhost --port 9091 -u imauser -p imapassword \
--delim '\t' --batch 1000
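The upstream side of the pipe can be any program that prints delimited records to standard output. As a minimal Python sketch of such a producer (the function names, the sample rows, and the choice of null as the null string are assumptions for illustration, not part of MapD):

```python
import sys


def format_record(values, delim="\t", null_string="null"):
    """Join one record into a delimited line; None becomes the null string."""
    return delim.join(null_string if v is None else str(v) for v in values)


def emit(records, out=sys.stdout):
    """Write each record as one line and flush so downstream sees it promptly."""
    for record in records:
        out.write(format_record(record) + "\n")
        out.flush()


if __name__ == "__main__":
    # Made-up sample rows; a real producer would read from its own source.
    emit([(1, "first event", 3.5), (2, None, 7.25)])
```

Saved as a script (for example, a hypothetical producer.py), its output could be piped into StreamInsert with --delim '\t' and --null null so that the delimiter and null string match what the producer writes.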
Importing AWS S3 Files
You can use the SQL COPY FROM statement to import files stored on AWS S3 into a MapD table, in much the same way you would with local files. In the WITH clause, specify the S3 credentials and region information of the bucket accessed.
COPY <table> FROM '<S3_file_URL>' WITH (s3_access_key = <key_name>,s3_secret_key = <key_name>,s3_region = <region>);
Access key, secret key, and region are required. For information about AWS S3 credentials, see https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys.
The following examples show failed and successful attempts to copy the table trips from AWS S3.
mapdql> COPY trips FROM 's3://mapd-s3-no-access/trip_data_9.gz';
Exception: failed to list objects of s3 url 's3://mapd-s3-no-access/trip_data_9.gz': AccessDenied: Access Denied
mapdql> COPY trips FROM 's3://mapd-s3-no-access/trip_data_9.gz' with (s3_access_key='xxxxxxxxxx',s3_secret_key='yyyyyyyyy');
Exception: failed to list objects of s3 url 's3://mapd-s3-no-access/trip_data_9.gz': AuthorizationHeaderMalformed: Unable to parse ExceptionName: AuthorizationHeaderMalformed Message: The authorization header is malformed; the region 'us-east-1' is wrong; expecting 'us-west-1'
mapdql> COPY trips FROM 's3://mapd-parquet-testdata/trip.compressed/trip_data_9.csv' with (s3_access_key='xxxxxxxx',s3_secret_key='yyyyyyyy',s3_region='us-west-1');
Result
Loaded: 100 recs, Rejected: 0 recs in 0.361000 secs
The following example imports all the files in the trip.compressed directory.
mapdql> copy trips from 's3://mapd-parquet-testdata/trip.compressed/' with (s3_access_key='xxxxxxxx',s3_secret_key='yyyyyyyy',s3_region='us-west-1');
Result
Loaded: 105200 recs, Rejected: 0 recs in 1.890000 secs
trips Table
The table trips is created with the following statement:
mapdql> \d trips
CREATE TABLE trips (
medallion TEXT ENCODING DICT(32),
hack_license TEXT ENCODING DICT(32),
vendor_id TEXT ENCODING DICT(32),
rate_code_id SMALLINT,
store_and_fwd_flag TEXT ENCODING DICT(32),
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
passenger_count SMALLINT,
trip_time_in_secs INTEGER,
trip_distance DECIMAL(14,2),
pickup_longitude DECIMAL(14,2),
pickup_latitude DECIMAL(14,2),
dropoff_longitude DECIMAL(14,2),
dropoff_latitude DECIMAL(14,2))
WITH (FRAGMENT_SIZE = 75000000);
HDFS
Consume a CSV or Parquet file residing in HDFS into MapD Core.
Copy the MapD JDBC driver into the sqoop lib directory, normally /usr/lib/sqoop/lib/.
Example:
sqoop-export --table alltypes --export-dir /user/cloudera/ \
--connect "jdbc:mapd:192.168.122.1:9091:mapd" \
--driver com.mapd.jdbc.MapDDriver --username imauser \
--password imapassword --direct --batch
Troubleshooting: How to Avoid Duplicate Rows
To detect duplication prior to loading data into MapD Core Database, you can perform the following steps. For this example, the files are labeled A, B, C...Z.
1. Load file A into table MYTABLE.
2. Run the following query.
select uniqueCol, count(*) as dups from MYTABLE group by uniqueCol having count(*) > 1;
There should be no rows returned; if rows are returned, your first A file is not unique.
3. Load file B into table TEMPTABLE.
4. Run the following query.
select t1.uniqueCol as dups from MYTABLE t1 join TEMPTABLE t2 on t1.uniqueCol = t2.uniqueCol;
There should be no rows returned if file B is unique. Fix B if the information is not unique using details from the selection.
5. Load the fixed B file into MYTABLE.
6. Drop table TEMPTABLE.
7. Repeat steps 3-6 for the rest of the set for each file prior to loading the data to the real MYTABLE instance.
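The staging workflow can be rehearsed without a MapD server. The following Python sketch uses an in-memory SQLite database purely as a stand-in; the table layout, the sample keys, and the helper names are assumptions for illustration. It shows one way to express the two uniqueness checks: duplicates inside a single table, and staged keys that already exist in the real table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE MYTABLE (uniqueCol TEXT, payload TEXT)")
conn.execute("CREATE TABLE TEMPTABLE (uniqueCol TEXT, payload TEXT)")

# File A, already loaded; its keys are unique.
conn.executemany("INSERT INTO MYTABLE VALUES (?, ?)",
                 [("k1", "a"), ("k2", "b"), ("k3", "c")])
# File B, staged; it repeats k4 internally and collides with k2 in MYTABLE.
conn.executemany("INSERT INTO TEMPTABLE VALUES (?, ?)",
                 [("k2", "x"), ("k4", "y"), ("k4", "z"), ("k5", "w")])


def duplicates_within(table):
    """Keys that occur more than once inside one table."""
    sql = (f"SELECT uniqueCol, COUNT(*) FROM {table} "
           "GROUP BY uniqueCol HAVING COUNT(*) > 1 ORDER BY uniqueCol")
    return conn.execute(sql).fetchall()


def duplicates_across():
    """Staged keys that already exist in the real table."""
    sql = ("SELECT DISTINCT t2.uniqueCol FROM MYTABLE t1 "
           "JOIN TEMPTABLE t2 ON t1.uniqueCol = t2.uniqueCol "
           "ORDER BY t2.uniqueCol")
    return [row[0] for row in conn.execute(sql)]


print(duplicates_within("MYTABLE"))    # []
print(duplicates_within("TEMPTABLE"))  # [('k4', 2)]
print(duplicates_across())             # ['k2']
```

An empty result from both checks means the staged file is safe to load; any returned key identifies the rows to fix first.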
KafkaImporter
You can ingest data from an existing Kafka producer to an existing table in MapD using KafkaImporter on the command line.
NOTE: KafkaImporter requires a functioning Kafka cluster. See the Kafka website and the Confluent schema registry documentation.
KafkaImporter <table_name> <database_name> {-u|--user} <user_name> \
{-p|--passwd} <user_password> [{--host} <hostname>] \
[--port <mapd_core_port>] [--delim <delimiter>] [--batch <batch_size>] \
[{-t|--transform} transformation ...] [--retry_count <number_of_retries>] \
[--retry_wait <delay_in_seconds>] --null <null_value_string> \
[--line <line delimiter>] --brokers=<broker_name:broker_port> \
--group-id=<kafka_group_id> --topic=<topic_type> [--print_error] [--print_transform]
Setting | Default | Description |
---|---|---|
<table_name> | n/a | Name of the target table in MapD |
<database_name> | n/a | Name of the target database in MapD |
-u | n/a | User name |
-p | n/a | User password |
--host | n/a | Name of MapD host |
--delim | comma (,) | Field delimiter, in single quotes |
--line | newline (\n) | Line delimiter, in single quotes |
--batch | 10000 | Number of records in a batch |
--retry_count | 10 | Number of attempts before job fails. See Tuning Your Stream Import Utility. |
--retry_wait | 5 | Wait time in seconds after server connection failure |
--null | n/a | String that represents null values |
--port | 9091 | Port number for MapD Core on localhost |
-t\|--transform | n/a | Regex transformation |
--print_error | False | Print error messages |
--print_transform | False | Print description of transform |
--help | n/a | List options |
--brokers | localhost:9092 | One or more brokers |
--group-id | n/a | Kafka group ID |
--topic | n/a | The Kafka topic to be ingested |
Configure KafkaImporter to use your target table. KafkaImporter listens to a pre-defined Kafka topic associated with your table. You must create the table before using the KafkaImporter utility. For example, you might have a table named customer_site_visit_events that listens to a topic named customer_site_visit_events_topic.
The data format must be a record-level format supported by MapD.
KafkaImporter listens to the topic, validates records against the target schema, and ingests topic batches of your designated size to the target table. Rejected records use the existing reject reporting mechanism.
You can start, shut down, and configure KafkaImporter independent of the MapD core engine. If KafkaImporter is running but the database shuts down, KafkaImporter shuts down as well. Reads from the topic are non-destructive.
KafkaImporter is not responsible for event ordering; a first-class streaming platform outside MapD (for example, Spark Streaming or Flink) should handle the stream processing. MapD ingests the end-state stream of post-processed events.
KafkaImporter does not handle dynamic schema creation on first ingest; it must be configured with a specific target table (and its schema) as the basis.
There is a 1:1 correspondence between target table and topic.
cat tweets.tsv | ./KafkaImporter tweets_small mapd \
  -u imauser \
  -p imapassword \
  --delim '\t' \
  --batch 100000 \
  --retry_count 360 \
  --retry_wait 10 \
  --null null \
  --port 9999 \
  --brokers=localhost:9092 \
  --group-id=testImport1 \
  --topic=tweet
StreamImporter
StreamImporter is an updated version of the StreamInsert utility used for streaming reads from delimited files into MapD core. The difference is that StreamImporter uses a binary columnar load path, providing improved performance compared to the StreamInsert utility.
Note | StreamImporter is not supported in high-availability (HA) mode. |
You can ingest data from a data stream to an existing table in MapD using StreamImporter on the command line.
StreamImporter <table_name> <database_name> {-u|--user} <user_name> \
{-p|--passwd} <user_password> [{--host} <hostname>] \
[--port <mapd_core_port>] [--delim <delimiter>] [--batch <batch_size>] \
[{-t|--transform} transformation ...] [--retry_count <number_of_retries>] \
[--retry_wait <delay_in_seconds>] --null <null_value_string> \
[--line <line delimiter>] [--print_error] [--print_transform]
Setting | Default | Description |
---|---|---|
<table_name> | n/a | Name of the target table in MapD |
<database_name> | n/a | Name of the target database in MapD |
-u | n/a | User name |
-p | n/a | User password |
--host | n/a | Name of MapD host |
--delim | comma (,) | Field delimiter, in single quotes |
--line | newline (\n) | Line delimiter, in single quotes |
--batch | 10000 | Number of records in a batch |
--retry_count | 10 | Number of attempts before job fails. See Tuning Your Stream Import Utility. |
--retry_wait | 5 | Wait time in seconds after server connection failure |
--null | n/a | String that represents null values |
--port | 9091 | Port number for MapD Core on localhost |
-t\|--transform | n/a | Regex transformation |
--print_error | False | Print error messages |
--print_transform | False | Print description of transform |
--help | n/a | List options |
Configure StreamImporter to use your target table. StreamImporter listens to a pre-defined data stream associated with your table. You must create the table before using the StreamImporter utility.
The data format must be a record-level format supported by MapD.
StreamImporter listens to the stream, validates records against the target schema, and ingests batches of your designated size to the target table. Rejected records use the existing reject reporting mechanism. You can start, shut down, and configure StreamImporter independent of the MapD core engine. If StreamImporter is running but the database shuts down, StreamImporter shuts down as well. Reads from the stream are non-destructive.
StreamImporter is not responsible for event ordering; a first-class streaming platform outside MapD (for example, Spark Streaming or Flink) should handle the stream processing. MapD ingests the end-state stream of post-processed events.
StreamImporter does not handle dynamic schema creation on first ingest; it must be configured with a specific target table (and its schema) as the basis.
There is a 1:1 correspondence between target table and a stream record.
cat tweets.tsv | ./StreamImporter tweets_small mapd \
  -u imauser \
  -p imapassword \
  --delim '\t' \
  --batch 100000 \
  --retry_count 360 \
  --retry_wait 10 \
  --null null \
  --port 9999
Tuning Your Stream Import Utility
You can change default settings in stream import utilities to maintain SLAs and enhance performance.
Retries
A retry is an attempt to connect to the mapd_server in the event of an error. Rather than just failing the first time, these processes need to be resilient.
After a failure, the utility waits retry_wait seconds, then attempts to reconnect to the mapd_server. The length of time your system should wait varies with your setup. For example, if you know that it takes 15 seconds to restart the mapd_server, there is little point in retrying until 15 seconds pass. You can try setting retry_wait to 15 seconds. As a general rule, there is no downside to leaving it at the default. The test examples set these variables only to test and demonstrate their use.
The retry_count is the number of retries before the job aborts. Depending on how you run your operational infrastructure, you would set this value in different ways.
For example, if your environment requires that you be notified that issues are occurring after 2 minutes of not being able to reconnect, you can set retry_count to 2 and retry_wait to 60. After 2 minutes of not being able to connect, the job fails and you can take steps as needed to remedy the problem.
For another example, you might know that a particular machine is rebooted on a regular basis, and that it takes 12 minutes for the machine to come back online. You can set the retry_count to a value that allows the process to survive the period of time the server is not available.
There are many scenarios around network activity or machine availability that can be addressed if you extend retry resilience.
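The arithmetic behind these settings is simple enough to check before you deploy. The following Python sketch assumes that each retry is preceded by one full retry_wait and that the connection attempt itself takes negligible time, so the total window is roughly retry_count times retry_wait; the function names are hypothetical.

```python
import math


def total_retry_window(retry_count, retry_wait):
    """Approximate seconds the utility keeps retrying before the job fails."""
    return retry_count * retry_wait


def retry_count_for(outage_seconds, retry_wait):
    """Smallest retry_count whose window covers an outage of the given length."""
    return math.ceil(outage_seconds / retry_wait)


print(total_retry_window(2, 60))    # 120 seconds, the 2-minute example
print(total_retry_window(10, 5))    # 50 seconds with the default settings
print(retry_count_for(12 * 60, 5))  # 144 retries to outlast a 12-minute reboot
```

In practice you would add some margin to the computed retry_count, because a restart rarely takes exactly the expected time.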