PyCascading for Intuitive Flow Processing With Hadoop
Gabor Szabo, Senior Data Scientist, Twitter, Inc.

Slides for a talk given by Gabor Szabo at PyData Silicon Valley 2013.

Page 1: PyCascading for Intuitive Flow Processing With Hadoop

Gabor Szabo, Senior Data Scientist, Twitter, Inc.

Page 2: Outline

• Basic concepts in the Hadoop ecosystem, with an example
  • Hadoop
  • Cascading
  • PyCascading
• Essential PyCascading operations
• PyCascading by example: discovering main interests among friends
• Miscellaneous remarks, caveats

Page 3: Hadoop: Architecture

• The Hadoop file system (HDFS)
  • Large, distributed file system
  • Thousands of nodes, PBs of data
  • The storage layer for Apache Hive, HBase, ...
• MapReduce
  • Idea: ship the code to the data, not the other way around
  • Do aggregations locally
  • Iterate on the results
  • Map phase: process the input records, emit a key & a value
  • Reduce phase: collect records with the same key from Map, emit a new (aggregate) record
• Fault tolerance
  • Both storage and compute are fault tolerant (redundancy, replication, restart)
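The map and reduce phases above can be sketched as a local simulation in plain Python (illustration only, not actual Hadoop code): the map phase emits (key, value) pairs, a shuffle step groups them by key, and the reduce phase aggregates each group.

```python
from collections import defaultdict

def map_phase(lines):
    # Map: process input records, emit a key and a value per word
    for line in lines:
        for word in line.split():
            yield word, 1

def shuffle(pairs):
    # Shuffle: collect all values emitted for the same key
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    # Reduce: emit one aggregate record per key
    return {key: sum(values) for key, values in groups.items()}

lines = ["to be or not to be"]
counts = reduce_phase(shuffle(map_phase(lines)))
print(counts)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```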

Page 4: Hadoop: In practice

• Language: Java
• Need to think in MapReduce
  • Hard to translate the problem to MR
  • Hard to maintain and make changes in the topology
• Best used for
  • Archiving (HDFS)
  • Batch processing (MR)

Page 5: Cascading: The Cascading way, flow processing

• Cascading is built on top of Hadoop
• Introduces semi-structured flow processing of tuples with typed fields
• Analogy: data is flowing in pipes
  • Input comes from source taps
  • Output goes to sink taps
  • Data is reshaped in the pipes by different operations
• Builds a DAG from the job, and optimizes the topology to minimize the number of MapReduce phases
• The pipes analogy is more intuitive to use than raw MapReduce
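The taps-and-pipes analogy can be illustrated with a plain-Python sketch (no Cascading involved), where each stage consumes and yields records:

```python
def source_tap(contents):
    # Source tap: read raw records into the flow
    for line in contents:
        yield line

def filter_pipe(records, predicate):
    # Filter: drop records that fail the predicate
    return (r for r in records if predicate(r))

def map_pipe(records, fn):
    # "Map": reshape each record as it flows through
    return (fn(r) for r in records)

def sink_tap(records):
    # Sink tap: materialize the flow's output
    return list(records)

data = ["alice 3", "bob 7", "carol 5"]
flow = source_tap(data)
flow = filter_pipe(flow, lambda r: int(r.split()[1]) > 4)
flow = map_pipe(flow, lambda r: r.split()[0])
result = sink_tap(flow)
print(result)  # ['bob', 'carol']
```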

Pages 6-12: Flow processing in (Py)Cascading

[Diagram (source: cascading.org), built up across these slides: data flows from a source tap through Filter, "Map", Join, and Group & aggregate stages into a sink tap]

Page 13: PyCascading: Design

• Built on top of Cascading
• Uses the Jython 2.5 interpreter
• Everything in Python
  • Building the pipelines
  • User-defined functions that operate on data
• Completely hides Java if the user wants it to
  • However, due to the strong ties with Java, it's worth knowing the Cascading classes

Pages 14-17: Example: as always, WordCount

Writing MapReduce jobs by hand is hard
• WordCount: split the input file into words, and count how many times each word occurs

[Figure, built up across these slides: the Java MapReduce WordCount source, with the map (M) and reduce (R) phases highlighted; most of the listing is support code]

Pages 18-21: Cascading WordCount

• Still in Java, but algorithm design is easier
• Need to write separate classes for each user-defined operation

[Figure, built up across these slides: the Cascading WordCount source, with the map (M) and grouping (G) operations highlighted; the rest is support code]

Pages 22-25: word_count.py

PyCascading minimizes programmer effort

[Figure, built up across these slides: the PyCascading word_count.py source, with the map and grouping steps highlighted and only a few lines of support code]
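For comparison with the Java versions, a PyCascading word count looks roughly like the following sketch (patterned after the PyCascading example scripts; it runs under the project's Jython-based runner, not plain CPython, and the paths and exact helper names here are assumptions):

```python
from pycascading.helpers import *

@udf_map(produces=['word'])
def split_words(tuple):
    # Map: yield one record per word in the input line
    for word in tuple.get(1).split():
        yield [word]

def main():
    flow = Flow()
    input = flow.source(Hfs(TextLine(), 'input.txt'))  # source tap
    output = flow.tsv_sink('output_dir')               # sink tap

    # Split into words, group by word, and count each group
    input | split_words | group_by('word', native.count()) | output

    flow.run()
```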

Page 26: PyCascading workflow

The basics of writing a Cascading flow in Python
• There is one main script that must contain a main() function
  • We build the pipeline in main()
• Pipes are joined together with the pipe operator |
  • Pipe ends may be assigned to variables and reused (split)
• All the user-defined operations are Python functions
  • Globally or locally scoped
• Then submit the pipeline to PyCascading to be run
• The main Python script will be executed on each of the workers when they spin up, to import the global declarations
  • This is the reason we have to have main(): so that the pipeline-building code won't be executed again
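The pipe operator can be mimicked in plain CPython to show how | composes stages (an illustrative sketch only; PyCascading's real pipe objects are Java-backed Cascading classes):

```python
class Stage:
    """A pipeline stage; `a | b` feeds a's output into b."""
    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other):
        # Compose: records flow through self first, then other
        return Stage(lambda records: other.fn(self.fn(records)))

    def run(self, records):
        return list(self.fn(records))

split_words = Stage(lambda recs: (w for r in recs for w in r.split()))
to_upper = Stage(lambda recs: (w.upper() for w in recs))

pipeline = split_words | to_upper
print(pipeline.run(["hello world"]))  # ['HELLO', 'WORLD']
```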

Page 27: PyCascading by example

Walk through the operations using an example
• Data
  • A friendship network in long format
  • A list of interests per user, ordered by decreasing importance
• Question
  • For every user, find which main interest occurs the most among their friends
• Workflow
  • Take the most important interest per user, and join it to the friendship table
  • For each user, count how many times each interest appears, and select the one with the maximum count

[Tables: the friendship network and the user interests inputs]
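The whole workflow can be prototyped locally in plain Python before writing the PyCascading flow (a toy simulation; the data is made up, and the field names mirror the slides):

```python
from collections import Counter

# Friendship network in long format: (user, friend)
friendships = [('alice', 'bob'), ('alice', 'carol'),
               ('bob', 'alice'), ('carol', 'alice')]

# Interests per user, ordered by decreasing importance
interests = {'alice': ['music', 'movies'],
             'bob': ['sports', 'music'],
             'carol': ['sports', 'books']}

# Step 1: take the most important interest per user
first_interest = {user: ints[0] for user, ints in interests.items()}

# Step 2: join to the friendship table and count, per user,
# how often each main interest appears among their friends
counts = {}
for user, friend in friendships:
    counts.setdefault(user, Counter())[first_interest[friend]] += 1

# Step 3: select the interest with the maximum count for each user
recommendations = {user: c.most_common(1)[0][0] for user, c in counts.items()}
print(recommendations)  # {'alice': 'sports', 'bob': 'music', 'carol': 'music'}
```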

Page 28: The full source

[Figure: the complete Python source of the example flow]

Pages 29-30: Defining the inputs

[Figure: the source-tap definitions from the script]
• Need to use Java types, since this is a Cascading call

Pages 31-35: Shaping the fields: "mapping"

[Figure: the mapping step from the script, annotated across these slides]
• Decorators annotate user-defined functions
• tuple is a Cascading record type
• We can return any number of new records with yield
• Replace the interest field with the result yielded by take_first, and call it interest
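Based on the annotations above, the mapping UDF might look roughly like this (a sketch reconstructed from the slide's description, runnable only under PyCascading's Jython-based runner; the decorator name, the produces argument, and the field name are assumptions):

```python
@udf_map(produces=['interest'])
def take_first(tuple):
    # tuple is a Cascading record; use .get with the field name,
    # and yield any number of new records
    yield [tuple.get('interests').split()[0]]
```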

Pages 36-37: Checkpointing

[Figure: a checkpointed stage in the script]
• Take the data EITHER from the cache (ID: "users_first_interests"), OR generate it if it's not cached yet

Pages 38-42: Grouping & aggregating

[Figure: the grouping step from the script, annotated across these slides]
• Group by user, and call the two result fields user and friend
• Define a UDF that takes the grouping fields, a tuple iterator, and optional arguments
• Use the .get getter with the field name
• Yield any number of results
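A buffer-style aggregation UDF corresponds to this plain-Python pattern (a local sketch using itertools.groupby; PyCascading hands the UDF a comparable per-group iterator):

```python
from itertools import groupby
from operator import itemgetter

# (user, friend) records, already sorted by the grouping field
records = [('alice', 'bob'), ('alice', 'carol'), ('bob', 'alice')]

def count_friends(group_key, tuples):
    # UDF: receives the grouping key and an iterator over the group's
    # tuples, and may yield any number of result records
    yield (group_key, sum(1 for _ in tuples))

results = []
for user, group in groupby(records, key=itemgetter(0)):
    results.extend(count_friends(user, group))
print(results)  # [('alice', 2), ('bob', 1)]
```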

Pages 43-53: Joins & field algebra

[Figure: the join step from the script, annotated across these slides]
• Join on the friend field from the 1st stream, and on the user field from the 2nd
• No field name overlap is allowed
• Keep certain fields & rename
• Use built-in aggregators where possible
• Save this stream!
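The join can be sketched locally as a hash join on the two key fields (plain Python with toy data; the no-overlap rule is modeled by dropping the second stream's join field after the match):

```python
# Stream 1: (user, friend); stream 2: (user, interest)
friendships = [('alice', 'bob'), ('bob', 'alice')]
first_interests = [('alice', 'music'), ('bob', 'sports')]

# Build a lookup on the 2nd stream's join key (its user field)
by_user = dict(first_interests)

# Join stream 1's friend field against stream 2's user field.
# Field names from the two streams must not overlap, so the joined
# record keeps (user, friend, interest) and drops the duplicate key.
joined = [(user, friend, by_user[friend])
          for user, friend in friendships
          if friend in by_user]
print(joined)  # [('alice', 'bob', 'sports'), ('bob', 'alice', 'music')]
```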

Pages 54-56: Split & aggregate more

[Figure: the final stage of the script]
• Split the stream to group by user, and find the interest that appears most often by count
• Once the data flow is built, submit and run it!

Page 57: Running the script

Local or remote runs
• Cascading flows can run locally or on HDFS
• Local run for testing
  • local_run.sh recommendation.py
• Remote run in production
  • remote_deploy.sh -s server.com recommendation.py
• The example had 5 MR stages
  • Although the problem was simple, doing it by hand would have been inconvenient

[Figure: the resulting flow, from the friendship network and user interests inputs through friends_interests_counts to recommendations]

Page 58: Some remarks

• Benefits
  • Can use any Java class
  • Can be mixed with Java code
  • Can use Python libraries
• Caveats
  • Only pure Python code can be used, no compiled C extensions (numpy, scipy)
  • But with streaming it's possible to execute a CPython interpreter
  • Some idiosyncrasies due to Jython's representation of basic types
    • Strings are OK, but Python integers are represented as java.math.BigInteger, so explicit conversion is needed before yielding (this matters for joins!)

Page 59: Contact

• Javadoc: http://cascading.org
• Other Cascading-based wrappers
  • Scalding (Scala), Cascalog (Clojure), Cascading-JRuby (Ruby)

http://github.com/twitter/pycascading
http://pycascading.org

@[email protected]

Page 60: Implementation details

Challenges due to an interpreted language
• We need to make the code available on all workers
  • Java bytecode is easy: the same .jar works everywhere
  • Although Jython represents Python functions as classes, they cannot be serialized
• We need to start an interpreter on every worker
  • The Python source of the UDFs is retrieved and shipped in the .jar
• Different Hadoop distributions explode the .jar differently, so we need to use the Hadoop distributed cache