ELT, our Data Pipeline

Yes, our pipeline is ELT and not ETL. Extract, Load and Transform is the new Extract, Transform and Load, and the DZone post "ETL vs ELT: The Difference is in the How" makes a good case for which to use when. To see the difference at a glance, below are sample diagrams from the article showing how ETL and ELT pipelines differ.



Our initial pipeline design was based on fluentd and Amazon S3 storage. From there we would migrate the data into Google Cloud Storage (GCS) in order to import it into BigQuery. Though this setup served us well for over a year, it had limitations that we managed along the way. The major drawback was getting the data into BigQuery on time: even though the import itself was fast, we were bound by the windowing of the data caused by file rollover. To make this clearer, a file in fluentd can be rolled over by time or by size, whichever limit it hits first. In theory, and as we saw in practice, if a file hit the size limit before the time limit, the new file would roll into the next hour and would only be uploaded once it hit either the rollover time limit or the size limit. Because of this, the import for every hour had to be scheduled at the 40th minute of the next hour to make sure that at least all the data from the previous hour had been uploaded. One might ask why we didn't just push the data into GCS directly. At the time we created the pipeline fluentd didn't have a plugin for it, and even now it is not an officially supported fluentd plugin, unlike the one for S3. Even if we had uploaded the data to GCS directly, it still wouldn't have resolved the file rollover window, which was the bottleneck. Below is a diagram showing the pipeline design.
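To make the rollover window described above more concrete, here is a minimal Python sketch of a buffer file that rolls over on size or age, whichever comes first. It is a toy model and not fluentd's implementation: the `RollingBuffer` class, the 100 byte size limit and the 30 minute age limit are all assumptions chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class RollingBuffer:
    """Toy buffer file that rolls over on size or age, whichever comes first."""
    max_bytes: int
    max_age_s: int
    opened_at: Optional[int] = None
    size: int = 0
    events: List[int] = field(default_factory=list)  # timestamps in the open file
    uploads: List[Tuple[int, List[int]]] = field(default_factory=list)

    def _roll(self, now: int) -> None:
        # Upload the current file (if it holds anything) and start fresh.
        if self.events:
            self.uploads.append((now, self.events))
        self.opened_at, self.size, self.events = None, 0, []

    def tick(self, now: int) -> None:
        # Time-based rollover: fires even if no new event arrives.
        if self.opened_at is not None and now - self.opened_at >= self.max_age_s:
            self._roll(now)

    def write(self, ts: int, nbytes: int) -> None:
        self.tick(ts)
        if self.opened_at is None:
            self.opened_at = ts
        self.events.append(ts)
        self.size += nbytes
        # Size-based rollover.
        if self.size >= self.max_bytes:
            self._roll(ts)


def last_upload_for_hour(uploads, hour: int) -> int:
    """Time of the last upload that still carried events from the given hour."""
    start, end = hour * 3600, (hour + 1) * 3600
    return max(t for t, evs in uploads if any(start <= e < end for e in evs))


# Hypothetical limits: 100 bytes or 30 minutes (timestamps are in seconds).
buf = RollingBuffer(max_bytes=100, max_age_s=1800)
buf.write(0, 50)      # 00:00, opens the first file
buf.write(1500, 50)   # 00:25, size limit hit, first file uploaded
buf.write(3300, 10)   # 00:55, opens a second file inside hour 0
buf.write(3900, 10)   # 01:05, an hour-1 event lands in the same file
buf.tick(5100)        # 01:25, age limit hit, second file uploaded

late = last_upload_for_hour(buf.uploads, hour=0)
late_minute = (late - 3600) // 60  # minutes past the start of the next hour
```

In this made-up run, the last event of hour 0 only reaches storage well into hour 1, which is the kind of delay that forces an hourly import to wait some way into the following hour.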


To counter the problem described above, we decided to move our data to a Pub/Sub based streaming model, where we would push data continuously as it arrives. As fluentd is the primary tool used on all our servers to gather data, rather than replacing it we leveraged its plugin architecture to stream data into a sink of our choosing. Initially our inclination was towards Google PubSub and Google Dataflow, as our data scientists and engineers use BigQuery extensively and keeping the data in the same cloud made sense. The inspiration for using these tools came from Spotify’s Event Delivery – The Road to the Cloud. We did the setup on one of our staging servers with Google PubSub and Dataflow.

Neither really worked out for us. The PubSub model requires a Subscriber to be available for the Topic a Publisher streams messages to, otherwise the messages are not stored. On top of that, there was no way to see which messages were arriving. The weirdest thing we encountered during this was that the Topic would be orphaned, losing its subscribers, when working with Dataflow. PubSub we might have managed to live with; the wall in our path was Dataflow. We started off using SCIO from Spotify to work with Dataflow. There is a considerable lack of documentation for it, and we found the community on GitHub to be very reserved, something quite evident in the world of Scala, which came up with a Code of Conduct for its user base to follow. What we needed from Dataflow was support for batch writes to GCS. After trying our hand at Dataflow without success, we found Google's staff on Stack Overflow quite responsive, and their response confirmed that this was not available with Dataflow and that streaming the data to BigQuery, Datastore or Bigtable as a datastore was the option to use. We didn't do that because we wanted to avoid the high cost of streaming into these services, as the majority of our data team's jobs are based on batched hourly data. The initial proposal for the updated pipeline is shown below.
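The subscriber requirement mentioned above can be illustrated with a small in-memory model. This is only a sketch of the behaviour as we understood it, not the Google Cloud Pub/Sub client API: `ToyTopic`, the `gcs-writer` subscription name and the message strings are all hypothetical.

```python
from collections import deque
from typing import Deque, Dict, List


class ToyTopic:
    """In-memory topic that keeps messages only for existing subscriptions."""

    def __init__(self) -> None:
        self.subscriptions: Dict[str, Deque[str]] = {}

    def subscribe(self, name: str) -> None:
        # A subscription only sees messages published after it was created.
        self.subscriptions[name] = deque()

    def publish(self, message: str) -> int:
        # Copy the message into every subscription queue; with no
        # subscriptions there is nowhere to store it, so it is dropped.
        for queue in self.subscriptions.values():
            queue.append(message)
        return len(self.subscriptions)

    def pull(self, name: str) -> List[str]:
        # Drain and return everything waiting for this subscription.
        queue = self.subscriptions[name]
        messages = list(queue)
        queue.clear()
        return messages


topic = ToyTopic()
stored_first = topic.publish("event-1")   # no subscriber yet, message is lost
topic.subscribe("gcs-writer")
stored_second = topic.publish("event-2")  # kept for the new subscription
received = topic.pull("gcs-writer")
```

Anything published before the subscription exists never shows up in `received`, which is why an orphaned Topic is so painful in a pipeline that is expected to lose nothing.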

GCS based Pipeline

After exhausting our efforts on Google's PubSub and Dataflow, we decided to use the biggest player in this realm, Kafka. We run our Kafka cluster on a Google Dataproc cluster (since the writing of this post we have moved our Kafka cluster from Dataproc to regular GCE instances using our new IaaC setup). Dataproc is a managed cluster service that lets you spin up a working cluster in minutes based on your choice of image and size. Currently we are running Kafka for Scala 2.11. One hardship we faced while setting up Kafka was the lack of resources on the latest version. The internet is full of amazing material for version 0.8.X, most of which is deprecated in 0.10.X. Some of it was still supported in 0.9.X, with which we started off, but we decided to go with the latest version: Kafka is a one-time installation, and once you start storing gigabytes of data, upgrading isn't a trivial task anymore.

Given the number of datasets that we generate and store in S3, moving them isn't an overnight task, so we needed a plan to migrate in phases. Once again our long-trusted tool fluentd proved a worthy choice. We used the fluentd copy plugin to stream data into Kafka while still storing it in S3, which kept our current pipeline intact and let us roll out the new pipeline alongside the old one with zero impact on the business ;). To avoid having all our servers talk to Kafka, and to keep a single secure connection between the Amazon and Google clouds, we transmit all data meant for Kafka to a fluentd server known to us as Tron. From there, Tron forwards the data in small size/time based chunks, as recommended by Kafka, to achieve higher throughput while giving the impression of low latency, as it transmits in near real time.
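The copy-and-forward idea above can be sketched in a few lines of Python. This is not fluentd configuration or the Kafka plugin's code: `ChunkedForwarder`, `copy_to_all`, the chunk size of 3 records and the 5 second age limit are assumptions made up for illustration, and plain lists stand in for S3 and Kafka.

```python
from typing import Callable, List


class ChunkedForwarder:
    """Collects records and forwards them in small chunks, by count or age."""

    def __init__(self, send: Callable[[List[dict]], None],
                 max_records: int, max_age_s: float) -> None:
        self.send = send
        self.max_records = max_records
        self.max_age_s = max_age_s
        self.pending: List[dict] = []
        self.first_at = 0.0

    def add(self, record: dict, now: float) -> None:
        if not self.pending:
            self.first_at = now  # age is measured from the first record in the chunk
        self.pending.append(record)
        too_big = len(self.pending) >= self.max_records
        too_old = now - self.first_at >= self.max_age_s
        if too_big or too_old:
            self.flush()

    def flush(self) -> None:
        # Hand over a copy of the chunk, then start a new one.
        if self.pending:
            self.send(list(self.pending))
            self.pending.clear()


def copy_to_all(record: dict, now: float, outputs) -> None:
    """Give the same record to every output, like a copy step."""
    for output in outputs:
        output(record, now)


s3_store: List[dict] = []             # stand-in for the existing S3 output
kafka_batches: List[List[dict]] = []  # stand-in for chunks sent on to Kafka

forwarder = ChunkedForwarder(kafka_batches.append, max_records=3, max_age_s=5.0)
outputs = [lambda record, now: s3_store.append(record), forwarder.add]

for i in range(7):
    copy_to_all({"id": i}, now=i * 0.1, outputs=outputs)
forwarder.flush()  # push out whatever is left at shutdown
```

Every record still reaches the old output untouched, while the new path receives the same records grouped into small chunks, which is what allows both pipelines to run side by side during a phased migration.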

Once we had our producer working for Kafka, it was time for a consumer to start pulling data and pushing it to GCS. After some research on GitHub we found Secor from Pinterest to be a viable option. Although it is a great piece of software, it didn't map ideally to our design, so we submitted a few pull requests to make the necessary changes to the Secor project, both for our own use and for the greater good of the open source community. These ranged from updating the docs on how to set it up (PR268, PR271, PR277), to adding a flexible upload directory structure with hourly support (PR275) and support for a partitioned parser with no offset folder (PR279). We also added a flexible delimited file reader and writer option (PR291) for better control over file structure. The diagram below is our current ELT pipeline running in production.
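As a rough illustration of what an hourly upload directory structure means for a batch consumer, the sketch below maps an event timestamp to an hourly prefix. The layout, the `hourly_prefix` helper, the bucket name and the topic name are hypothetical and are not taken from Secor's actual configuration or output format.

```python
from datetime import datetime, timezone


def hourly_prefix(bucket: str, topic: str, epoch_seconds: int) -> str:
    """Build a year/month/day/hour prefix (UTC) for an event timestamp."""
    ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return "gs://{}/{}/{}/".format(bucket, topic, ts.strftime("%Y/%m/%d/%H"))


# Two events inside the same hour share a prefix; the next hour gets its own.
first = hourly_prefix("example-bucket", "clicks", 1483621200)   # 2017-01-05 13:00:00 UTC
second = hourly_prefix("example-bucket", "clicks", 1483622730)  # 2017-01-05 13:25:30 UTC
third = hourly_prefix("example-bucket", "clicks", 1483624800)   # 2017-01-05 14:00:00 UTC
```

With a layout like this, an hourly batch job only has to read one prefix per hour instead of scanning files and filtering on their contents.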

Upgraded Pipeline

Before we switched to using the data from Kafka, we kept it running as a secondary data source imported into BigQuery so that we could compare data counts. The counts coming from Kafka were higher, indicating that we had been losing some data in our S3 based pipeline. As we added more and more datasets to Kafka, we eventually hit a road bump: a memory issue that caused a server to fall out of the quorum, which was initially hard to identify from the errors on the Tron server. Issue 97 was opened on GitHub in the Fluentd Kafka Plugin repository to discuss it. The problem was narrowed down to a limitation of the plugin, which was bound by its use of the ruby-kafka gem, and a fix, PR294, was contributed to prevent the issue from happening in future. The fix is now part of fluent-plugin-kafka as well, which will benefit many others. Happy to say that the patch works well in production :).
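The count comparison above boils down to lining up per-hour row counts from the two sources. Here is a minimal Python sketch of that check. The `count_gaps` helper and the numbers are made up for illustration; in practice the counts would come from queries against the two imported tables.

```python
from typing import Dict


def count_gaps(old_counts: Dict[str, int], new_counts: Dict[str, int]) -> Dict[str, int]:
    """Return new minus old for every hour where the two sources disagree."""
    gaps = {}
    for hour in sorted(set(old_counts) | set(new_counts)):
        diff = new_counts.get(hour, 0) - old_counts.get(hour, 0)
        if diff != 0:
            gaps[hour] = diff
    return gaps


# Made-up hourly row counts for the same dataset from both pipelines.
s3_counts = {"2017-01-05T13": 1000, "2017-01-05T14": 980, "2017-01-05T15": 1010}
kafka_counts = {"2017-01-05T13": 1000, "2017-01-05T14": 995, "2017-01-05T15": 1012}

gaps = count_gaps(s3_counts, kafka_counts)
```

A positive gap means the new pipeline delivered more rows for that hour than the old one, which is the signal that pointed to data loss on the old path.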

A few tips before I close this blog post. Always have enough memory on the Kafka server, as Kafka is very memory intensive and tends to shut down gracefully every time it hits the heap size limit, without indicating anything in the logs. For monitoring your cluster use Kafka Manager, which does the job very well, and also keep KafkaT on the server, which saves you from running the cumbersome built-in Kafka commands.

We had our Christmas and New Year in peace with our pipeline performing well over the holidays, and we hope for the same consistency over the coming Chinese New Year. We are always looking to make our technology stack better and to make the improvements needed for better performance. We like to take up challenges and solve them, and in the process learn as we grow.

Update 06072018

We have moved our batch pipeline from our in-house AWS-GCS transfer utility to Google's GCS transfer utility, and we are using it for non-critical and time-insensitive data. It has been performing well, and we have managed to take the management load off our engineers.

The detailed diagram below shows the whole design in action.
