Copyright (c) 2011-2014 Scale Unlimited. All Rights Reserved. Reproduction or distribution of this document in any form without prior written permission is forbidden.
Intro to Cascading
photo by: i_pinz, flickr
Scale Unlimited
Welcome to Intro to Cascading
This class is an introduction to Cascading. Topics covered:
The Why, What and How of Cascading
Nuts and Bolts of Writing Flows
Built-in Operations
Grouping and Joining
Taps and Schemes
Meet Your Instructor
Ken Krugler - direct from Nevada City, California
President of Scale Unlimited
Apache Software Foundation Member
Hadoop/Cascading/Mahout/Cassandra developer and trainer
Solr developer and trainer
I Assume You...
are reasonably proficient at Java
have a basic understanding of Hadoop
don't like wasting time writing map-reduce jobs :)
Agenda - Morning
Overview
Cascading Benefits
Thinking in Cascading
Pipes, Tuples & Fields
Log Processing Lab
Lab Review
Agenda - Afternoon
Taps & Schemes
Functions & Filters
Grouping & Joining
Complex Workflow
Cascading Local Mode
Second Lab
Test-driven Development
Summary
Intro to Cascading
Overview
photo by: exfordy, flickr
Key Questions
What is one of the advantages of Cascading over Hive/Pig?
What is the fundamental unit of data in Cascading?
What is Cascading?
A thin Java library on top of Hadoop
An open source project
First released January 2008
Apache License 2.0
On revision 2.5.0
An API for defining and running data processing workflows
Why Use Cascading?
Hadoop map-reduce APIs are too low level
Unnatural to think in terms of key-value pairs, reducing by key
Hard to understand/validate complex workflows
Challenging to implement complex joining of data streams
Why Use Cascading?
Hive can be too high level, depending on the job
Loss of control over data in/out
Hard to integrate with other Java processes
Challenging to add lots of custom operations
Difficult to provide precise error handling
Pig has similar issues
If you want a data processing DSL, look at Scalding or Cascalog
Cascading from 30,000ft
Data is processed as records (Tuples)
You do Operations on Tuples
Pipes connect Operations
Tuples flow into Pipes from Source Taps
Tuples flow from Pipes into Sink Taps
This is a data processing workflow (Flow)
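To make the picture concrete, here is a minimal, hedged sketch of those pieces wired together (the paths and the UppercaseFunction operation are hypothetical, not from this deck):

Tap sourceTap = new Hfs(new TextLine(), "input/lines.txt");          // Source Tap
Tap sinkTap = new Hfs(new TextLine(), "output/results");             // Sink Tap

Pipe pipe = new Pipe("uppercase pipe");                              // Pipe
pipe = new Each(pipe, new Fields("line"), new UppercaseFunction());  // Operation on Tuples

Flow flow = new HadoopFlowConnector().connect(sourceTap, sinkTap, pipe); // the Flow
flow.complete();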
Q & A
What is one of the advantages of Cascading over Hive/Pig?
What is the fundamental unit of data in Cascading?
Intro to Cascading
Cascading Benefits
photo by: Graham Racher, flickr
Key Questions
Why is it easy to parse text with Cascading?
How does Cascading help you create multi-job workflows?
Flow Validation
Pipes often specify Fields that they need (for selection, grouping)
Operations always specify Fields for the Tuples they emit
Same for Source Taps (what they "pour" into their Pipes)
Same for Sink Taps (what they consume)
So Cascading can validate Fields flowing through the workflow
Helps avoid Dreaded Logic Errors in workflow design
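For example, here is a hedged sketch of the kind of mistake validation catches - the second Each asks for a "status" field the upstream parser never emits, so connecting the Flow fails at planning time (with a planner/field-resolution exception) instead of hours into a Hadoop run:

Pipe pipe = new Pipe("ip data");
pipe = new Each(pipe, new Fields("line"),
    new RegexParser(new Fields("ip", "country"), "^([\\d\\.]+)\\t(.*)"));
pipe = new Each(pipe, new Fields("status"), new Identity()); // no upstream "status" field!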
Built-in Operators
Over 75 handy operators
Assertions, Filtering, Logical operators
Regular expression-based parsing
Many workflows don't need any custom operators
Complex Joins
Merging two or more streams of data
Grouping/sorting records
Joining records with different fields
Avoids the pain of custom WritableComparable classes
Data Interfaces
Sources & Sinks support many data formats
Hadoop standard - Text, SequenceFiles
Custom - Cassandra, HBase, Avro, Solr, JDBC, etc.
Possible to add your own
Job Planner
Handles converting the workflow into one or more Hadoop Jobs
Automatically creates/cleans up intermediate data
Jobs must use HDFS files as “bridge”
Automatically runs Jobs in proper sequence, and/or in parallel
Auto-magically optimizes the workflow
Multiple map-side operations are part of one Hadoop Job
One or more map operations can follow a group, also in one Job
Complex Flow
Job Scheduler
Multiple workflows can be combined (a "Cascade")
Scheduler handles figuring out ordering of workflows
Jobs run in parallel if not dependent on each other
Only runs a workflow if its input data is "stale"
Based on modified time of input files
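A minimal sketch, assuming flow1, flow2 and flow3 are existing Flows - the CascadeConnector works out run order from their source/sink dependencies:

Cascade cascade = new CascadeConnector().connect(flow1, flow2, flow3);
cascade.complete(); // skips any Flow whose output is already up to date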
Q & A
Why is it easy to parse text with Cascading?
How does Cascading help you create multi-job workflows?
Intro to Cascading
Thinking in Cascading
photo by: Graham Racher, flickr
Key Questions
What do you use to join two different types of data?
What do you use to sort data in a single stream?
A Typical Data Problem
I have server log data with IP addresses and HTTP status codes:
173.255.195.185 - - [05/Sep/2011:06:03:56 -0600] "GET /feed/ HTTP/1.1" 304 166 "-" "Superfeedr"
I also have a data table that maps from IP addresses to countries:
87.250.253.242 RUSSIAN FEDERATION
I want to merge the two, and count country and status code combos:
RUSSIAN FEDERATION 200 27
Think about the operations
I’ll need to parse the input lines of text, to generate records
Pipe ipDataPipe = new Pipe("ip data pipe");
RegexParser ipDataParser = new RegexParser(new Fields("Data IP", "Country"), "^([\\d\\.]+)\\t(.*)");
ipDataPipe = new Each(ipDataPipe, new Fields("line"), ipDataParser);
[Diagram: Read the access log data ('offset', 'line'), then parse to extract IP address and status ('Log IP', 'Status').
Read the IP-to-country data ('offset', 'line'), then parse to extract IP address and country ('Data IP', 'Country').]
Fields, Tuples and Pipes
Fields - a list of names, like the header line in a CSV file
Provides a way to map from a name to a position (column #)
Fields f = new Fields("name", "age");
Tuple - a list of values, like one row in a CSV file
A general container for data, similar to a record
Tuple t = new Tuple("Ken Krugler", 37);
Pipe - a 'stream' of Tuples
Pipes connect operations and data inputs/outputs
Pipe p = new Pipe("people");
Functions & Each
Function - an Operation that converts Tuples
'RegexParser' is a built-in Function
Each - a mapper that applies a Function or Filter to every Tuple
Takes a Pipe + Operation as input, returns a Pipe
Pipe newPipe = new Each(p, new Function());
Thus you build your workflow from the top down
Next comes joining
I’ll need to join the two Tuple streams, using the IP address
Pipe logAnalysisPipe = new CoGroup(
    logDataPipe,            // left-side pipe
    new Fields("Log IP"),   // left-side field for joining
    ipDataPipe,             // right-side pipe
    new Fields("Data IP"),  // right-side field for joining
    new LeftJoin());        // type of join to do
[Diagram: Join Tuple streams using 'Log IP' and 'Data IP': ('Log IP', 'Status') + ('Data IP', 'Country') → ('Log IP', 'Status', 'Data IP', 'Country')]
CoGroup
A 'trigger' for a reduce phase
Defines what fields are used as the key
And what Tuple streams are being grouped
Used for joining multiple dissimilar Tuple streams
Takes two or more pipes as input, plus grouping fields
Pipe p = new CoGroup(leftPipe, new Fields("email name"), rightPipe, new Fields("db name"));
Count occurrences of groups
I’ll need to group by what I’m counting, and sum # of occurrences
logAnalysisPipe = new GroupBy(logAnalysisPipe, new Fields("Country", "Status"));
logAnalysisPipe = new Every(logAnalysisPipe, new Count(new Fields("Count")));
[Diagram: ('Log IP', 'Status', 'Data IP', 'Country') → group by country and status ('Country', 'Status') → count occurrences for each unique group → ('Country', 'Status', 'Count')]
GroupBy
A 'trigger' for a reduce phase
Defines what fields are used as the key
And what Tuple stream or streams are being grouped
Used for grouping/sorting one Tuple stream
Takes one or more pipes as input, plus grouping fields
Pipe newPipe = new GroupBy(p, new Fields("age"));
Aggregators & Every
Aggregator - an Operation that processes a group of data
Typically does arithmetic operations like counting and summing
‘Count’ is a built-in Aggregator
Every - a reducer that applies an Aggregator to every Tuple in a group
Takes a Pipe + Aggregator or Buffer as input, returns a Pipe
Must follow a CoGroup or GroupBy
Pipe newPipe = new Every(p, new Count());
Finally filter and write out results
Some entries wind up with 'null' for the country
That’s what left join will do if there’s no match from the right side
logAnalysisPipe = new Each(logAnalysisPipe, new Fields("Country"),
    new Not(new RegexFilter("null")));
[Diagram: skip Tuples where 'Country' is null → write the resulting output file ('Country', 'Status', 'Count')]
Filters
Filter - an Operation that conditionally skips Tuples
'Not' and 'RegexFilter' are built-in Filters
‘Debug’ is a very handy Filter - removes nothing, logs Tuple values
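A small sketch of Debug in use (the "joined" prefix is arbitrary) - it passes every Tuple through unchanged while logging field names and values:

pipe = new Each(pipe, new Debug("joined", true));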
Putting it all together
[Flow plan diagram: two Hfs TextLine source taps (src/test/resources/access.log and src/test/resources/ip-map.tsv) each feed an Each [RegexParser] map operation, emitting ('logip', 'status') and ('ip', 'country'). A CoGroup joins them on 'logip'/'ip' into ('logip', 'status', 'ip', 'country'), bridged through a TempHfs SequenceFile; then GroupBy ['country', 'status'] → Every [Count] → Each [FilterNullValues] → Hfs sink (build/test/). Legend: file read or write, map operation, reduce operation.]
The Basic Concept
Build a workflow 'graph'
Operations connected by Pipes
Turn the graph into something runnable
Connect input Pipes to Source Taps
Connect output Pipes to Sink Taps
Tap logDataTap = new Hfs(new TextLine(), "access.log");
Tap ipDataTap = new Hfs(new TextLine(), "ip-map.tsv");
Tap outputTap = new Hfs(new TextLine(), "results");

FlowDef flowDef = new FlowDef().setName("log analysis flow")
    .addSource(logDataPipe, logDataTap)
    .addSource(ipDataPipe, ipDataTap)
    .addTailSink(logAnalysisPipe, outputTap);

Flow flow = new HadoopFlowConnector(properties).connect(flowDef);
Taps & Schemes
Taps represent resources such as files in HDFS
can be read from (“Source Tap”)
can be written to (“Sink Tap”)
Many are both (e.g. Hfs is a general Tap for files)
Schemes represent the format of the data in the resource
How to read and write Tuples
TextLine and SequenceFile are most common
Tap input = new Hfs(new TextLine(new Fields("offset", "line")), options.getInputFile());
Tap output = new Hfs(new TextLine(new Fields("ip", "count")), options.getOutputDir(), SinkMode.REPLACE);
Flows
What you get when Pipes are "bound" to Source & Sink Taps
Defines a complete workflow
Read data from source(s)
Process the data
Write data to sink(s)
Created via FlowConnector.connect() call
Executed via flow.complete()
FlowDef flowDef = new FlowDef().setName("analysis flow")
    .addSource(inputPipe, sourceTap)
    .addTailSink(outputPipe, sinkTap);

Flow flow = new HadoopFlowConnector(properties).connect(flowDef);
flow.complete();
Q & A
What do you use to join two different types of data?
What do you use to sort data in a single stream?
Intro to Cascading
Pipes, Tuples & Fields
photo by: Graham Racher, flickr
Key Questions
Why do Pipes contain Tuples and not TupleEntries?
Why do Objects in Tuples need to be serializable?
Pipe Redux
A Pipe connects "things" in a Flow:
Source Tap
Operation (Each)
Grouping (GroupBy, CoGroup)
Group Processing (Every)
Sink Tap
A Pipe has a name
A Pipe is always bound (head & tail) in a Flow
[Diagram: Source Tap → Each [RegexParser] → GroupBy → Every [Count] → Sink Tap]
Pipe Splicing
Three common actions with Pipes:
Splitting - Dividing one Pipe into two
Merging - Combining similar Pipes
Joining - Joining different Pipes
Pipe Splitting
When one stream of Tuples needs to be processed multiple times
Defining a new Pipe from another Pipe creates a split
Pipe wordsPipe = new Pipe("words");
Pipe bigramsPipe = new Pipe("bigrams", wordsPipe);
wordsPipe = new Each(wordsPipe, new MyWordFilter());
"words"
"words" "bigrams"
Each [MyWordFilter]
Split Position
Changing when the "split" Pipe is added changes the results
What will this code generate?
Pipe wordsPipe = new Pipe("words");
wordsPipe = new Each(wordsPipe, new MyWordFilter());
Pipe bigramsPipe = new Pipe("bigrams", wordsPipe);
"words"
"words" "bigrams"
Each [MyWordFilter]
Pipe Merging
When one or more Pipes need to be combined
Only works when all Pipes have Tuples with the same Fields
Does the merging without a reduce phase
Pipe leftPipe = new Pipe("left");
Pipe rightPipe = new Pipe("right", leftPipe);
Pipe comboPipe = new Merge("combo", leftPipe, rightPipe);
"left"
"left" "right"
Merge
"combo"
Pipe Joining
When one or more Pipes need to be joined
Needed when Pipes have Tuples with different Fields
Triggers a reduce in Hadoop
Pipe logAnalysisPipe = new CoGroup(
    logDataPipe,            // left-side pipe
    new Fields("Log IP"),   // left-side field for joining
    ipDataPipe,             // right-side pipe
    new Fields("Data IP"),  // right-side field for joining
    new LeftJoin());        // type of join to do
[Diagram: Join Tuple streams using 'Log IP' and 'Data IP': ('Log IP', 'Status') + ('Data IP', 'Country') → ('Log IP', 'Status', 'Data IP', 'Country')]
Tuples in Pipes
Logically a Pipe is a stream of Tuples, all with the same Fields
A Tuple is an array of Objects
Each Object in the array has a position (0...n-1)
Objects can be null
Objects must be serializable:
Cascading “native”
Hadoop Writable
Custom serialization
[ 'Ken Krugler', 51, 39.2613, -121.0186 ]
Fields
Fields map from a field name to a position
Or for selecting values from a Tuple
Or as special "Field Algebra" values
Field Names
Fields map from a field name to a position
Almost always String values
e.g. 'name' is position 0
Fields person = new Fields("name", "age", "lat", "lon");
[ 'Ken Krugler', 51, 39.2613, -121.0186 ]
[ 'name', 'age', 'lat', 'lon' ]
Field Selectors
Fields map from a field name to a position
Almost always String values
e.g. 'name' is position 0
Fields person = new Fields("name", "age", "lat", "lon");
Tuple t = new Tuple("Ken Krugler", 37, 39.2613, -121.0186);
int age = t.getInteger(person.getPos("age"));

Tuple x = t.get(person, new Fields("lat", "lon"));
Field Algebra
Typically used to define what an Each or Every Pipe will emit
One common value is Fields.ALL
Combination of Fields in Pipe, and Operation output
Fields testFields = new Fields("name", "age");

// CalcRisk is a Function that outputs a single "risk" field
pipe = new Each(pipe, new Fields("age"), new CalcRisk(), Fields.ALL);

['name', 'age', 'risk']
['Ken Krugler', '51', 'high']
['Joe Junior', '25', 'low']
Field Algebra
Fields.RESULTS gives just the Operation results
pipe = new Each(pipe, new Fields("age"), new CalcRisk(), Fields.RESULTS);

['risk']
['high']
['low']

Fields.SWAP replaces the Operation arguments with its results
pipe = new Each(pipe, new Fields("age"), new CalcRisk(), Fields.SWAP);

['name', 'risk']
['Ken Krugler', 'high']
['Joe Junior', 'low']

Several values only make sense after CoGroup/GroupBy
Fields.GROUP, Fields.VALUES
TupleEntry
Combination of a Tuple and a Fields
Provided to Operations by Cascading
Useful for named access to Tuple values
Fields fields = new Fields("name", "age", "lat", "lon");
Tuple t = new Tuple("Ken Krugler", 51, 39.2613, -121.0186);
TupleEntry te = new TupleEntry(fields, t);

int age = te.getInteger("age");
[ 'Ken Krugler', 51, 39.2613, -121.0186 ]
[ 'name', 'age', 'lat', 'lon' ]
Q & A
Why do Pipes contain Tuples and not TupleEntries?
Why do Objects in Tuples need to be serializable?
Intro to Cascading
Log Processing Lab
photo by: Graham Racher, flickr
Key Questions
Did you get the LogAnalysisTool to build and run?
Were you able to extend it as per the exercises?
What was challenging about the exercises?
LogAnalysisTool
Simple workflow to process Apache-format web server log files
Found in the com.scaleunlimited.labs.loganalysis package
Generates a report of number of requests per country x result
e.g. from Brazil, we have 37 results with status = 200 (OK)
Brazil<tab>200<tab>37
Demonstrates parsing input data, grouping, and joining
Coding Conventions
We use args4j to simplify command line parameter parsing
Typically three source files per workflow:
xxxOptions - specifies parameters with getters/setters for args4j
xxxTool - parses command line, gets Flow from xxxWorkflow & runs it
xxxWorkflow - uses passed options to build appropriate Flow
Environment
We use Ant to build the code & run tests
build.xml has definitions of build targets for Ant
build.properties defines values used by build.xml
Lab uses the Cascading "local" platform versus Hadoop
You can use Eclipse to build code & run tests
% ant eclipse will create Eclipse project files
More details in the lab README-loganalysis file
Eclipse Project
README-loganalysis shows how to set up project
After importing, open up the project to the correct package
LogAnalysisWorkflow
Pipe logDataPipe = new Pipe("log data pipe");
RegexParser logParser = new RegexParser(LOG_DATA_FIELDS, LOG_DATA_PATTERN);
logDataPipe = new Each(logDataPipe, new Fields("line"), logParser);

Pipe ipDataPipe = new Pipe("ip data pipe");
RegexParser ipDataParser = new RegexParser(IP_DATA_FIELDS, IP_DATA_PATTERN);
ipDataPipe = new Each(ipDataPipe, new Fields("line"), ipDataParser);

// Now we need to join the log data with the IP-to-country mapping data.
Pipe logAnalysisPipe = new CoGroup(
    logDataPipe,            // left-side pipe
    new Fields("logip"),    // left-side field for joining
    ipDataPipe,             // right-side pipe
    new Fields("ip"),       // right-side field for joining
    new LeftJoin());        // type of join to do

// Group by country and status
logAnalysisPipe = new GroupBy(logAnalysisPipe, new Fields("country", "status"));
logAnalysisPipe = new Every(logAnalysisPipe, new Count(new Fields("count")));

// Get rid of null values
logAnalysisPipe = new Each(logAnalysisPipe, new FilterNullValues());
Lab Exercises
Follow instructions found in loganalysis/README-loganalysis
Verify you can build and run the tool
Extend it as described in the README file above
Cascading Javadocs: http://docs.cascading.org/cascading/2.2/javadoc/
Cascading User Guide can also be useful
Ask questions, use solutions if you get stuck
Q & A
Did you get the LogAnalysisTool to build and run?
Were you able to extend it as per the exercises?
What was challenging about the exercises?
Intro to Cascading
Log Processing Code Review
photo by: Graham Racher, flickr
Key Questions
How does Cascading know what to store in the Tuple?
Why does using a GroupBy sort the output by count?
Creating an output Tuple
How to build a Tuple for output?
new Tuple(a, b, c, ...)
Cascading converts values to Objects
Order of values has to match declared output
super(new Fields("country", "status", "count"));
Up to you to match what you told Cascading
TupleEntry entry = functionCall.getArguments();
if (entry.getString("country") == null) {
    // #### Solution 1: instead of filtering, emit a Tuple with a special value
    String status = entry.getString("status");
    long count = entry.getLong("count");
    Tuple unknownEntry = new Tuple("UNKNOWN", status, count);
    functionCall.getOutputCollector().add(unknownEntry);
}
Grouping to sort
GroupBy triggers a reduce step
You specify the fields to use as the key (e.g. “count”)
Hadoop always sorts the keys
You can specify reverse sort order via optional boolean flag
logAnalysisPipe = new GroupBy(logAnalysisPipe, new Fields("count"), true);
Q & A
How does Cascading know what to store in the Tuple?
Why does using a GroupBy sort the output by count?
Intro to Cascading
Taps & Schemes
photo by: Graham Racher, flickr
Key Questions
How do you connect multiple tail Pipes with multiple Sinks?
How does a TextLine scheme output multiple fields?
Common Hadoop Taps
Lfs - Local file system
Local files only
Useful in unit tests
Hfs - Hadoop file system
General purpose Tap
Can read locally, or from HDFS or S3, based on the filesystem protocol
e.g. s3n://mybucket/ will read from Amazon's S3 file system
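A short sketch of the two Taps (all paths hypothetical):

Tap localTap = new Lfs(new TextLine(), "build/test/input.txt");      // local file system
Tap hdfsTap = new Hfs(new TextLine(), "hdfs://namenode/data/input"); // HDFS
Tap s3Tap = new Hfs(new TextLine(), "s3n://mybucket/data/input");    // Amazon S3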
Using Lfs for Testing
Create a Source Tap that references a local file
Write test data to that directory
Create a Source Tap (or regular File, for text) that references results
Read/validate output
final Fields testFields = new Fields("user", "value");

String in = "build/test/Aggregator/testAggregatorOutput/in";
Lfs sourceTap = new Lfs(new SequenceFile(testFields), in, SinkMode.REPLACE);
TupleEntryCollector write = sourceTap.openForWrite(new HadoopFlowProcess());
write.add(new Tuple("user1", 1));
write.add(new Tuple("user2", 10));
write.close();
Local Taps
FileTap is the only Cascading local mode Tap
Reads single files
Use DirectoryTap (in cascading.utils) for an Lfs-equivalent
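A sketch of the local-mode equivalents (paths hypothetical; note the local-mode TextLine scheme, and DirectoryTap is assumed to mirror FileTap's constructor):

Tap fileTap = new FileTap(new cascading.scheme.local.TextLine(), "data/input.txt");
Tap dirTap = new DirectoryTap(new cascading.scheme.local.TextLine(), "data/input-dir/");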
Specifying Multiple Source Taps
Some Flows have multiple Source Taps
Make multiple calls to FlowDef.addSource()
Give it the Pipe and the Tap
FlowDef flowDef = new FlowDef()
    .addSource(logDataPipe, logDataTap)
    .addSource(ipDataPipe, ipDataTap);

Flow flow = new HadoopFlowConnector().connect(flowDef);
[Diagram: Source Tap #1 → Each ("log pipe"), Source Tap #2 → Each ("IP pipe"), both feeding a CoGroup]
Specifying Multiple Sink Taps
Same thing for multiple Sink Taps
But mapping tail Pipe names to Sink Taps
Use FlowDef.addTailSink(tail Pipe, Tap)
FlowDef flowDef = new FlowDef()
    .addSource(headPipe, importSourceTap)
    .addTailSink(wordsPipe, wordsSinkTap)
    .addTailSink(bigramsPipe, bigramsSinkTap);

Flow flow = new HadoopFlowConnector().connect(flowDef);
"words"
"words" "bigrams"
Each [MyWordFilter]
Sink Tap #1 Sink Tap #2
Common Text Schemes
TextLine
As Source Tap returns "offset", "line" as Fields by default
As Sink Tap writes out tab-separated values for fields
TextDelimited
As Source Tap it parses delimited text files (CSV, TSV)
As Sink Tap writes out delimited text files (with and without headers)
Doesn’t handle all types of delimited formats
Tap speechSource = new Hfs(new TextDelimited(SpeechTable.FIELDS), inputPath);
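TextDelimited also takes options for headers and the delimiter; a hedged sketch reusing the fields above:

// true = first line is a header row; "," = comma-delimited (CSV)
Tap csvSource = new Hfs(new TextDelimited(SpeechTable.FIELDS, true, ","), inputPath);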
SequenceFile Scheme
Most efficient format for intermediate data
Uses Hadoop SequenceFile
Key is Tuple, Value is NullWritable
Must specify Fields
Tap contentSource = new Hfs(new SequenceFile(FetchedDatum.FIELDS), inputPath);
Kryo Scheme
Equivalent to SequenceFile, but for Cascading local mode
In the cascading.utils project
Tap contentSource = new DirectoryTap(new KryoScheme(FetchedDatum.FIELDS), inputPath);
Solr Scheme
Useful for converting workflow results into a searchable index
Only works as a Sink currently, no updates
Available via the cascading.solr open source project
Scheme scheme = new SolrScheme(IndexData.FIELDS, options.getSolrConfDir());
Tap indexSink = new Hfs(scheme, outputPath, true);
Avro Scheme
Avro is an open source data serialization framework
Stores schema with file
Useful as a data interchange format with other code
Resilient to schema changes
Available via the cascading.avro open source project
Can specify schema via fields/types or an Avro JSON schema
public static final Fields FIELDS = new Fields("url", "content");
public static final Class[] TYPES = new Class[] { String.class, String.class };

Scheme scheme = new AvroScheme(FIELDS, TYPES);
Tap avroSink = new Hfs(scheme, outputPath, true);
Database Taps
Often wind up being Tap/Scheme combos
Databases aren't "resources" in the same sense as files
Can actually support SinkMode.UPDATE
Cassandra & HBase - NoSQL databases
JDBC - OK for simple/small databases
DynamoDB - Doesn't exist yet, any takers?
Q & A
How do you connect multiple tail Pipes with multiple Sinks?
How does a TextLine scheme output multiple fields?
Intro to Cascading
Functions & Filters
photo by: Graham Racher, flickr
Key Questions
Why call super(<output Fields>) in a Function's constructor?
Why can't a Filter return a random result?
Each-style Operations
Function
Classic Hadoop map function
Receives a Tuple, can output 0...n Tuples
Should declare result Fields in constructor, by calling super(fields)
Filter
Receives a Tuple, returns true if it should be filtered (removed)
pipe = new Each(pipe, new SomeUsefulFunction());
Built-in Functions
Many to choose from
Entire family of Regex<something> parsers
Identity is very handy for "Stream Shaping"
e.g. copy a field to a new field
Retain & Rename are wrappers for Identity
Fields argument = new Fields("original");
Identity identity = new Identity(new Fields("copy"));

// Identity copies the incoming argument to the result tuple
// Fields.ALL says to combine source fields and function output fields
pipe = new Each(pipe, argument, identity, Fields.ALL);
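A quick sketch of the Retain and Rename wrappers mentioned above, assuming the pipe's Tuples carry 'name', 'age', 'lat' and 'lon' fields:

pipe = new Retain(pipe, new Fields("name", "age"));              // drop all other fields
pipe = new Rename(pipe, new Fields("age"), new Fields("years")); // rename 'age' to 'years'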
Custom Functions
Must extend BaseOperation
Must implement Function
Should call super(Fields) in the constructor, to set output Fields
public class SomeFunction extends BaseOperation implements Function {

    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        TupleEntry arguments = functionCall.getArguments();

        // create a Tuple to hold our result values
        Tuple result = new Tuple("some result");

        // TODO - insert some values into the result Tuple

        // return the result Tuple
        functionCall.getOutputCollector().add(result);
    }
}
Built-in Filters
Many to choose from
Limit for limiting total number of Tuples
Sample for selecting a random subset of Tuples
Debug for helping solve workflow problems
Prints out Tuple field values
Can print out field names, include a prefix
logAnalysisPipe = new Every(logAnalysisPipe, new Fields("country"),
    new Count(new Fields("count")));
logAnalysisPipe = new Each(logAnalysisPipe, new Debug("counted", true));
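And a sketch of Limit and Sample (values arbitrary):

pipe = new Each(pipe, new Limit(1000));  // pass at most 1000 Tuples
pipe = new Each(pipe, new Sample(0.10)); // pass a ~10% random sample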
Custom Filters
Must extend BaseOperation
Must implement Filter
Cannot be random
public class SomeFilter extends BaseOperation implements Filter {

    @Override
    public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
        return false;
    }
}
Q & A
Why call super(<output Fields>) in a Function's constructor?
Why can't a Filter return a random result?
Intro to Cascading
Grouping & Joining
photo by: Graham Racher, flickr
Key Questions
How can you fix field name collisions in a CoGroup?
Which CoGroup Pipe (left or right) should have the biggest groups?
Simple Grouping
GroupBy
For grouping one or more Pipes that have the same Fields
Useful for merging, re-ordering of Tuples by group/sort fields
Often followed by an Every()
GroupBy(String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder)
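A hedged sketch using that constructor - group by 'country', sorting each group by 'count' in reverse (descending) order:

Pipe sortedPipe = new GroupBy("sorted", Pipe.pipes(analysisPipe),
    new Fields("country"), new Fields("count"), true);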
Joining
CoGroup
For joining multiple Pipes that have different Fields
Must specify Fields to use for joining
Can use declaredFields to remap joined field names, to avoid collisions
CoGroup(String groupName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Joiner joiner)
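A hedged sketch of declaredFields, assuming each pipe carries two fields and both have a 'name' field that would otherwise collide:

Pipe joinedPipe = new CoGroup("joined",
    Pipe.pipes(emailPipe, dbPipe),
    Fields.fields(new Fields("name"), new Fields("name")),
    new Fields("email-name", "email-addr", "db-name", "db-id"), // renames all four joined fields
    new InnerJoin());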
Complex Joining
Possible to join three or more Pipes
Must specify a joining field for each Pipe
First pipe is considered the “left” pipe, others are the “right” pipes
Don’t need declaredFields (null) if field names are unique
Pipe updateUrlPipe = new CoGroup("Updating Crawl DB",
    Pipe.pipes(finishedUrlPipe, statusPipe, outlinkPipe),
    Fields.fields(new Fields("finished-url"), new Fields("status-url"), new Fields("outlink-url")),
    null,
    new OuterJoin());
Different Types of Joins
Joining on “word” from Pipe A (left) with “animal” from Pipe B (right)
Pipe A (word, count):
  fox 83
  dog 27

Pipe B (animal, type):
  fox furry
  turtle smooth

Outer join (word, count, animal, type):
  fox 83 fox furry
  dog 27 null null
  null null turtle smooth

Inner join:
  fox 83 fox furry

Left join:
  fox 83 fox furry
  dog 27 null null

Right join:
  fox 83 fox furry
  null null turtle smooth
CoGroup Joining Algorithm
Generate composite Tuples with Fields from both Pipes
Group by specified Fields, sorting RHS Tuples before LHS Tuples
For each unique group:
Read all RHS Tuples into memory, if possible (spill to disk)
For each LHS Tuple, generate appropriate joined output using RHS Tuples
Handle cases of no LHS Tuples or no RHS Tuples
So it’s clear that the RHS Pipe should be the one with smaller groups
Every-style Operations
Must follow a GroupBy, CoGroup or HashJoin
Aggregator
Convenience class for specific type of reduce function
Receives a group key
Then gets a Tuple at a time, can update state
Typically outputs one result Tuple at end
Buffer
Classic Hadoop reduce function
Receives a group key plus an iterator for all Tuples in that group
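The deck shows a custom Buffer later; for comparison, here is a hedged sketch of a custom Aggregator following the lifecycle above (the 'bytes' field and the SumBytes class are hypothetical, not from the labs):

public class SumBytes extends BaseOperation<long[]> implements Aggregator<long[]> {

    public SumBytes() {
        super(new Fields("total-bytes")); // declare the single output field
    }

    @Override
    public void start(FlowProcess flowProcess, AggregatorCall<long[]> call) {
        call.setContext(new long[] { 0L }); // per-group running total
    }

    @Override
    public void aggregate(FlowProcess flowProcess, AggregatorCall<long[]> call) {
        call.getContext()[0] += call.getArguments().getLong("bytes");
    }

    @Override
    public void complete(FlowProcess flowProcess, AggregatorCall<long[]> call) {
        call.getOutputCollector().add(new Tuple(call.getContext()[0])); // one result Tuple per group
    }
}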
Built-in Aggregators
Mostly math functions - First, Last, Max, Min, Sum
Count is the most common one used
Typically used to insert a new Field into the Pipe stream
Fields groupFields = new Fields("hour", "ip");
analysisPipe = new GroupBy(analysisPipe, groupFields);
analysisPipe = new Every(analysisPipe, new Fields("ip"), new Count(new Fields("count")));
Custom Buffers
Buffer is a general purpose Interface, so no built-in implementations
Must extend BaseOperation
Must implement Buffer
public class SomeBuffer extends BaseOperation implements Buffer {

    public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
        TupleEntry group = bufferCall.getGroup();
        String domain = group.getString("domain");
        Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

        int count = 0;
        while (arguments.hasNext()) {
            TupleEntry argument = arguments.next();
            count += argument.getInteger("pages");
        }

        bufferCall.getOutputCollector().add(new Tuple(domain, count));
    }
}
Q & A
How can you fix field name collisions in a CoGroup?
Which CoGroup Pipe (left or right) should have the biggest groups?
Intro to Cascading
Complex Workflow Example
photo by: Graham Racher, flickr
Key Questions
What are the three phases of workflow design?
Why does ImportAvroContent.isSafe() return false?
Search System
Example of a complex workflow to build a typical search index
Found in the augmentedsearch project
Generates an index from web crawl data
Automatically finds good labels to use for faceted search
Design Process - So Simple
Design Process
Output first: Functionality, Schema
Input second: Avro files
Workflow third: How do we connect the dots?
Output First - Functionality
Typical search: Title, Content
Plus "auto clustering": Generate facet values automatically
Input Second - Data
Avro files
Apache project
Data serialization format
Result of a web crawl
So we need to read using the Avro Tap/AvroDatum
Then parse the data using Tika/ContentParser
// Use Tika to parse content
String url = inTupleEntry.getString(AvroDatum.REQUEST_URL_FN);
BytesWritable content = (BytesWritable) (inTupleEntry.get(AvroDatum.CONTENT_FN));
ContentParser.ParsedContent parsedContent = _contentParser.parse(content, url, contentType);
Workflow Last
Convert Avro files into the fields we want
Generate candidate phrases, do analysis
Operations to create labels
Merge labels with other fields
ImportAvroContent in detail
Input is a Tuple with "AvroDatum" fields
Parsing is expensive, so we only want to do that once
So return false from Function’s isSafe() method
Output is URL, title, content (text)
// Use Tika to parse content
String contentType = inTupleEntry.getString(AvroDatum.CONTENT_TYPE_FN);
String url = inTupleEntry.getString(AvroDatum.REQUEST_URL_FN);
BytesWritable content = (BytesWritable) (inTupleEntry.get(AvroDatum.CONTENT_FN));
ParsedContent parsedContent = _contentParser.parse(content, url, contentType);
High level workflow design
Convert Avro input files into Tuples with URL, title, content
Generate phrases for each document, using 1 & 2-word combos
Figure out "best" labels for each URL
Merge URL/labels with URL/title/content
Generate Solr index
Generating Phrases
Incoming Tuples (URL, Content):
  domain.com/page1  the quick fox
  domain.com/page2  the slow dog

Emitted Tuples:
  domain.com/page1  the
  domain.com/page1  the quick
  domain.com/page1  quick
  domain.com/page1  quick fox
  domain.com/page1  fox
  domain.com/page2  the
  domain.com/page2  the slow
  domain.com/page2  slow
  domain.com/page2  slow dog
  domain.com/page2  dog
How to find “best” labels?
Goldilocks problem
Not too common, like "the" - don't help with narrowing search results
Not too rare, like “Ken Krugler’s blog” - not likely to be in search results
Figure out the right value for maximum document frequency (DF)
Do analysis on a sampled set of data, generating phrases
For each phrase, count # of documents that contain it
doc count / total docs = document frequency
e.g. a phrase appearing in 12 of 1,000 sampled documents has DF = 0.012
Grouping by Phrase, Counting
GroupBy Phrase

Input Tuples (URL, phrase):
  domain.com/page1  the
  domain.com/page1  the quick
  domain.com/page1  quick
  domain.com/page1  quick fox
  domain.com/page1  fox
  domain.com/page2  the
  domain.com/page2  the slow
  domain.com/page2  slow
  domain.com/page2  slow dog
  domain.com/page2  dog

Grouped Tuples (phrase → URLs):
  the        domain.com/page1, domain.com/page2
  dog        domain.com/page2
  fox        domain.com/page1
  quick fox  domain.com/page1
Analysis Workflow
Results of graphing samples
Workflow for labels
Use saved URL/phrase data from the first phase
Group by phrase
Count URLs, calculate inverse document frequency (IDF)
Filter out phrases with too low IDF, emit passing phrase/score
Join URL/phrase (flipped as phrase/URL) with phrase/score
So now we have URL/phrase/score
Group by URL, sort by phrase score (low to high)
Emit URL/top phrases (best 10) as result
Finally join URL/top phrases with URL/title/content
BestPhrasesWorkflow in detail
Focus on createWorkflow(), where grouping and joining happens
Let's look at that code...
Output Results as Solr index
Uses the Cascading SolrScheme
Mapping from fields in Tuples to fields in the Solr schema
public static final String URL_FN = "url";
public static final String TITLE_FN = "title";
public static final String LABELS_FN = "labels";
public static final String CONTENT_FN = "content";
public static final Fields OUT_FIELDS = new Fields(URL_FN, TITLE_FN, LABELS_FN, CONTENT_FN);

Scheme sinkScheme = new SolrScheme(BuildIndexDoc.OUT_FIELDS, options.getSolrHomeDir());
Tap solrIndexSink = new Hfs(sinkScheme, solrIndexPath.toString(), SinkMode.REPLACE);
Q & A
What are the three phases of workflow design?
Why does ImportAvroContent.isSafe() return false?
Intro to Cascading
Cascading Local Platform
photo by: Graham Racher, flickr
Key Questions
What’s the main reason to use Cascading’s local platform?
What is a Cascading Platform?
A data processing platform, not an OS platform
Cascading was always designed to be platform-agnostic
Cascading 1.x only worked on Hadoop
Cascading 2.0 introduced a new Cascading Local Platform
Not to be confused with running Hadoop in "local mode"
Done to validate multi-platform architecture in Cascading
What is Platform-specific?
Many Schemes, some Taps
Planner used to define Flows
Converting from a “logical” flow to an execution plan
Flow execution logic
Reading/writing data, executing steps
Tests for validating proper operation
Used by cascading-platform-<version>.jar
Why Use the Local Platform?
Faster tests
90 minutes with Hadoop => 5 minutes with Local
No Hadoop dependencies
Easier to run on Windows
No issues with Hadoop config
Exposes platform dependencies in your code
Not much of an issue now, but in the future...
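As a concrete example, here is a minimal sketch of a local-platform flow, assuming a tab-separated input file; note there is no Hadoop anywhere.

// Minimal local-platform flow. FileTap, TextDelimited and LocalFlowConnector
// come from the cascading.tap.local, cascading.scheme.local and
// cascading.flow.local packages.
Tap source = new FileTap(new TextDelimited(new Fields("name", "age"), "\t"), "input.tsv");
Tap sink = new FileTap(new TextDelimited(Fields.ALL, "\t"), "output.tsv", SinkMode.REPLACE);

Pipe pipe = new Pipe("pass-through");
Flow flow = new LocalFlowConnector().connect(source, sink, pipe);
flow.complete(); // runs in-process, no Hadoop required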
Issues with Local Platform
Limited number of Taps & Schemes
No SequenceFile equivalent
Taps/Schemes don’t have the same level of functionality as Hadoop’s
Can’t read compressed text files
TextDelimited scheme has fewer options
Only uses one thread/core for task execution
Some obscure Tuple cloning and threading issues
http://www.scaleunlimited.com/2012/10/22/faster-tests-with-cascading-2-0-local-mode/
cascading.utils
Open source project we use in labs
https://github.com/ScaleUnlimited/cascading.utils
Provides BasePlatform & BasePath, with Hadoop & Local variants
Adds a SequenceFile equivalent (KryoScheme)
Makes it easier for code to be platform-independent
// Platform-independent setup via cascading.utils: the same code runs on
// Hadoop or locally, depending on which BasePlatform gets created.
BasePlatform platform = makePlatform(testing);
BasePath path = platform.makePath(targetDirname);

Scheme scheme = platform.makeBinaryScheme(new Fields("name", "age"));
Tap tap = platform.makeTap(scheme, path);
Q & A
What’s the main reason to use Cascading’s local platform?
Intro to Cascading
Advanced Log Processing Lab
photo by: Graham Racher, flickr
“Advanced” functionality
Generate page access counts (see the sketch after this list)
Extract page paths from access.log data
Group on path, count occurrences
Generate requests by IP address
Make this an additional output from the workflow
Generate bytes downloaded by IP address
Extract byte counts from access.log data
Group on IP, sum bytes
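For reference, the page-access-count branch might look something like this sketch; the pipe and field names are illustrative, and the lab code may differ.

// Sketch of the page access counts branch, assuming the access.log lines
// have already been parsed into tuples with a "path" field.
Pipe pathCountPipe = new Pipe("path count pipe", logAnalysisPipe);
pathCountPipe = new GroupBy(pathCountPipe, new Fields("path"));
pathCountPipe = new Every(pathCountPipe, new Count(new Fields("count")));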
Concept of “working dir”
Many complex workflows generate multiple outputs
Standard pattern is that xxxOptions has a “working dir” parameter
Each separate Sink builds its output path from this working directory
Append a fixed sub-dir name to the working directory path
e.g. working directory is “2013-01-20-stats”
one of many output directories would be “2013-01-20-stats/page-access/”
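In code, that pattern looks something like the sketch below; the options object and its getWorkingDir() accessor are hypothetical.

// Each sink appends its own fixed sub-dir to the shared working directory.
String workingDirname = options.getWorkingDir(); // e.g. "2013-01-20-stats"
Tap pageAccessSink = new Hfs(new TextLine(), workingDirname + "/page-access", SinkMode.REPLACE);
Tap ipCountSink = new Hfs(new TextLine(), workingDirname + "/ip-counts", SinkMode.REPLACE);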
Lab Exercises
Follow instructions found in superanalysis/README-superanalysis
Code is in the com.scaleunlimited.labs.superanalysis package
Ask questions if you get stuck
Use the solutions if you get really stuck
Q & A in 60 minutes
Intro to Cascading
Advanced Log Processing Review
photo by: Graham Racher, flickr
Key Challenges
Adding multiple sinks to the workflow
Calculating the sum of the downloaded bytes
Adding Multiple Sinks
Need to connect multiple tail Pipes to multiple Sink Taps
Use FlowDef.addTailSink(Pipe, Tap) to connect each tail Pipe to its Sink Tap
FlowDef flowDef = new FlowDef()
    .setName("log analysis flow")
    .addSource(logDataPipe, logData)
    .addSource(ipDataPipe, ipData)
    .addTailSink(pathCountPipe, pathCount)
    .addTailSink(ipAddressCountPipe, ipAddressCount);
Counting Bytes
Need to use Sum Aggregator after GroupBy
Pipe ipAddressResponseBytesSumPipe =
    new Pipe("ip address response bytes sum pipe", logAnalysisPipe);

ipAddressResponseBytesSumPipe =
    new GroupBy(ipAddressResponseBytesSumPipe, new Fields("logip"));
ipAddressResponseBytesSumPipe = new Every(
    ipAddressResponseBytesSumPipe,
    new Fields("response_bytes"),
    new Sum(new Fields("response_bytes_count"), Long.class)); // Long avoids int overflow when summing bytes
Intro to Cascading
Test-Driven Development
photo by: Graham Racher, flickr
Key Questions
What’s a common mistake when creating tests?
What are the four types of tests you should have?
Why testing workflows is hard
Lots of data means every edge case known to man
Complex workflow means many conditions to validate
Different execution environments
Cascading local platform
Hadoop local mode
Hadoop distributed
Four Types of Tests
Unit tests
Workflow tests
End-to-end tests
Cluster tests
Unit Tests
Cascading support via cascading-core-<version>-tests.jar
http://conjars.org/repo/cascading/cascading-core/2.2.1/cascading-core-2.2.1-tests.jar
Extend CascadingTestCase
Use invokeFunction, invokeXXX
Easier than using Mockito/JMock
<dependencies>
  <dependency>
    <groupId>cascading</groupId>
    <artifactId>cascading-core</artifactId>
    <version>2.2.1</version>
    <classifier>tests</classifier>
    <scope>test</scope>
  </dependency>
</dependencies>
Example Unit Test
public class CalcRisk extends BaseOperation implements Function {

    public CalcRisk() {
        super(new Fields("risk"));
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        TupleEntry arguments = functionCall.getArguments();
        Tuple result = new Tuple(arguments.getInteger("age") < 50 ? "low" : "high");
        functionCall.getOutputCollector().add(result);
    }
}

public class MyFunctionsTest extends CascadingTestCase {

    @Test
    public void testCalcRiskFunction() {
        // Wrap the argument Tuple in a TupleEntry with named fields, since
        // CalcRisk looks up the "age" argument by name.
        TupleEntry arguments = new TupleEntry(new Fields("name", "age"), new Tuple("ken", 37));
        TupleListCollector result = invokeFunction(new CalcRisk(), arguments, Fields.RESULTS);
        assertEquals("low", result.iterator().next().getString(0));
    }
}
Workflow Tests
Hard to avoid “data mining” when coding up expectations
Typical error is to run test data in, save results, use those as expectations
Use hand-crafted data - often very tedious and painful
Limit combinatorial explosion of test cases
Don’t forget to include bad data
Example Workflow Test
@Test
public void testMyWorkflow() throws Exception {
    // Leverage InMemoryTap from cascading.utils
    InMemoryTap source = new InMemoryTap(new Fields("name", "age"));
    TupleEntryCollector writer = source.openForWrite(new LocalFlowProcess());
    writer.add(new Tuple("ken", 37));
    writer.add(new Tuple("bob", 55));
    writer.close();

    Pipe p = new Pipe("test");
    p = new Each(p, new CalcRisk(), Fields.ALL);

    InMemoryTap sink = new InMemoryTap(new Fields("name", "age", "risk"));
    Flow f = new LocalFlowConnector().connect(source, sink, p);
    f.complete();

    // We should have 2 resulting Tuples, each with 3 fields.
    validateLength(f, 2, 3);

    Set<Tuple> results = asSet(f, sink);
    assertTrue(results.contains(new Tuple("ken", 37, "low")));
    assertTrue(results.contains(new Tuple("bob", 55, "high")));
}
End-to-end Tests
Multiple workflows that are chained together
Test “hand-off” between workflows
Even more painful to create real expectations
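One way to wire up such a chain is with a Cascade, which orders flows by their shared taps. A minimal sketch, where makeImportFlow/makeAnalysisFlow are hypothetical helpers:

// The first flow's sink tap doubles as the second flow's source tap;
// the CascadeConnector works out the dependency and runs them in order.
Flow importFlow = makeImportFlow(inputTap, intermediateTap);
Flow analysisFlow = makeAnalysisFlow(intermediateTap, resultsTap);

Cascade cascade = new CascadeConnector().connect(importFlow, analysisFlow);
cascade.complete();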
Cluster Tests
Previous tests should be run using the Cascading local platform
You need to run against Hadoop with multiple reducers
And ideally with two types of data
Hand-crafted data, with tight expectations
Real-world data, with very loose expectations
Use PlatformTestCase
Or run an integration test against a real cluster
Setting up for PlatformTestCase
Include the cascading-hadoop-<version>-tests.jar
And a bunch of missing jersey-xxx.jar files (Hadoop issue)
Create a unit test class that extends PlatformTestCase
In the constructor, enable clustering with multiple reducers
Classic bug is not handling multiple output files from reduce phase
public class MyWorkflowTestCaseTest extends PlatformTestCase {

    public MyWorkflowTestCaseTest() {
        // We want a cluster with 2 mappers & 2 reducers
        super(true, 2, 2);
    }

    ...
}
Example Platform Test
@Test
public void testMyWorkflow() throws Exception {
    Fields sourceFields = new Fields("name", "age");
    Tap sourceTap = getPlatform().getDelimitedFile(sourceFields, "testMyWorkflow/in", SinkMode.REPLACE);

    TupleEntryCollector writer = sourceTap.openForWrite(getPlatform().getFlowProcess());
    writer.add(new Tuple("ken", 37));
    writer.add(new Tuple("bob", 55));
    writer.close();

    Pipe p = new Pipe("test");
    p = new Each(p, new CalcRisk(), Fields.ALL);

    Fields sinkFields = new Fields("name", "age", "risk");
    Tap sinkTap = getPlatform().getDelimitedFile(sinkFields, "testMyWorkflow/out", SinkMode.REPLACE);

    Flow f = getPlatform().getFlowConnector().connect(sourceTap, sinkTap, p);
    f.complete();

    // We should have 2 resulting Tuples, each with 3 fields.
    validateLength(f, 2, 3);

    // Note the untyped delimited file round-trip turns the age into a String ("37").
    Set<Tuple> results = asSet(f, sinkTap);
    assertTrue(results.contains(new Tuple("ken", "37", "low")));
    assertTrue(results.contains(new Tuple("bob", "55", "high")));
}
Q & A
What’s a common mistake when creating tests?
What are the four types of tests you should have?
Intro to Cascading
Cascading Summary
photo by: Graham Racher, flickr
Good/bad Use Cases
Good
complex, stable workflows
integration with other Java-based systems
lots of custom operations
continuous data processing
Bad
ad hoc analytics
if you know Clojure or Scala
Thinking in Cascading
Sketch out workflow first
What are you trying to generate (output)?
What data are you processing (input)?
Then draw it as Pipes and Operations
Implement simple workflow next
Write unit tests, especially for custom Operations
Minimal processing on input data
Validate input data assumptions
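Validating input data assumptions is often just a small custom Filter. A minimal sketch (the "age" field name is illustrative):

// Drops any tuple whose "age" field can't be coerced to an integer.
public class ValidAgeFilter extends BaseOperation implements Filter {
    @Override
    public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
        try {
            filterCall.getArguments().getInteger("age");
            return false; // keep tuples that satisfy the assumption
        } catch (Exception e) {
            return true;  // remove tuples that don't
        }
    }
}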
Advanced Cascading
Much we haven’t covered in this introductory class
Optimizations - CountBy, map-side joins, dropping fields (see the CountBy sketch after this list)
Debugging - controlling logging, monitoring flows
Subassemblies
Cascades
Error trapping
Etc, etc.
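As a taste of one of those optimizations: CountBy collapses the GroupBy + Every(Count) pattern into a single subassembly that does partial, combiner-style counting on the map side. A sketch, reusing the hypothetical path-count example from the lab:

// One line instead of GroupBy + Every(Count), with map-side partial counts.
Pipe pathCountPipe = new CountBy(logAnalysisPipe, new Fields("path"), new Fields("count"));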
Resources
Cascading Documentation - http://www.cascading.org/documentation/
User Guide
Javadoc
Examples
Cascading mailing lists - http://groups.google.com/group/cascading-user
Conjars Repository: http://conjars.org/