urjit.io

30 Jul 2017

Shoveling data to Redshift with pipes and filters

Let’s say you have a datasource with multiple tables with a few Million rows each. A typical architecture will probably include some kind of an Analytics warehouse. For instance, AWS Redshift is a pretty good implementation and understands the PSQL dialect.

To keep the analytics data fresh, we need some way of shoveling data into that DB. In this scenario, we used Mysql as the OLTP-style, main application Datasource and Redshift as the analytics cluster.

This post explores using simple command-line tools to move ~275 Million rows into Redshift from a Mysql cluster in about 35-40 minutes on an average EC2 instance.

How to keep the two in sync?

There are two ways of looking at the data in your source database:

  • Tables that are append-only
  • Mutable tables - any row can potentially be mutated

The Mutable tables are an interesting problem here. To migrate that data to the destination, you either have to write smart scripts that will figure out what rows mutated and only apply those updates in a cascading style or maybe find some other way to filter only those rows that changed and apply them to the destination DB.

Another way of doing it might be to just dump the entire table from source to the destination - this one is easy to think about but involves heavy data-lifting. We tried to use the AWS DMS (Data Migration Service) to do this for us but it was difficult to debug, optimize and after chugging along for a few weeks, arbitrarily slowed down to a crawl. It was time to figure out a better solution:

Pipes and filters

Unix has a pretty useful pipe|filter philosophy we can rely on. Let’s take a real world example to explore how this idea can help us move data from a MySql DB to a Redshift DB:

The big picture:

We can bulk load data into Redshift via S3. Redshift has a convenient copy command that takes in a source S3 location and loads all that data into its tables. The Redshift documentation for the copy command is located here

This means if we can move our data into S3 in some format that Redshift will accept with the copy command, we can build a nice data ingestion pipeline. The next step then, is to move data from Mysql to S3 and that gives us a complete data-loading pipeline.

Let us break this down into Pipes and Filters

  • Pipe: Extract data from SRC-DB Mysql
  • Filter: Convert it into a format that Redshift’s S3 copy command is compatible with
  • Pipe: Load that data into S3
  • Pipe: Load data from S3 to Redshift

Pipe: Extract data from SRC-DB Mysql

The simplest way to extract a lot of data from Mysql is to use the command-line to get a csv style file with all the data. There are some mysql settings that we can use to our advantage:

  • --quick - This mysql cmdline flag tells mysql to not cache each query result, print each row as it is received. This is exactly what we want. Just fetch the rows and dump them into a file.
  • --batch - This flag ensures that mysql outputs just plan raw data so that we can transform it through various filters as needed.

Combining these two, we get this basic command skeleton:

mysql --quick --batch -u <username> --password <password> --host <host> --port 3306

Next, we want to ensure the order of the columns coming from the SRC-DB is the same as the expected order in the DEST-DB so we can just write a simple select statement and align everything. This helps us avoid doing post-processing on the bulk data before inserts.

SELECT_CMD="select `foo`, `bar`, `baz`, hex(`hex_bar`) from best_data_table limit 100;"
mysql --quick --batch -u <username> --password <password> --host <host> --port 3306 -e "$SELECT_CMD"

Notice two things about the SELECT_CMD:

  • The statement manipulates data right on the Mysql server (the hex command). Any data manipulation you want is most probably going to be faster to perform on the server itself so we can write all of that in the SELECT_CMD.
  • The limit 100 clause - it is a good practice to start writing such scripts with a limited amount of data. We will remove this later when we are confident that everything works properly (and/or our Production DB isn’t dying under load etc).

Filter: Convert it into a format that Redshift’s S3 copy command is compatible with

Next, we can take the data output from mysql and transform it into a format that Redshift is happy with. We can use a simple sed transformation here. This will depend on how your data looks so we won’t go into a lot of detail here.

SELECT_CMD="select `foo`, `bar`, `baz`, hex(`hex_bar`) from best_data_table limit 100;"
mysql --quick --batch -u <username> --password <password> --host <host> --port 3306 -e "$SELECT_CMD" \
| sed 's/\t/","/g;s/^/"/;s/$/"/;s/\n//g' > output.txt

Bonus Pipe: pv

From the linux man page: pv - monitor the progress of data through a pipe We can use the pv command to monitor progress of data flowing through pipe and print some progress reports and stats. Adding that to our command, we get this:

