Workflow Engines + Luigi
TRANSCRIPT
Data Pipeline Architect
Workflow Engines + Luigi
A broad overview and a brief introduction.
Vladislav Supalov, 8 March 2016
Hi, I’m Vladislav
● Wow, that’s neat. We can do cool stuff with data.
  ○ Machine Learning
  ○ Data Mining
  ○ Computer Vision
● DevOps? Shiny!
  ○ Lots of servers being useful and reliable <3
  ○ Automation
● Oh, so this is how businesses perceive things. I WAS BLIND.
  ○ Business goals and values
  ○ Measurable impact
Here’s What I Do
● Yes, please. All of this. Data engineering consulting.
  ○ “We built data stuff in-house and it delivers lots of value!”
    ■ “But it also sucks. We are losing money.”
    ■ “How can we do better?”
  ○ Mobile application marketing agencies
    ■ Not necessarily huge data
    ■ Very valuable and worthwhile topic
    ■ datapipelinearchitect.com
Not Necessarily Big Data
● There’s Big Data
  ○ It’s pretty fascinating, alright
  ○ Most companies are a few steps away from having these problems
● Let’s talk more about:
  ○ Messy data (multiple data sources, no overview)
  ○ Tedious-to-handle data (multiple data sources, lots of manual work)
The Big Picture
Actually handling the data is a very small part.
Straightforward, once the business needs are clear.
It’s about communication and people.
My First Data Pipeline
● ~20 GB per day
● Legacy MongoDB setup
● BEFORE: “It takes HOURS to get query results!”
● AFTER: “Already done. That was hardly a minute.”
● Google BigQuery
  ○ Batch: daily
  ○ Streaming: near real-time
● … So I wrote some modular scripts from scratch (Python & Bash)
  ○ It worked alright
  ○ I’m so sorry!
What’s Wrong with Custom Scripts?
● What happens when the original author leaves?
  ○ The hit-by-a-bus criterion
● Cost of ownership
  ○ Learning curve, uniqueness
  ○ Maintenance time, tricky bugs, code duplication
  ○ Unexpected failure modes
● Extensibility
● Growth
● Metadata?
● You’re reinventing the wheel
Here’s What Most People Don’t Search For
“I need to get data from A to B on a regular basis!”
● ETL
  ○ Extract
  ○ Transform
  ○ Load
● Long history
● Even longer beard
● A lot of enterprise-grade tools
→ Data pipelines
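The three ETL stages can be sketched in a few lines of plain Python. The records and the dict standing in for a warehouse are made up for illustration:

```python
# A toy ETL run: extract raw records, transform them, load them somewhere.
# The data and the destination (a plain dict) are illustrative, not a real store.

def extract():
    # Pretend this reads from source system A (CSV, API, database dump, ...).
    return [{"user": "alice", "spend": "12.50"}, {"user": "bob", "spend": "7.00"}]

def transform(records):
    # Clean and convert types so downstream queries are easy.
    return [{"user": r["user"], "spend_cents": int(float(r["spend"]) * 100)}
            for r in records]

def load(records, warehouse):
    # Write to destination B; a dict keyed by user stands in for a table.
    for r in records:
        warehouse[r["user"]] = r["spend_cents"]

warehouse = {}
load(transform(extract()), warehouse)
print(warehouse)  # {'alice': 1250, 'bob': 700}
```

Everything beyond this toy, such as scheduling, retries, and dependency tracking, is exactly what the tools below provide.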
Data Plumbing - There Are Many Approaches
● Data Virtuality
  ○ Access data across multiple sources transparently
  ○ Redshift used in the background intelligently
● Snowplow
  ○ “Event analytics platform” - designed to run on AWS services
  ○ Generate special events instead of plumbing existing data
● Segment.io
  ○ “Collect customer data with one API and send it to hundreds of tools for analytics, marketing, and data warehousing.”
http://datapipelinearchitect.com/tools-for-combining-multiple-data-sources/
Workflow Engines!
● Workflow = “[..] orchestrated and repeatable pattern of business activity [..]” [1]
● Data flow = “bunch of data processing tasks with inter-dependencies” [2]
● Pipelines of batch jobs
  ○ Complex, long-running
● Dependency management
● Reusability of intermediate steps
● Logging and alerting
● Failure handling
● Monitoring
● Lots of effort went into them (Broken data? Crashes? Partial failures?)

[1] https://en.wikipedia.org/wiki/Workflow
[2] Elias Freider, 2013, “Luigi - Batch Data Processing in Python“
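Dependency management is at heart DAG resolution: run every upstream task before its downstream consumers, and run each task at most once. A minimal pure-Python sketch, with task names and graph invented for illustration (not any engine's internals):

```python
# Resolve a DAG of task dependencies into an execution order, the way a
# workflow engine schedules upstream tasks before downstream ones.
# The graph maps each task to the tasks it requires (illustrative names).
deps = {
    "Report": ["Aggregate"],
    "Aggregate": ["Extract", "Clean"],
    "Clean": ["Extract"],
    "Extract": [],
}

def execution_order(deps):
    order, done = [], set()
    def visit(task):
        if task in done:
            return  # reuse of intermediate steps: each task runs at most once
        for upstream in deps[task]:
            visit(upstream)
        done.add(task)
        order.append(task)
    for task in deps:
        visit(task)
    return order

print(execution_order(deps))  # ['Extract', 'Clean', 'Aggregate', 'Report']
```

Features like retries, alerting, and monitoring then hang off this core traversal.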
Workflow Engine Specimens
● Oozie
● Azkaban
  ○ XML, strong Hadoop ecosystem focus
● Luigi
● Airflow
● Pinball
  ○ Glue!
● Google Cloud Dataflow
● AWS Data Pipeline
  ○ Managed! Fancy.
A nice comparison: http://bytepawn.com/luigi-airflow-pinball.html
Let’s Talk Luigi!
● Spotify
  ○ Lots of data!
  ○ 10k+ Hadoop jobs every day [1]
● Battle-hardened
  ○ Published 2009
  ○ Has been used in production by large companies for a while
● Python
● Modular & extensible
● Dependency graph
● Not just for data tasks
[1] Erik Bernhardsson, 2013, “Building Data Pipelines with Python and Luigi”
Core Goals and Concepts
● Goals [1]
  ○ Minimize boilerplate code
  ○ As general as possible
  ○ Easy to go from test to production
● Dependencies modeled as a directed acyclic graph (DAG)
● Tasks, Targets
● Assumptions:
  ○ Idempotency
  ○ Atomic file operations
    ■ File X is there? I’m done forever.
[1] Elias Freider, 2013, “Luigi - Batch Data Processing in Python“
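Both assumptions can be sketched with the standard library: treat the existence of the output file as “done” (idempotency), and write via a temporary file plus an atomic rename so a crash mid-write never leaves a partial output that would be mistaken for a finished one. Paths and file contents here are illustrative:

```python
import os
import tempfile

def target_exists(path):
    # The completeness check: the output file existing means "done forever".
    return os.path.exists(path)

def run_task(path):
    if target_exists(path):
        return "skipped"  # idempotent: already complete, never re-run
    # Write to a temp file first, then rename into place, so a crash
    # mid-write never leaves a partial output at the final path.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        f.write("result data\n")
    os.replace(tmp, path)  # atomic within the same filesystem
    return "ran"

demo_path = os.path.join(tempfile.mkdtemp(), "out1.txt")
print(run_task(demo_path))  # ran
print(run_task(demo_path))  # skipped
```

This pairing is what makes re-running a whole pipeline safe: completed steps turn into instant no-ops.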
What Luigi Provides
● Parametrization (command line arguments)
● Email alerts
● Dependency resolution
● Retries
● History
● Visualizations
● Preventing duplication of effort
● Testable
● Versioning-friendly
● Collaborative
● Community!
Workers and the Scheduler
https://github.com/spotify/luigi
Workers and the Central Scheduler
● Workers
  ○ Crunch data
  ○ Started via cron, or by hand
● Scheduler
  ○ Not cron
  ○ Doesn’t do any data processing
  ○ Synchronization
  ○ Web interface - dashboard, visualizations
  ○ Prevents the same task from running multiple times
  ○ Edit configuration → run luigid
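The “prevent the same task from running multiple times” role can be pictured as a lease table that workers consult before doing any work. This toy in-memory class is a stand-in for the idea, not luigid's actual protocol:

```python
# Toy central scheduler: workers ask for a task lease before running it,
# so the same task is never executed by two workers at once.
class Scheduler:
    def __init__(self):
        self.running = set()

    def acquire(self, task_id):
        if task_id in self.running:
            return False  # another worker already holds this task
        self.running.add(task_id)
        return True

    def release(self, task_id):
        self.running.discard(task_id)

sched = Scheduler()
print(sched.acquire("MyTask(date=2016-03-08)"))  # True: worker 1 gets it
print(sched.acquire("MyTask(date=2016-03-08)"))  # False: worker 2 is refused
sched.release("MyTask(date=2016-03-08)")
print(sched.acquire("MyTask(date=2016-03-08)"))  # True again after release
```

The real scheduler does this over HTTP for many workers, which is why it must not also crunch data itself.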
A Luigi Script
import luigi

# structure
class MyTask(luigi.Task):
    def requires(self):
        return []  # a list of Task(s)

    def output(self):
        pass  # a Target

    def run(self):
        pass  # the work happens here

if __name__ == "__main__":
    luigi.run()

---
$ python dataflow.py MyTask
Parameters
class MyTask(luigi.Task):
    # magic!
    param = luigi.Parameter(default=3)

    [...]

---
$ python dataflow.py MyTask --param 2
Task Inputs and Outputs
[...]
# where the data goes
def output(self):
    return luigi.LocalTarget("/data/out1-%s.txt" % self.param)

# what needs to run beforehand
def requires(self):
    return OtherTask(self.param)
[...]
Doing the Work
[...]
def run(self):
    with self.input().open("r") as in_file:
        with self.output().open("w") as out_file:
            # read from in_file, ???, write to out_file
            pass
[...]
---
run can yield tasks, to create dynamic dependencies
The Perks
● Specify dates
$ python dataflow.py MyTask --date 2016-03-08
$ [...] --date_interval 2016-W20
● Concurrency
$ [...] --workers 3
● Lots of functionality already provided
  ○ Targets (HDFS, S3, …)
  ○ Tasks (HadoopJobTask, CopyToTable, ...)
Takeaways
● Don’t consider data plumbing in isolation
● Technical decisions should be informed by business needs & goals
● Don’t go with home-baked scripts
  ○ “Quick and easy”? No.
● ETL is a thing
● There are workflow engines
  ○ Lots of them
  ○ Not only for data
● There are other approaches and services
● Luigi is a useful tool
Thanks! Let’s stay in touch :)
You’ll also get a step-by-step guide on learning Luigi.
http://datapipelinearchitect.com/big-data-eindhoven/