We just imported the world – part III

In our last article, we configured Greenplum’s mass import utility, gpload. When we say it’s fast, you can believe us. Look:

$ gpload -f ctrl/node.ctrl
2013-04-21 15:58:58|INFO|gpload session started 2013-04-21 15:58:58
2013-04-21 15:58:58|INFO|started gpfdist -p 8000 -P 9000
-f "nodes*.csv.gz" -t 30
2013-04-21 16:15:07|INFO|running time: 968.98 seconds
2013-04-21 16:15:07|INFO|rows Inserted = 2790813127
2013-04-21 16:15:07|INFO|rows Updated = 0
2013-04-21 16:15:07|INFO|data formatting errors = 0
2013-04-21 16:15:07|INFO|gpload succeeded

222 GB and 2.8 billion rows imported in just over 16 minutes. The whole loading step took 47 minutes for 477 GB of data.
Now that the raw data is in our database, we have to make it usable.

Transform and enrich the data

The previous loading step was done in a “temporary” schema that won’t be used directly by our API engine. We like to call it “dataprocessing”, because it’s the source of all the dark SQL magic: we copy data from the dataprocessing schema to the final schema (called “planet”) and enrich it along the way:

  • Every table to which we added a date column must be filled with the correct date
  • We linked every OSM changeset to a country, so that we can “see” how the OSM data grew through time and space

So we wrote an SQL script to do this, and executed it.
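
To make this concrete, here is a minimal sketch of the kind of statements the script contains. Table and column names (tstamp, latitude, longitude) are simplified, planet.countries with its bounding-box columns is a hypothetical helper table, and we assume each changeset carries a representative point; the real script is more involved:

-- Copy nodes from the staging schema into the final one,
-- deriving the added date column from the raw timestamp
INSERT INTO planet.nodes (id, latitude, longitude, changeset_id, "date")
SELECT id, latitude, longitude, changeset_id, tstamp::date
FROM dataprocessing.nodes;

-- Attach a country to every changeset
-- (crude bounding-box match, just to illustrate the idea)
UPDATE planet.changesets cs
SET country_code = co.country_code
FROM planet.countries co
WHERE cs.longitude BETWEEN co.min_lon AND co.max_lon
  AND cs.latitude  BETWEEN co.min_lat AND co.max_lat;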

That is all for Greenplum. We also wanted to compare it with AWS Redshift. Both are built on a PostgreSQL backend and both rely on parallel, Map/Reduce-style processing: they are shared-nothing, Massively Parallel Processing databases, meaning many servers share the same structured database definition but each handles its own data. Like many AWS services, Redshift generates an instant wow: with a few web API calls we could start a 100-node cluster, get its JDBC URL and connect to it directly. After creating the cluster, we configured a Security Group allowing us to connect with psql:
psql -h ***.redshift.amazonaws.com -p 5439 -U *** -d ***
 

First, upload our data to AWS S3

The best way to load data into a Redshift database is to upload CSV files to an S3 bucket: Amazon extended the PostgreSQL COPY command to accept S3 URLs. But beware: a single S3 upload can’t exceed 5 GB, so we had to split our biggest CSV files – extractions from our Greenplum database – and compress them all. Another limitation is that Redshift only supports a subset of UTF-8 (characters of at most 3 bytes).

The good news was that we could reuse and enhance part of our proprietary DB pump utility: for our customers we had built a tool that extracts data from their production databases, in differential mode on a daily basis, and feeds their data warehouse. So we could easily extract the OSM tables as split CSV files containing only valid UTF-8 characters.
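
Our DB pump tool is proprietary, but for readers without such a tool, plain Greenplum offers writable external tables that stream a table out through gpfdist. A rough sketch, with made-up host, port and file names, and with splitting, gzipping and UTF-8 filtering still done outside the database:

-- Writable external table: rows inserted here are written out as CSV
-- by the gpfdist process running on the ETL host
CREATE WRITABLE EXTERNAL TABLE dataprocessing.nodes_export (LIKE planet.nodes)
LOCATION ('gpfdist://etl-host:8000/nodes_export.csv')
FORMAT 'CSV' (DELIMITER ';')
DISTRIBUTED BY (id);

-- Stream the table out; the resulting file is then split, compressed
-- and checked for invalid characters before the S3 upload
INSERT INTO dataprocessing.nodes_export SELECT * FROM planet.nodes;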

Once the files are generated, s3cmd helps us upload them all to the S3 bucket created in the same region as the Redshift cluster:

 $ s3cmd put *.gz s3://mybucket/osm
 

Second, load our data into Redshift

The fact that we were migrating our OSM database between two similar engines (both use a PostgreSQL backend, with a distribution key and partitioning/sort features) allowed us to turn the Greenplum DDL into compatible Redshift DDL with simple search/replace operations. What a good thing!
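
As an illustration, here is roughly what that search/replace amounts to on a much-simplified ways table (column list shortened, partition bounds and names invented, so treat it as a sketch rather than our actual DDL):

-- Greenplum: distribution key plus monthly range partitioning on the date column
CREATE TABLE planet.ways (
    id           bigint,
    changeset_id bigint,
    "date"       date
)
DISTRIBUTED BY (id)
PARTITION BY RANGE ("date")
    (START (date '2006-01-01') END (date '2014-01-01') EVERY (INTERVAL '1 month'));

-- Redshift: the distribution key becomes DISTKEY,
-- the partitioning column becomes the SORTKEY
CREATE TABLE planet.ways (
    id           bigint,
    changeset_id bigint,
    "date"       date
)
DISTKEY (id)
SORTKEY ("date");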

For this test we decided to run a Redshift cluster of 8 1.xlarge nodes, so we could hope for between one third and one half of the performance of a half-rack Greenplum appliance.

The CSV files we produced could then be handled by Redshift’s customized COPY command:
COPY planet.nodes
FROM 's3://mybucket/osm/nodes'   -- loads every S3 object whose key starts with this prefix
credentials '********'
DELIMITER ';'
REMOVEQUOTES
ESCAPE
EMPTYASNULL
BLANKSASNULL
COMPUPDATE                       -- let Redshift compute column compression encodings
GZIP;                            -- the input files are gzip-compressed

In this example, a single COPY loads all the nodes* files (there were 14 changesets*.gz files) into the destination table in only 1,661 seconds.
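
Not shown above: whether a COPY aborts on a bad row or skips it (when MAXERROR is set), Redshift records the details in its stl_load_errors system table. A generic check, not specific to our schema, looks like this:

-- Most recent load errors: which file, which line, and why
SELECT starttime, filename, line_number, colname, err_reason
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 10;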

Last, compare some queries

Once loading was completed, we could at last compare performance between our Greenplum appliance and the Redshift cluster. At this stage we discovered that we could optimize our in-database engine to fully exploit the sort-key mechanism when querying joins of fact tables (Greenplum doesn’t require this optimization thanks to its management of partitions). This was implemented in our software so that full table scans are limited when querying Redshift.

With usual joins, we observed similar times on both sides:
SELECT
    COUNT(a1."id") AS "ways_count",
    COUNT(DISTINCT a2."id") AS "changesets_count",
    a2."country_code" AS "Country"
FROM "planet"."ways" a1
INNER JOIN "planet"."changesets" a2
    ON (a2."id" = a1."changeset_id" AND a2."creation_date" = a1."date")
WHERE a2."creation_date" >= DATE '2012-03-01'
  AND a2."creation_date" <= DATE '2012-11-01'
  AND a1."date" >= DATE '2012-03-01'
  AND a1."date" <= DATE '2012-11-01'
GROUP BY a2."country_code"
ORDER BY "Country" DESC;

With an EXISTS condition, Greenplum is twice as fast as Redshift:
SELECT
    COUNT(a1."id") AS "count",
    a2."country_code" AS "Country"
FROM "planet"."ways" a1
INNER JOIN "planet"."changesets" a2
    ON (a2."id" = a1."changeset_id" AND a2."creation_date" = a1."date")
WHERE EXISTS (
        SELECT 1
        FROM "planet"."way_tags" a4
        WHERE a1."id" = a4."id"
          AND a1."date" = a4."date"
          AND a4."version" = 1
          AND CASE
                WHEN (a4."k" = 'landuse' AND a4."v" IN ('residential','industrial','commercial')) THEN 'Urban Zone'
                WHEN (a4."k" = 'highway' AND a4."v" IN ('primary','secondary','residential','service','unclassified','trunk','motorway')) THEN 'HighWays'
                WHEN (a4."k" IN ('highway','bicycle') AND a4."v" IN ('track','footway','pedestrian','cycleway','yes')) THEN 'GreenWays'
                ELSE 'Other'
              END IN ('Urban Zone')
          AND a4."date" >= DATE '2012-01-01' AND a4."date" <= DATE '2013-01-01'
      )
  AND a1."date" >= DATE '2012-01-01' AND a1."date" <= DATE '2013-01-01'
GROUP BY a2."country_code"
ORDER BY "Country" DESC;

 

Conclusion

Redshift delivers what it promises, and does so as a cloud-based, self-provisioning system. Wow! But nothing is perfect: there are a few limitations on native SQL functions, character-set support and DDL capabilities. Fortunately, workarounds exist. Based on our experience running such architectures on a daily basis, we would also have preferred more CPU cores per Redshift node, to better handle concurrent query execution on huge volumes and to limit CPU contention. Indeed, according to the Redshift console, a single aggregation query can take more than 50% of the available CPU.

 