SELECT_CMD="select `foo`, `bar`, `baz`, hex(`hex_bar`) from best_data_table limit 100;"
mysql --quick --batch -u <username> --password <password> --host <host> --port 3306 -e "$SELECT_CMD" \
| pv -a | sed 's/\t/","/g;s/^/"/;s/$/"/;s/\n//g' > output.txt

Pipe: Load the data into S3

To upload data to S3, there are two options:

  • Dump all data from mysql to the disk & then upload to S3 in the next step
  • Stream upload data to S3 as we get rows delivered from mysql and transformed in our pipeline

The disadvantage of the first approach is that we end up involving a lot of disk I/O which will slow us down so let’s explore the second option.

SELECT_CMD="select `foo`, `bar`, `baz`, hex(`hex_bar`) from best_data_table limit 100;"
mysql --quick --batch -u <username> --password <password> --host <host> --port 3306 -e "$SELECT_CMD" \
| pv -a | sed 's/\t/","/g;s/^/"/;s/$/"/;s/\n//g' \
| ./gof3r_0.5.0_linux_amd64 put -b <my-redshift-bucket> \
-k analytics/production/best_data_table.csv

Here we are using the gof3r S3 upload tool. s3gof3r provides fast, parallelized, pipelined streaming access to Amazon S3. This awesome tool is optimized to get close to utilizing the full bandwidth available and does end-to-end integrity checks.

Pipe: Load data from S3 to Redshift

For this final step, we can just execute a load from s3 query on Redshift once our data upload is complete.

LOAD_CMD="copy best_data_table from 's3://<my-redshift-bucket>/analytics/production/best_data_table.csv' iam_role 'arn:aws:iam::<aws-account-id>:role/<role-name>' region 'us-west-2';"
echo "$LOAD_CMD" | PGPASSWORD=<password> psql -h <hostname>.redshift.amazonaws.com \
-p <port> -U <username> -d <database>

All the pipes and filters:

Combining all the pieces we created, I ran this on a table with 275 Million rows:

SELECT_CMD="select `foo`, `bar`, `baz`, hex(`hex_bar`) from best_data_table limit 100;"
mysql --quick --batch -u <username> --password <password> --host <host> --port 3306 -e "$SELECT_CMD" \
| pv -a | sed 's/\t/","/g;s/^/"/;s/$/"/;s/\n//g' \
| ./gof3r_0.5.0_linux_amd64 put -b <my-redshift-bucket> \
-k analytics/production/best_data_table.csv

duration: 29m55.644912881s

The time taken for the first step was 29m55sec for 275MM rows on an EC2 instance: c4.2xlarge 8 vCPU 15 Gib RAM & 1,000 Mbps bandwidth

LOAD_CMD="copy best_data_table from 's3://<my-redshift-bucket>/analytics/production/best_data_table.csv' iam_role 'arn:aws:iam::<aws-account-id>:role/<role-name>' region 'us-west-2';"
echo "$LOAD_CMD" | PGPASSWORD=<password> psql -h <hostname>.redshift.amazonaws.com \
-p <port> -U <username> -d <database>

Can we make this go faster?

There are still some tweaks that can improve throughput for us.

Using these two, we got some very nice speed improvements. Make sure your instance isn’t running other heavy processes, the throughput on a dedicated instance will be even higher:

SELECT_CMD="select `foo`, `bar`, `baz`, hex(`hex_bar`) from best_data_table limit 100;"

mysql --quick --batch -u <username> --password <password> --host <host> --port 3306 -e "$SELECT_CMD" \
| pv -a \
| parallel --pipe -j4 'sed "s/\t/\",\"/g;s/^/\"/;s/$/\"/;s/\n//g"' \
| pv -a \
| lzop \
| ./gof3r_0.5.0_linux_amd64 put -b <my-redshift-bucket> \
-k analytics/production/best_data_table.csv

[19.4MB/s]
[21.5MB/s]
[21.5MB/s]
duration: 23m14.257904691s

Observations & Conclusion

Running this on instances with higher bandwidth scales nicely. AWS provides support for Enhanced Networking on some instance types. Depending on what your bottleneck is, you can run this on instances that have more bandwidth and memory. Sprinkling pv at various points in the pipeline helps figure out where the bottleneck is.

The data source can be any other database and this pattern can be adopted to handle that.