Managing data workflows with Luigi

by Teemu Kurppa, www.teemukurppa.net

Metrics Monday at Custobar, Helsinki, 30.5.2016

Upload: teemu-kurppa

Post on 16-Apr-2017

TRANSCRIPT

Page 1: Managing data workflows with Luigi


by Teemu Kurppa www.teemukurppa.net

Metrics Monday at Custobar, Helsinki, 30.5.2016

Managing data workflows with Luigi

Page 2: Managing data workflows with Luigi

I’m an advisor at your host, Custobar:

Customer analytics and marketing tool for retailers

Page 3: Managing data workflows with Luigi

I work at www.ouraring.com, the world's first wellness ring

Head of Software: Cloud & Mobile

[email protected]

Page 4: Managing data workflows with Luigi

Introducing Data Workflows


Page 5: Managing data workflows with Luigi

gunzip -c /var/log/syslog.3.gz | grep -e UFW

Page 6: Managing data workflows with Luigi

Complex data workflow

Let’s analyse if the weather affects sleep quality:
• Get sleep data of all study participants
• Get location data of all study participants
• Fetch weather data for each day and location
• Fetch historical weather data for each location
• Calculate the difference from average weather for each data point
• Do a statistical analysis over users and days, comparing weather data and sleep quality data

A lot can go wrong at each step. Rerunning takes time.

Page 7: Managing data workflows with Luigi

Case Custobar:ETL

Extract - Transform - Load

Page 8: Managing data workflows with Luigi

Case Custobar:ETL

1. Fetch custom sales.csv from SFTP
2. Transform custom sales.csv to standard sales.json
3. Validate and throw away invalid fields
4. Load valid sales data to database

Page 9: Managing data workflows with Luigi

Case Custobar:ETL

Extract
• Fetch custom sales.csv from SFTP

Transform
• Transform custom sales.csv to standard sales.json
• Validate and throw away invalid fields

Load
• Load valid sales data to database

Page 10: Managing data workflows with Luigi

Case Custobar:ETL

Do this for millions of rows of initial data, and continue doing it every day, for products, customers, and sales.
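The Extract, Transform, Load steps from these slides can be sketched with the standard library alone. This is a minimal illustration, not Custobar's pipeline: the CSV columns, the sample rows, and the SQLite table are all invented for the example, and the sketch drops whole invalid rows rather than individual fields.

```python
import csv
import io
import json
import sqlite3

# Hypothetical input: column names and values are invented for illustration.
RAW_CSV = """sale_id;customer;total
1001;alice;19.90
1002;bob;not-a-number
1003;;5.00
"""


def extract(text):
    """Extract: parse the custom semicolon-separated CSV into dict rows."""
    return list(csv.DictReader(io.StringIO(text), delimiter=";"))


def transform(rows):
    """Transform: convert rows to a standard shape and drop invalid ones."""
    valid = []
    for row in rows:
        try:
            total = float(row["total"])
        except ValueError:
            continue  # amount is not a number
        if not row["customer"]:
            continue  # customer is missing
        valid.append(
            {"id": int(row["sale_id"]), "customer": row["customer"], "total": total}
        )
    return valid


def load(records, connection):
    """Load: write the valid records to a database table."""
    connection.execute(
        "CREATE TABLE IF NOT EXISTS sales "
        "(id INTEGER PRIMARY KEY, customer TEXT, total REAL)"
    )
    connection.executemany(
        "INSERT OR REPLACE INTO sales VALUES (:id, :customer, :total)", records
    )
    connection.commit()


connection = sqlite3.connect(":memory:")
records = transform(extract(RAW_CSV))
load(records, connection)
print(json.dumps(records))
loaded_count = connection.execute("SELECT COUNT(*) FROM sales").fetchone()[0]
```

Each function here maps to one box on the slide. Run as a single script, a failure in `load` would force `extract` and `transform` to run again, which is the problem a workflow tool addresses.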

Page 11: Managing data workflows with Luigi

Luigi by Spotify


Page 12: Managing data workflows with Luigi

Data workflow tools

Pinball by Pinterest

Luigi by Spotify

Airflow by Airbnb

Page 13: Managing data workflows with Luigi

Luigi Concepts


Page 14: Managing data workflows with Luigi

Luigi Concepts

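Tasks
• Get Changed Customers
• Export Changed Customers to FTP

Targets
• sql: Customers table
• file://data/customers.csv
• sftp://data/customers.csv

Dependencies
• sql: Customers table → Get Changed Customers → file://data/customers.csv → Export Changed Customers to FTP → sftp://data/customers.csv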

Page 15: Managing data workflows with Luigi

Luigi Concepts

Same pipeline, with the Task methods that connect it:

• output(): the Target a Task produces
• input(): the Targets produced by the Tasks it requires
• requires(): the Tasks a Task depends on
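How requires(), input() and output() relate can be sketched with a toy model in plain Python. This is not Luigi's implementation: `ToyTask`, `build`, and the file names are hypothetical, targets are reduced to file paths, and `input()` always returns a list here.

```python
import os
import tempfile


class ToyTask:
    """Toy model of the Task concept; NOT Luigi's implementation."""

    def requires(self):
        return []  # tasks this task depends on

    def output(self):
        raise NotImplementedError  # path of the file this task produces

    def input(self):
        # The inputs of a task are the outputs of the tasks it requires.
        return [task.output() for task in self.requires()]

    def complete(self):
        return os.path.exists(self.output())

    def run(self):
        raise NotImplementedError


def build(task, log):
    """Run dependencies first, then the task itself, skipping finished work."""
    if task.complete():
        return
    for dependency in task.requires():
        build(dependency, log)
    task.run()
    log.append(type(task).__name__)


WORKDIR = tempfile.mkdtemp()


class GetChangedCustomers(ToyTask):
    def output(self):
        return os.path.join(WORKDIR, "customers.csv")

    def run(self):
        with open(self.output(), "w") as outfile:
            outfile.write("id,name\n1,Alice\n")


class ExportChangedCustomers(ToyTask):
    def requires(self):
        return [GetChangedCustomers()]

    def output(self):
        return os.path.join(WORKDIR, "exported.csv")

    def run(self):
        (source,) = self.input()  # the single upstream output
        with open(source) as infile, open(self.output(), "w") as outfile:
            outfile.write(infile.read())


log = []
build(ExportChangedCustomers(), log)
print(log)
```

Asking for the last task is enough: the runner walks requires() backwards and executes the upstream task first.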

Page 16: Managing data workflows with Luigi

Luigi Concepts

Parameters

Both Tasks in the pipeline take the same parameters:
• company: Parameter
• date: DateParameter

Page 17: Managing data workflows with Luigi

Concepts: Target


Page 18: Managing data workflows with Luigi

Target

A Target is simply something that exists or doesn’t exist.

For example:
• a file in a local file system
• a file in a remote file system
• a file in an Amazon S3 bucket
• a database row in a SQL database
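The "exists or doesn't exist" idea can be sketched without Luigi. Both classes below are hypothetical stand-ins written for this example, not Luigi classes: one checks for a local file, the other for a row in a SQLite table.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


class FileTarget:
    """Hypothetical file target: exists when the path exists."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


class SqliteRowTarget:
    """Hypothetical row target: exists when a matching row is in the table."""

    def __init__(self, db_path, table, key_column, key_value):
        self.db_path = db_path
        self.table = table
        self.key_column = key_column
        self.key_value = key_value

    def exists(self):
        # table and key_column are interpolated, so they must come from
        # trusted code; the value itself is passed as a bound parameter.
        query = "SELECT 1 FROM %s WHERE %s = ? LIMIT 1" % (
            self.table,
            self.key_column,
        )
        with closing(sqlite3.connect(self.db_path)) as connection:
            return connection.execute(query, (self.key_value,)).fetchone() is not None


workdir = tempfile.mkdtemp()
db_path = os.path.join(workdir, "demo.db")
with closing(sqlite3.connect(db_path)) as connection:
    connection.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT)")
    connection.commit()

row_target = SqliteRowTarget(db_path, "customers", "id", 1)
row_before = row_target.exists()
with closing(sqlite3.connect(db_path)) as connection:
    connection.execute("INSERT INTO customers VALUES (1, 'Alice')")
    connection.commit()
row_after = row_target.exists()

file_target = FileTarget(os.path.join(workdir, "customers.csv"))
file_before = file_target.exists()
open(file_target.path, "w").close()
file_after = file_target.exists()
```

The only contract both classes share is `exists()`, which is all a scheduler needs to decide whether the producing task still has to run.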

Page 19: Managing data workflows with Luigi

Target

    import luigi
    from pymongo import MongoClient

    class MongoTarget(luigi.Target):
        """Exists when a document matching the predicate is found."""

        def __init__(self, database, collection, predicate):
            self.client = MongoClient()
            self.database = database
            self.collection = collection
            self.predicate = predicate

        def exists(self):
            db = self.client[self.database]
            one = db[self.collection].find_one(self.predicate)
            return one is not None

Page 20: Managing data workflows with Luigi

Target

Lots of ready-made targets in Luigi:

• local file
• HDFS file
• S3 key/value target
• SSH remote target
• SFTP remote target
• SQL table row target
• Amazon Redshift table row target
• ElasticSearch target

Page 21: Managing data workflows with Luigi

Concepts: Task


Page 22: Managing data workflows with Luigi

Task: basic structure

    import luigi

    class TransformDailySalesCSVtoJSON(luigi.Task):
        def requires(self):
            # …

        def run(self):
            # …

        def output(self):
            # …

Page 23: Managing data workflows with Luigi

Task: parameters

    class TransformDailySalesCSVtoJSON(luigi.Task):

        date = luigi.DateParameter()

        def requires(self):
            # …

        def run(self):
            # …

        def output(self):
            # …

Page 24: Managing data workflows with Luigi

Task: requires

    class TransformDailySalesCSVtoJSON(luigi.Task):

        date = luigi.DateParameter()

        def requires(self):
            return ImportDailyCSVFromSFTP(self.date)

        def run(self):
            # …

        def output(self):
            # …

Page 25: Managing data workflows with Luigi

Task: output

    class TransformDailySalesCSVtoJSON(luigi.Task):

        date = luigi.DateParameter()

        def requires(self):
            # …

        def run(self):
            # …

        def output(self):
            path = "/d/sales_%s.json" % self.date.strftime("%Y%m%d")
            return luigi.LocalTarget(path)
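The path construction in output() can be checked on its own. A small sketch, assuming the date arrives as a `datetime.date`; the helper name `sales_json_path` is made up for this example.

```python
import datetime


def sales_json_path(date):
    """Build the per-day output path used on the slide."""
    return "/d/sales_%s.json" % date.strftime("%Y%m%d")


path = sales_json_path(datetime.date(2016, 5, 30))
next_day_path = sales_json_path(datetime.date(2016, 5, 31))
print(path)  # /d/sales_20160530.json
```

Because the date is part of the path, each day's run gets its own target, so finishing one day never makes another day look complete.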

Page 26: Managing data workflows with Luigi

Task: run

    import json

    class TransformDailySalesCSVtoJSON(luigi.Task):

        date = luigi.DateParameter()

        def requires(self):
            # …

        def run(self):
            # Note: Luigi's input() and output() take care of atomicity
            with self.input().open("r") as infile:
                data = transform_csv_to_dict(infile)
            with self.output().open("w") as outfile:
                json.dump(data, outfile)

        def output(self):
            # …
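What "atomicity" means for an output file can be sketched as: write to a temporary file, then rename it into place only after the write succeeded. The helper below is a hypothetical stdlib illustration of that pattern, not Luigi's code; my understanding is that Luigi's local file target does something similar when opened for writing.

```python
import json
import os
import tempfile


def atomic_write_json(data, path):
    """Write JSON so that `path` is either fully written or left untouched."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the same directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp_file:
            json.dump(data, tmp_file)
        os.replace(tmp_path, path)  # atomic rename into place
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)  # never leave a half-written file behind
        raise


workdir = tempfile.mkdtemp()
target_path = os.path.join(workdir, "sales.json")
atomic_write_json({"total": 19.9}, target_path)

try:
    # object() is not JSON-serialisable, so this write fails midway.
    atomic_write_json({"total": object()}, target_path)
except TypeError:
    pass

with open(target_path) as result_file:
    surviving = json.load(result_file)
leftovers = [name for name in os.listdir(workdir) if name.endswith(".tmp")]
```

This matters for a workflow tool because an existing output file is read as "task complete"; a half-written file would be mistaken for a finished one.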

Page 27: Managing data workflows with Luigi

Task

    import json
    import luigi

    class TransformDailySalesCSVtoJSON(luigi.Task):

        date = luigi.DateParameter()

        def requires(self):
            return ImportDailyCSVFromSFTP(self.date)

        def run(self):
            with self.input().open("r") as infile:
                data = transform_csv_to_dict(infile)
            with self.output().open("w") as outfile:
                json.dump(data, outfile)

        def output(self):
            path = "/d/sales_%s.json" % self.date.strftime("%Y%m%d")
            return luigi.LocalTarget(path)

Page 28: Managing data workflows with Luigi

Tasks

Lots of ready-made tasks in Luigi:

• dump data to SQL table
• copy to Redshift table
• run Hadoop job
• query Salesforce
• load ElasticSearch index
• …

Page 29: Managing data workflows with Luigi

Dependency patterns


Page 30: Managing data workflows with Luigi

Multiple dependencies

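    class TransformAllSales(luigi.Task):
        def requires(self):
            # Return all 1000 dependencies at once; a return inside a
            # for loop would stop after the first one.
            return [ImportInitialSaleFile(index=i) for i in range(1000)]

        def run(self):
            # …

        def output(self):
            # …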

Page 31: Managing data workflows with Luigi

Dynamic dependencies

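    import glob

    class LoadDailyAPIData(luigi.Task):
        date = luigi.DateParameter()

        def run(self):
            # glob expands the wildcard; os.listdir would not.
            for filepath in glob.glob("/d/api_data/*.json"):
                # Yielding a task from run() makes it a dynamic dependency.
                yield TransformDailyAPIData(filepath)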
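Dependencies discovered inside run() rely on Python generators: the task yields what it needs, and whoever drives the generator satisfies each request before resuming. The sketch below is a toy driver with made-up names and file paths, not Luigi's scheduler, which handles suspension and retries differently.

```python
def run_with_dynamic_dependencies(task_run, execute_dependency):
    """Drive a generator-style run(): satisfy each yielded dependency, then resume."""
    for dependency in task_run():
        execute_dependency(dependency)


executed = []


def load_daily_api_data():
    # Hypothetical file names that are only known at run time.
    for filepath in ["a.json", "b.json"]:
        yield ("TransformDailyAPIData", filepath)
    executed.append("load finished")


run_with_dynamic_dependencies(load_daily_api_data, executed.append)
print(executed)
```

The point of the pattern is that the list of dependencies does not have to be known when the task is defined, only when it runs.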

Page 32: Managing data workflows with Luigi

Wrapper task

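    class LoadAllDailyData(luigi.WrapperTask):
        date = luigi.DateParameter()

        def requires(self):
            # A wrapper task only groups other tasks, so they are
            # declared in requires() rather than in run().
            yield LoadDailyProducts(self.date)
            yield LoadDailyCustomers(self.date)
            yield LoadDailySales(self.date)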
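A wrapper task produces nothing itself; it counts as done when everything it requires is done. A toy model of that rule follows, with hypothetical class names, and is not Luigi's own code.

```python
class ToyTask:
    """Toy task whose completion is a simple flag."""

    def __init__(self, name, done=False):
        self.name = name
        self.done = done

    def requires(self):
        return []

    def complete(self):
        return self.done


class ToyWrapperTask(ToyTask):
    """Toy wrapper: complete exactly when all required tasks are complete."""

    def __init__(self, name, requirements):
        super().__init__(name)
        self._requirements = requirements

    def requires(self):
        return self._requirements

    def complete(self):
        return all(task.complete() for task in self.requires())


products = ToyTask("LoadDailyProducts", done=True)
customers = ToyTask("LoadDailyCustomers", done=True)
sales = ToyTask("LoadDailySales", done=False)

wrapper = ToyWrapperTask("LoadAllDailyData", [products, customers, sales])
complete_before = wrapper.complete()
sales.done = True
complete_after = wrapper.complete()

# With no declared requirements the rule is trivially satisfied.
empty_wrapper_complete = ToyWrapperTask("Empty", []).complete()
```

The last line shows why the dependencies belong in requires(): a wrapper that declares none looks complete immediately, so nothing would be scheduled.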

Page 33: Managing data workflows with Luigi

Why use data workflow tools?

Page 34: Managing data workflows with Luigi

1. Resume the data workflow after a failure
2. Parametrize and rerun tasks every day
3. Organise code with shared patterns
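The first point, resuming after a failure, can be sketched with marker files: a step that already left its marker is skipped on the next run. Everything here is a hypothetical illustration in plain Python (function names, step names, the simulated failure); a workflow tool gives the same effect through task outputs.

```python
import os
import tempfile


def run_step(name, workdir, action):
    """Run one step unless its marker file shows it already finished."""
    marker = os.path.join(workdir, name + ".done")
    if os.path.exists(marker):
        return "skipped"
    action()
    with open(marker, "w") as marker_file:
        marker_file.write("ok")  # written only after the action succeeded
    return "ran"


def run_pipeline(workdir, steps):
    return [(name, run_step(name, workdir, action)) for name, action in steps]


calls = {"fetch": 0, "transform": 0}


def fetch():
    calls["fetch"] += 1


def transform():
    calls["transform"] += 1
    if calls["transform"] == 1:
        raise RuntimeError("simulated transient failure")


with tempfile.TemporaryDirectory() as workdir:
    steps = [("fetch", fetch), ("transform", transform)]
    try:
        run_pipeline(workdir, steps)  # first attempt fails in transform
    except RuntimeError:
        pass
    second_attempt = run_pipeline(workdir, steps)  # fetch is not repeated

print(second_attempt)
```

Adding a date to the marker name would cover the second point as well: each day gets its own set of markers, so the same pipeline can be rerun daily.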

Page 35: Managing data workflows with Luigi


Thanks! Questions?

Custobar is hiring! Approach Juha, Tatu or me to learn more

Follow @teemu on Twitter to stay in touch.