Managing data workflows with Luigi
by Teemu Kurppa www.teemukurppa.net
Metrics Monday at Custobar, Helsinki, 30.5.2016
I’m an advisor at your host, Custobar: customer analytics and marketing tool for retailers
I work at www.ouraring.com, the world's first wellness ring
Head of Software: Cloud & Mobile
Introducing Data Workflows
gunzip -c /var/log/syslog.3.gz | grep -e UFW
Complex data workflow
Let’s analyse if the weather affects sleep quality:
• Get sleep data of all study participants
• Get location data of all study participants
• Fetch weather data for each day and location
• Fetch historical weather data for each location
• Calculate difference from an average weather for each data point
• Do a statistical analysis over users and days, comparing weather data and sleep quality data
A lot can go wrong at each step. Rerunning takes time.
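One common way to keep reruns cheap is to have each step write its result to a file and to skip any step whose file already exists. The sketch below shows that idea in plain Python, not Luigi; the step names, the `run_step` helper and the placeholder results are all hypothetical.

```python
import os

def run_step(name, out_path, compute, log):
    """Run compute() only if out_path is missing; record what happened in log."""
    if os.path.exists(out_path):
        log.append((name, "skipped"))
        return
    result = compute()  # if this raises, no output file is written
    with open(out_path, "w") as f:
        f.write(result)
    log.append((name, "ran"))

def run_workflow(workdir, fail_at=None):
    """Run three hypothetical steps in order; optionally fail at one of them."""
    log = []
    for name in ["sleep_data", "location_data", "weather_data"]:
        def compute(name=name):
            if name == fail_at:
                raise RuntimeError("step %s failed" % name)
            return "placeholder result of %s\n" % name
        run_step(name, os.path.join(workdir, name + ".txt"), compute, log)
    return log
```

Calling `run_workflow` again after a failure redoes only the steps whose output file is still missing.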
Case Custobar: ETL
Extract - Transform - Load
• Fetch custom sales.csv from SFTP
• Transform custom sales.csv to standard sales.json
• Validate and throw away invalid fields
• Load valid sales data to database
Do this for millions of rows of initial data, and continue doing it every day, for products, customers and sales.
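The transform and validate steps could look roughly like the following standard-library sketch. The field names (`sale_id`, `total`) and the validation rule (a field is valid if it converts cleanly) are assumptions made for illustration, not Custobar's actual schema.

```python
import csv
import io
import json

# Hypothetical schema: field name -> function that converts and validates a raw value.
VALIDATORS = {
    "sale_id": str,
    "total": float,
}

def transform_row(row):
    """Keep only known fields whose values convert cleanly; drop the rest."""
    clean = {}
    for field, convert in VALIDATORS.items():
        value = row.get(field)
        if value is None:
            continue  # field missing from this row
        try:
            clean[field] = convert(value)
        except ValueError:
            pass  # invalid value: throw the field away
    return clean

def csv_to_json(csv_text):
    """Transform CSV text into a JSON array of validated records."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return json.dumps([transform_row(row) for row in reader])
```

Unknown columns and values that fail conversion are dropped, so only validated fields reach the load step.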
Luigi by Spotify
Data workflow tools
Pinball by Pinterest
Luigi by Spotify
Airflow by Airbnb
Luigi Concepts
[Diagram: sql: Customers table → Get Changed Customers → file://data/customers.csv → Export Changed Customers to FTP → sftp://data/customers.csv]
• Tasks: Get Changed Customers, Export Changed Customers to FTP
• Targets: sql: Customers table, file://data/customers.csv, sftp://data/customers.csv
• Dependencies: a task declares them with requires(), reads from input() and writes to output()
• Parameters: each task takes company: Parameter and date: DateParameter
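How dependencies drive execution order can be sketched with a toy resolver: run a task's requirements first, and skip anything that is already complete. This is plain Python written for illustration, not Luigi's scheduler; the `Task` class and `build` function here are hypothetical stand-ins.

```python
class Task:
    """Toy stand-in for a workflow task (not Luigi's class)."""

    def __init__(self, name, requires=()):
        self.name = name
        self.requires = list(requires)

def build(task, completed, order):
    """Run requirements depth-first, then the task itself, skipping completed ones."""
    if task.name in completed:
        return
    for dep in task.requires:
        build(dep, completed, order)
    order.append(task.name)   # "running" the task
    completed.add(task.name)  # its target now exists
```

Building the export task from a clean state runs both tasks in dependency order; if the first task's target already exists, only the export runs.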
Concepts: Target
Target
A Target is simply something that exists or doesn’t exist.
For example:
• a file in a local file system
• a file in a remote file system
• a file in an Amazon S3 bucket
• a database row in a SQL database
Target
import luigi
from pymongo import MongoClient

class MongoTarget(luigi.Target):
    """Target that exists when a document matching the predicate exists."""

    def __init__(self, database, collection, predicate):
        self.client = MongoClient()
        self.database = database
        self.collection = collection
        self.predicate = predicate

    def exists(self):
        db = self.client[self.database]
        one = db[self.collection].find_one(self.predicate)
        return one is not None
Target
Lots of ready-made targets in Luigi:
• local file
• HDFS file
• S3 key/value target
• SSH remote target
• SFTP remote target
• SQL table row target
• Amazon Redshift table row target
• Elasticsearch target
Concepts: Task
Task: basic structure
class TransformDailySalesCSVtoJSON(luigi.Task):
    def requires(self):
        # …

    def run(self):
        # …

    def output(self):
        # …
Task: parameters
class TransformDailySalesCSVtoJSON(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        # …

    def run(self):
        # …

    def output(self):
        # …
Task: requires
class TransformDailySalesCSVtoJSON(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return ImportDailyCSVFromSFTP(self.date)

    def run(self):
        # …

    def output(self):
        # …
Task: output
class TransformDailySalesCSVtoJSON(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        # …

    def run(self):
        # …

    def output(self):
        path = "/d/sales_%s.json" % (self.date.strftime('%Y%m%d'))
        return luigi.LocalTarget(path)
Task: run
class TransformDailySalesCSVtoJSON(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        # …

    def run(self):
        # Note: the targets returned by input() and output() take care of atomicity
        with self.input().open('r') as infile:
            data = transform_csv_to_dict(infile)
        with self.output().open('w') as outfile:
            json.dump(data, outfile)

    def output(self):
        # …
Task
import json
import luigi

class TransformDailySalesCSVtoJSON(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return ImportDailyCSVFromSFTP(self.date)

    def run(self):
        with self.input().open('r') as infile:
            data = transform_csv_to_dict(infile)
        with self.output().open('w') as outfile:
            json.dump(data, outfile)

    def output(self):
        path = "/d/sales_%s.json" % (self.date.strftime('%Y%m%d'))
        return luigi.LocalTarget(path)
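The atomicity mentioned in the run() note is commonly achieved by writing to a temporary file and renaming it into place once the write has finished. The following is a minimal standard-library sketch of that general technique, offered as an illustration rather than as Luigi's actual code; `atomic_write_json` is a hypothetical helper.

```python
import json
import os
import tempfile

def atomic_write_json(data, path):
    """Write data as JSON to path so readers never see a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file in the target directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(data, tmp)
        os.replace(tmp_path, path)  # atomic rename on the same filesystem
    except BaseException:
        # On any failure, clean up the partial temp file and leave path untouched.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

If the write fails halfway, the final path never appears, so a later check for the output's existence correctly reports the task as not done.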
Tasks
Lots of ready-made tasks in Luigi:
• dump data to SQL table
• copy to Redshift table
• run Hadoop job
• query Salesforce
• load Elasticsearch index
• …
Dependency patterns
Multiple dependencies
class TransformAllSales(luigi.Task):
    def requires(self):
        # Return every import task; a return inside a for loop would yield only the first
        return [ImportInitialSaleFile(index=i) for i in range(1000)]

    def run(self):
        # …

    def output(self):
        # …
Dynamic dependencies
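import glob

class LoadDailyAPIData(luigi.Task):
    date = luigi.DateParameter()

    def run(self):
        # os.listdir() does not expand wildcards; glob.glob() does
        for filepath in glob.glob('/d/api_data/*.json'):
            # Yielding a task from run() makes it a dynamic dependency
            yield TransformDailyAPIData(filepath)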
Wrapper task
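class LoadAllDailyData(luigi.WrapperTask):
    date = luigi.DateParameter()

    # A WrapperTask declares its sub-tasks in requires();
    # it counts as complete once all of them are complete
    def requires(self):
        yield LoadDailyProducts(self.date)
        yield LoadDailyCustomers(self.date)
        yield LoadDailySales(self.date)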
Why use data workflow tools?
1. Resume the data workflow after a failure
2. Parametrize and rerun tasks every day
3. Organise code with shared patterns
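Parametrizing output by date is also what makes daily reruns and backfills simple: each day maps to its own output file, so the days that still need work are those whose file is missing. Here is a plain-Python sketch of that check; the `sales_YYYYMMDD.json` naming follows the task example earlier in the talk, while `output_path` and `missing_dates` are hypothetical helpers.

```python
import datetime
import os

def output_path(base_dir, date):
    """Date-parametrized output location, one file per day."""
    return os.path.join(base_dir, "sales_%s.json" % date.strftime("%Y%m%d"))

def missing_dates(base_dir, start, end):
    """Return the dates in [start, end] whose output file does not exist yet."""
    days = (end - start).days + 1
    all_dates = [start + datetime.timedelta(days=i) for i in range(days)]
    return [d for d in all_dates if not os.path.exists(output_path(base_dir, d))]
```

A daily job would then run the task only for the dates this function returns.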
Thanks! Questions?
Custobar is hiring! Approach Juha, Tatu or me to learn more
Follow @teemu on Twitter to stay in touch.