Luigi - Batch Data Processing in Python (PyData SV 2013)
DESCRIPTION
Slides for a talk given by Elias Freider at PyData Silicon Valley 2013
TRANSCRIPT
![Page 2: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/2.jpg)
Why did we build Luigi?
Background
- Billions of log messages (several TBs) every day
- Usage and backend stats, debug information

What we want to do
- AB-testing
- Music recommendations
- Monthly/daily/hourly reporting
- Business metric dashboards

We experiment a lot – need quick development cycles
We have a lot of data
![Page 3: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/3.jpg)
How to do it?
Hadoop
- Data storage
- Clean, filter, join and aggregate data

Postgres
- Semi-aggregated data for dashboards

Cassandra
- Time series data
![Page 4: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/4.jpg)
Defining a data flow
A bunch of data processing tasks with inter-dependencies
![Page 5: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/5.jpg)
Example: Artist Toplist
Input is a list of Timestamp, Track, Artist tuples
Streams → Artist Aggregation → Top 10 → Database
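The toplist computation itself can be sketched in plain Python before any framework is involved. The sample records and the function names `aggregate_artists` and `top_artists` are made up for illustration and are not taken from the slides.

```python
from collections import Counter

# Hypothetical sample input: (timestamp, track, artist) tuples.
STREAMS = [
    ("2013-03-05T10:00:00", "Track A", "Artist 1"),
    ("2013-03-05T10:03:00", "Track B", "Artist 2"),
    ("2013-03-05T10:07:00", "Track C", "Artist 1"),
    ("2013-03-05T10:11:00", "Track D", "Artist 3"),
    ("2013-03-05T10:15:00", "Track A", "Artist 1"),
    ("2013-03-05T10:19:00", "Track B", "Artist 2"),
]


def aggregate_artists(streams):
    """Count the number of streams per artist."""
    return Counter(artist for _timestamp, _track, artist in streams)


def top_artists(counts, n=10):
    """Return the n most streamed artists as (artist, count) pairs."""
    return counts.most_common(n)


if __name__ == "__main__":
    for artist, count in top_artists(aggregate_artists(STREAMS)):
        print(artist, count)
```

At several TBs of input per day the aggregation step would run on a cluster rather than in one process, but the shape of the data flow stays the same.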
![Page 6: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/6.jpg)
Example: Artist Toplist
Naive (non-luigi) approach
![Page 7: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/7.jpg)
Errors will occur
If a failed step leaves behind broken data, we want to clean that up
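One common way to avoid leaving broken data behind is to write to a temporary file and move it into place only on success. The following is a minimal sketch assuming local files; `atomic_write` is a hypothetical helper and not Luigi's implementation.

```python
import os
import tempfile


def atomic_write(path, lines):
    """Write lines to path so that readers never see a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temporary file in the target directory so the final
    # rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            for line in lines:
                tmp.write(line + "\n")
        os.replace(tmp_path, path)  # atomic rename on POSIX filesystems
    except BaseException:
        # Clean up the partial output before re-raising the error.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

If producing `lines` fails halfway through, the target path is never created, so a later run cannot mistake a half-written file for finished output.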
![Page 8: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/8.jpg)
Avoid duplicate work
The second step fails, you fix it, then you want to resume
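Resuming can be as simple as skipping every step whose output already exists. This sketch assumes each step writes its output file only when it succeeds; `run_step` and `produce` are hypothetical names used for illustration.

```python
import os


def run_step(output_path, produce):
    """Run produce(output_path) unless its output already exists.

    Returns True if the step ran and False if it was skipped.
    """
    if os.path.exists(output_path):
        return False
    produce(output_path)
    return True
```

Re-running the whole flow after fixing the second step then repeats only the steps that have not yet produced output.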
![Page 9: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/9.jpg)
Parametrization
To use data flows as command line tools
![Page 10: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/10.jpg)
Repeat!
You want to run the dataflow for a set of similar inputs
![Page 11: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/11.jpg)
Introducing Luigi
A Python framework for data flow definition and execution
Main features
- Dead-simple dependency definitions (think GNU make)
- Emphasis on Hadoop/HDFS integration
- Atomic file operations
- Powerful task templating using OOP
- Data flow visualization
- Command line integration
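The make-like dependency idea can be illustrated with a toy resolver: a task runs only after everything it requires is complete, and tasks that are already complete are skipped. This is a sketch of the concept, not Luigi's code; the `Task` class and `build` function below are simplified stand-ins, and the task names are borrowed from the log output shown in the slides.

```python
class Task:
    """Minimal task interface: dependencies, a completion check, and work."""

    done = False

    def requires(self):
        return []

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def build(task, log):
    """Run task after all of its incomplete dependencies, depth first."""
    if task.complete():
        return
    for dependency in task.requires():
        build(dependency, log)
    task.run()
    log.append(type(task).__name__)


class Streams(Task):
    done = True  # stands in for externally produced input data


class AggregateArtists(Task):
    def __init__(self, streams):
        self.streams = streams

    def requires(self):
        return [self.streams]


class Top10Artists(Task):
    def __init__(self, aggregate):
        self.aggregate = aggregate

    def requires(self):
        return [self.aggregate]


if __name__ == "__main__":
    log = []
    build(Top10Artists(AggregateArtists(Streams())), log)
    print(log)
```

Building the same `Top10Artists` instance a second time does nothing, because every task in the chain already reports itself as complete.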
![Page 12: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/12.jpg)
Luigi - Aggregate Artists
![Page 13: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/13.jpg)
Luigi - Aggregate Artists
Run on the command line:
$ python dataflow.py AggregateArtists
DEBUG: Checking if AggregateArtists() is complete
INFO: Scheduled AggregateArtists()
DEBUG: Checking if Streams() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 74375] Running AggregateArtists()
INFO: [pid 74375] Done AggregateArtists()
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
![Page 14: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/14.jpg)
Task templates and targets
Reduces code repetition by utilizing Luigi’s object-oriented data model
Luigi comes with pre-implemented Tasks for...
- Running Hadoop MapReduce utilizing Hadoop Streaming or custom jar-files
- Running Hive and (soon) Pig queries
- Inserting data sets into Postgres

Writing new ones is as easy as defining an interface and implementing run()
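The templating idea can be sketched with ordinary inheritance: a base class implements `run()` once, and each concrete task fills in only the part that differs. The classes below are hypothetical and do not correspond to Luigi's built-in templates.

```python
class LineTransformTask:
    """Template: subclasses only describe how to transform one line."""

    def transform(self, line):
        raise NotImplementedError

    def run(self, infile, outfile):
        # The shared plumbing lives here once instead of in every task.
        for line in infile:
            result = self.transform(line.rstrip("\n"))
            if result is not None:
                outfile.write(result + "\n")


class UppercaseArtists(LineTransformTask):
    def transform(self, line):
        return line.upper()


class DropEmptyLines(LineTransformTask):
    def transform(self, line):
        # Returning None drops the line from the output.
        return line if line.strip() else None
```

Each new task is a few lines long because reading, writing and filtering are inherited from the template.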
![Page 15: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/15.jpg)
Hadoop MapReduce
Built-in Hadoop Streaming Python framework
Features
- Very slim interface
- Fetches error logs from Hadoop cluster and displays them to the user
- Class instance variables can be referenced in MapReduce code, which makes it easy to supply extra data in dictionaries etc. for map side joins
- Easy to send along Python modules that might not be installed on the cluster
- Basic support for secondary sort
- Runs on CPython so you can use your favorite libs (numpy, pandas etc.)
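A slim mapper/reducer interface of this kind can be tried out locally without a cluster. The sketch below simulates the map, sort and reduce phases in one process; `AggregateArtistsJob` and `run_local` are hypothetical names, and the tab-separated input format is an assumption.

```python
from itertools import groupby
from operator import itemgetter


class AggregateArtistsJob:
    """Hypothetical job with a mapper and a reducer, run locally."""

    def mapper(self, line):
        _timestamp, _track, artist = line.split("\t")
        yield artist, 1

    def reducer(self, key, values):
        yield key, sum(values)


def run_local(job, lines):
    """Simulate map, shuffle/sort and reduce in a single process."""
    mapped = [pair for line in lines for pair in job.mapper(line)]
    mapped.sort(key=itemgetter(0))  # stands in for Hadoop's shuffle
    results = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        results.extend(job.reducer(key, (value for _key, value in group)))
    return results
```

For example, `run_local(AggregateArtistsJob(), lines)` returns one `(artist, count)` pair per artist, sorted by artist name.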
![Page 16: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/16.jpg)
Hadoop MapReduce
Built-in Hadoop Streaming Python framework
![Page 17: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/17.jpg)
Attach Python modules
So you can use local modules remotely on the cluster
![Page 18: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/18.jpg)
Local state is transferred
Set up local variables for map-side joins etc.
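The effect can be sketched as follows: a small lookup table is attached to the job instance locally, the instance state is serialized, and the restored copy uses the table inside its mapper. JSON stands in here for whatever serialization is actually used, and `CountByCountry` and `ship` are hypothetical names.

```python
import json


class CountByCountry:
    """Hypothetical job whose mapper joins against an in-memory dict."""

    def __init__(self, artist_country):
        # Small lookup table, set up locally before the job is shipped.
        self.artist_country = artist_country

    def mapper(self, line):
        _timestamp, _track, artist = line.split("\t")
        yield self.artist_country.get(artist, "unknown"), 1


def ship(job):
    """Serialize and restore the job state, as if sent to a worker."""
    state = json.dumps(vars(job))  # what would travel to the cluster
    clone = type(job).__new__(type(job))  # rebuild without calling __init__
    vars(clone).update(json.loads(state))
    return clone
```

Because the table travels with the job, the join happens on the map side and needs no extra reduce step. This only works while the table is small enough to fit in memory on each worker.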
![Page 19: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/19.jpg)
Completing the top list
Top 10 artists - Wrapped arbitrary Python code
![Page 20: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/20.jpg)
Database support
Basic functionality for exporting to Postgres. Cassandra support is in the works.
![Page 21: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/21.jpg)
Running it all...
DEBUG: Checking if ArtistToplistToDatabase() is complete
INFO: Scheduled ArtistToplistToDatabase()
DEBUG: Checking if Top10Artists() is complete
INFO: Scheduled Top10Artists()
DEBUG: Checking if AggregateArtists() is complete
INFO: Scheduled AggregateArtists()
DEBUG: Checking if Streams() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 74811] Running AggregateArtists()
INFO: [pid 74811] Done AggregateArtists()
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 74811] Running Top10Artists()
INFO: [pid 74811] Done Top10Artists()
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 74811] Running ArtistToplistToDatabase()
INFO: Done writing, importing at 2013-03-13 15:41:09.407138
INFO: [pid 74811] Done ArtistToplistToDatabase()
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
![Page 22: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/22.jpg)
Data flow visualization
Light-weight scheduler daemon with a web interface
![Page 23: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/23.jpg)
The results
Imagine how cool this would be with real data...
![Page 24: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/24.jpg)
Task Parameters
Class variables with some magic
Tasks have implicit __init__
Generates command line interface with typing and documentation
$ python dataflow.py AggregateArtists --date 2013-03-05
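How class variables can yield both an implicit `__init__` and a typed command line interface is sketched below using `argparse`. The `Parameter` and `Task` classes are toy stand-ins written for this illustration, not Luigi's classes, and the real mechanism may differ.

```python
import argparse
import datetime


class Parameter:
    """Class-level declaration of one task parameter (toy version)."""

    def __init__(self, parse=str, default=None, description=""):
        self.parse = parse
        self.default = default
        self.description = description


def parse_date(text):
    return datetime.datetime.strptime(text, "%Y-%m-%d").date()


class Task:
    def __init__(self, **kwargs):
        # The "implicit __init__": copy declared parameters onto the instance.
        for name, param in self.params().items():
            setattr(self, name, kwargs.get(name, param.default))

    @classmethod
    def params(cls):
        return {k: v for k, v in vars(cls).items() if isinstance(v, Parameter)}

    @classmethod
    def from_args(cls, argv):
        # One command line option per declared parameter, with type and help.
        parser = argparse.ArgumentParser(prog=cls.__name__)
        for name, param in cls.params().items():
            parser.add_argument("--" + name, type=param.parse,
                                default=param.default, help=param.description)
        return cls(**vars(parser.parse_args(argv)))


class AggregateArtists(Task):
    date = Parameter(parse=parse_date, description="day to aggregate")
```

With this in place, `AggregateArtists.from_args(["--date", "2013-03-05"])` yields a task whose `date` attribute is a `datetime.date`, and `--help` lists the parameter with its description.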
![Page 25: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/25.jpg)
Task Parameters
Combined usage example
![Page 26: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/26.jpg)
Task Parameters
Makes it really easy to create aggregates over time
![Page 27: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/27.jpg)
Multiple workers
Basic multi-processing
$ python dataflow.py --workers 3 AggregateArtists --date_interval 2013-W11
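An interval such as `2013-W11` names an ISO week, which expands into seven daily inputs that can be processed independently. The helper below is a sketch of that expansion using the standard library (Python 3.8 or later); `days_in_iso_week` is a hypothetical name and this is not Luigi's own interval parser.

```python
import datetime


def days_in_iso_week(text):
    """Expand a string such as '2013-W11' into its seven dates."""
    year_part, week_part = text.split("-W")
    monday = datetime.date.fromisocalendar(int(year_part), int(week_part), 1)
    return [monday + datetime.timedelta(days=offset) for offset in range(7)]
```

For `2013-W11` this gives the days from Monday 2013-03-11 through Sunday 2013-03-17, one per daily task that the workers can pick up.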
![Page 28: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/28.jpg)
Large data flows
(Screenshot from web interface: a dependency graph of date-parameterized tasks for 2013-03-06 through 2013-03-17, including AggregateTracks, JoinArtistGids, AggregateUserMatrices, AccumulateUserMatrices, AggregateByArtists, AccumulateByArtists, UserRecs, IngestUserRecs, UserLocationDay, UserLocationPeriod, EndSongCleaned, MasterMetadata, ArtistFollows, RelatedArtistsTC and PlaylistVectors)
![Page 29: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/29.jpg)
Really large...
Visualization needs some work
![Page 30: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/30.jpg)
Error notifications
Great for automated execution
![Page 31: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/31.jpg)
Process Synchronization
Prevents two identical tasks from running simultaneously
luigid
Simple task synchronization
Data flow 1, Data flow 2
Common dependency Task
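The slides describe a central daemon doing this synchronization. As a simpler illustration of the same guarantee, the sketch below uses a lock file per task instead; it is a swapped-in technique for local processes, not how luigid works, and `try_lock` and `unlock` are hypothetical helpers.

```python
import os


def try_lock(lock_path):
    """Return True if this process acquired the lock for a task."""
    try:
        # O_EXCL makes creation fail if the file already exists.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as lock_file:
        lock_file.write(str(os.getpid()))
    return True


def unlock(lock_path):
    """Release the lock so that another process may run the task."""
    os.remove(lock_path)
```

Two data flows that share a dependency would each call `try_lock` for that task. Only one gets `True` and runs it, while the other waits or moves on to other work.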
![Page 32: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/32.jpg)
Not just another Hadoop streaming framework!
What we use Luigi for
- Hadoop Streaming
- Java Hadoop MapReduce
- Hive
- Pig
- Local (non-hadoop) data processing
- Import/Export data to/from Postgres
- Insert data into Cassandra
- scp/rsync/ftp data files and reports
- Dump and load databases

Others are using it with Scala MapReduce and MRJob as well
![Page 33: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/33.jpg)
Luigi is open source
http://github.com/spotify/luigi
- Originated at Spotify
- Based on many years of experience with data processing
- Recent contributions by Foursquare and Bitly
- Open source since September 2012
![Page 34: Luigi - Batch Data Processing in Python (PyData SV 2013)](https://reader034.vdocuments.us/reader034/viewer/2022042623/54b769074a795971038b45df/html5/thumbnails/34.jpg)
Thank you!
For more information feel free to reach out at http://github.com/spotify/luigi
Oh, and we’re hiring – http://spotify.com/jobs
Elias Freider [email protected]