cascading introduction
DESCRIPTION
Cascading is a Data Processing API, Process Planner, and Process Scheduler used for defining and executing complex, scale-free, and fault-tolerant data processing workflows on an Apache Hadoop cluster.
TRANSCRIPT
Copyright 2010 TCloud Computing Inc.
Cascading
Alex Su
2011/02/11
Agenda
Trend Micro Confidential
• Introduction
• How it works
• Data Processing
• Advanced Processing
• Monitoring
• Testing
• Best Practices
• Cascading GUI
Introduction
• Hadoop coding is non-trivial
• Hadoop expects one class for the Map step and one class for the Reduce step
• What if your application needs multiple Map and Reduce steps? Who coordinates which of them can run in parallel?
• What if you need to run non-Hadoop logic between Hadoop steps?
• Chain the Operations into data processing workflows
• Operations are chained together to define a Pipe assembly or a reusable sub-assembly
Pipe lhs = new Pipe( "lhs" );
lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );
rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );
join = new Every( join, new SomeAggregator() );
join = new GroupBy( join );
join = new Every( join, new SomeAggregator() );

// the tail of the assembly
join = new Each( join, new SomeFunction() );

Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );

FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "join", source, sink, join );

// execute the flow, block until complete
flow.complete();
How it works
• Pipe Assemblies become Flows
• The planner translates a DAG of operations into a DAG of MapReduce jobs
• All MapReduce jobs in a Flow are scheduled in dependency order
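To make "scheduled in dependency order" concrete, here is a minimal plain-Java sketch of a topological sort (Kahn's algorithm). It is not Cascading's planner or scheduler; the class `JobOrderSketch`, the method `dependencyOrder`, and the job names in the usage note are hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Cascading: orders jobs so that every job
// comes after the jobs it depends on (Kahn's topological sort).
class JobOrderSketch {

    // deps maps each job name to the jobs it must wait for.
    // Every job, including those without dependencies, must appear as a key.
    static List<String> dependencyOrder(Map<String, List<String>> deps) {
        Map<String, Integer> remaining = new LinkedHashMap<>();
        Map<String, List<String>> dependents = new LinkedHashMap<>();
        for (String job : deps.keySet()) {
            remaining.put(job, deps.get(job).size());
            dependents.put(job, new ArrayList<String>());
        }
        // Invert the map: for each upstream job, record who waits for it.
        for (Map.Entry<String, List<String>> e : deps.entrySet()) {
            for (String upstream : e.getValue()) {
                dependents.get(upstream).add(e.getKey());
            }
        }
        // Jobs with no unmet dependencies are ready to run.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : remaining.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String job = ready.remove();
            order.add(job);
            // Finishing this job may unblock the jobs that depend on it.
            for (String next : dependents.get(job)) {
                int left = remaining.get(next) - 1;
                remaining.put(next, left);
                if (left == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != deps.size()) {
            throw new IllegalStateException("dependency cycle detected");
        }
        return order;
    }
}
```

With three hypothetical jobs where "count" waits for "join" and "join" waits for "parse", the method returns parse, join, count. Jobs that become ready at the same time have no ordering constraint between them, which is where parallel execution is possible.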
digraph G {
  1 [label = "Every('akamaiPipe*whiteListPipe')[Count[decl:'count']]"];
  2 [label = "Hfs['TextLine[['host', 'count']->[ALL]]']['/user/alex/output']']"];
  3 [label = "GroupBy('akamaiPipe*whiteListPipe')[by:['host']]"];
  4 [label = "Each('akamaiPipe*whiteListPipe')[NotMatchedFilter[decl:'host', 'offset', 'line']]"];
  5 [label = "CoGroup('akamaiPipe*whiteListPipe')[by:whiteListPipe:['line']akamaiPipe:['host']]"];
  6 [label = "Hfs['TextLine[['offset', 'line']->[ALL]]']['/user/alex/whitelist/whitelist.txt']']"];
  7 [label = "Each('akamaiPipe')[RegexParser[decl:'host'][args:1]]"];
  8 [label = "Hfs['TextLine[['line']->[ALL]]']['/user/alex/input/akamai.log']']"];
  9 [label = "[head]"];
  10 [label = "[tail]"];
  11 [label = "TempHfs['SequenceFile[['host', 'offset', 'line']]'][akamaiPipe_whiteListPipe/52729/]"];
  12 [label = "Hfs['TextLine[['offset', 'line']->[ALL]]']['/user/alex/trap']']"];
  1 -> 2 [label = "[{2}:'host', 'count']\n[{3}:'host', 'offset', 'line']"];
  7 -> 5 [label = "[{1}:'host']\n[{1}:'host']"];
  5 -> 4 [label = "whiteListPipe[{1}:'line'],akamaiPipe[{1}:'host']\n[{3}:'host', 'offset', 'line']"];
  3 -> 1 [label = "akamaiPipe*whiteListPipe[{1}:'host']\n[{3}:'host', 'offset', 'line']"];
  9 -> 6 [label = ""];
  9 -> 8 [label = ""];
  2 -> 10 [label = "[{?}:ALL]\n[{?}:ALL]"];
  4 -> 11 [label = "[{3}:'host', 'offset', 'line']\n[{3}:'host', 'offset', 'line']"];
  11 -> 3 [label = "[{3}:'host', 'offset', 'line']\n[{3}:'host', 'offset', 'line']"];
  8 -> 7 [label = "[{1}:'line']\n[{1}:'line']"];
  6 -> 5 [label = "[{2}:'offset', 'line']\n[{2}:'offset', 'line']"];
  7 -> 12 [label = ""];
}
Data Processing
• Tuple
  • A single ‘row’ of data being processed
  • Each column is named
  • Can access data by name or position
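Access by name or by position can be pictured with a small plain-Java sketch. This is not the Cascading Tuple API; `RowSketch` and its methods are hypothetical names standing in for a row whose columns are both ordered and named.

```java
import java.util.List;

// Hypothetical stand-in for a row of named columns. Not the Cascading API.
class RowSketch {
    private final List<String> names;   // column names, in column order
    private final List<Object> values;  // column values, same order as names

    RowSketch(List<String> names, List<Object> values) {
        if (names.size() != values.size()) {
            throw new IllegalArgumentException("names and values differ in length");
        }
        this.names = names;
        this.values = values;
    }

    // Access by position (0-based).
    Object get(int position) {
        return values.get(position);
    }

    // Access by name: resolve the name to a position, then read the value.
    Object get(String name) {
        int position = names.indexOf(name);
        if (position < 0) {
            throw new IllegalArgumentException("unknown field: " + name);
        }
        return values.get(position);
    }
}
```

In this sketch the name lookup is only a thin layer over positional access, so both styles read the same underlying value.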
• Tap
  • Abstraction on top of Hadoop files
  • Allows you to define your own parser for files
  • Example:
    Hfs input = new Hfs( new TextLine(), a_hdfsDirectory + "/" + name );
• Scheme
  • TextLine
  • TextDelimited
  • SequenceFile
  • WritableSequenceFile
• Tap
  • Lfs
  • Dfs
  • Hfs
  • MultiSourceTap
  • MultiSinkTap
  • TemplateTap
  • GlobHfs
  • S3fs (deprecated)
• TemplateTap
TemplateTap can be used to write tuple streams out to subdirectories based on the values in the Tuple instance.
TextDelimited scheme = new TextDelimited( new Fields( "year", "month", "entry" ), "\t" );
Hfs tap = new Hfs( scheme, path );
String template = "%s-%s"; // dirs named "year-month"
Tap months = new TemplateTap( tap, template, SinkMode.REPLACE );
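The effect of a template such as "%s-%s" can be sketched in plain Java, assuming the template follows `java.util.Formatter` syntax as the example suggests. The class `TemplatePathSketch`, the method `subdirectory`, and the path in the usage note are hypothetical; this only illustrates how tuple values could map to a directory name, not how TemplateTap is implemented.

```java
// Hypothetical helper, not part of Cascading: shows how a format template
// could turn the values of one tuple into a subdirectory under a parent path.
class TemplatePathSketch {

    static String subdirectory(String parent, String template, Object... values) {
        // String.format fills each %s with the next value, in order.
        return parent + "/" + String.format(template, values);
    }
}
```

For a hypothetical parent path "/data/out" and the values "2011" and "02", the sketch yields "/data/out/2011-02", matching the "year-month" naming mentioned in the comment above.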
• Tap sink modes
  • SinkMode.KEEP
  • SinkMode.REPLACE
  • SinkMode.UPDATE
• Integration
  • Cascading.Avro
  • Cascading.Hbase
  • Cascading.JDBC
  • Cascading.Memcached
  • Cascading.SimpleDB
• Pipe
  • a base class for core processing model types
• Each
  • applies an operation to each tuple in the data
• GroupBy
  • similar to a ‘group by’ in SQL
• CoGroup
  • joins tuple streams together
• Every
  • applies an Aggregator (like count or sum) or Buffer (a sliding window) Operation to every group of Tuples that passes through it
• SubAssembly
  • allows for nesting reusable pipe assemblies into a Pipe class
• CoGroup
  • InnerJoin
  • OuterJoin
  • LeftJoin
  • RightJoin
  • MixedJoin

Fields common = new Fields( "url" );
Fields declared = new Fields( "url1", "word", "wd_count", "url2", "sentence", "snt_count" );
Pipe join = new CoGroup( lhs, common, rhs, common, declared, new InnerJoin() );

Fields lhsFields = new Fields( "url", "word", "count" );
Fields rhsFields = new Fields( "url", "sentence", "count" );
Pipe join = new CoGroup( lhs, lhsFields, rhs, rhsFields, new InnerJoin() );
• Operation
  • Defines what to do with the data
  • Each operations allow logic on the row, such as parsing dates, creating new attributes, etc.
  • Every operations allow you to iterate over the ‘group’ of rows to do non-trivial operations.
• Function
  • Identity Function
  • Debug Function
  • Sample and Limit Functions
  • Insert Function
  • Text Functions
  • Regular Expression Operations
  • Java Expression Operations
    • Field names used in an expression must be valid Java identifiers: "first-name" is a valid field name for use with Cascading, but the expression first-name.trim() will fail, because Java reads the hyphen as a minus sign.
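A field name can be checked up front for use in a Java expression. The sketch below uses only the standard `Character.isJavaIdentifierStart` and `Character.isJavaIdentifierPart` methods; the class `FieldNameCheck` and the method `looksLikeJavaIdentifier` are hypothetical names, and this is not a check that Cascading itself is known to perform.

```java
// Hypothetical helper, not part of Cascading: tests whether a field name
// follows Java's character rules for identifiers.
class FieldNameCheck {

    // Only the character rules are checked; reserved words such as "class"
    // would still pass this test.
    static boolean looksLikeJavaIdentifier(String name) {
        if (name.isEmpty() || !Character.isJavaIdentifierStart(name.charAt(0))) {
            return false;
        }
        for (int i = 1; i < name.length(); i++) {
            if (!Character.isJavaIdentifierPart(name.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}
```

Under this check "first-name" is rejected because of the hyphen, while a renamed field such as "first_name" passes and could be referenced as first_name.trim().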
• Filter
  • And
  • Or
  • Not
  • Xor
  • NotNull
  • Null
  • RegexFilter
• Aggregator
  • Average
  • Count
  • First
  • Last
  • Max
  • Min
  • Sum
• Buffer
  • It is very similar to the typical Reducer interface
  • It is very useful when header or footer values need to be inserted into a grouping, or if values need to be inserted into the middle of the group values
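The header and footer use case can be sketched in plain Java as a reducer-style method that receives one group and its values. This is not the Cascading Buffer interface; `HeaderFooterSketch`, `wrapGroup`, and the "BEGIN"/"END" markers are hypothetical and only show why seeing the whole group at once makes this easy.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical reducer-style helper, not the Cascading Buffer interface:
// it walks the values of one group and surrounds them with a header and footer.
class HeaderFooterSketch {

    static List<String> wrapGroup(String groupKey, Iterator<String> values) {
        List<String> out = new ArrayList<>();
        out.add("BEGIN " + groupKey);          // header, emitted before any value
        int count = 0;
        while (values.hasNext()) {
            out.add(values.next());            // pass each group value through
            count++;
        }
        out.add("END " + groupKey + " (" + count + " values)"); // footer
        return out;
    }
}
```

An Aggregator-style operation that only returns one result per group could not emit the extra header and footer rows, which is the gap the slide attributes to Buffer.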
• Flow
  • A Flow is created by planning it through the FlowConnector object. The connect() method creates new Flow instances based on a set of sink Taps, source Taps, and a pipe assembly.
Flow flow = new FlowConnector(new Properties()).connect( "flow-name", source, sink, pipe );
flow.complete();
• MapReduceFlow
  • a Flow subclass that supports custom MapReduce jobs pre-configured via the JobConf object.
• ProcessFlow
  • a Flow subclass that supports custom Riffle jobs.
• Cascades
  • Groups of Flows are called Cascades
  • Custom MapReduce jobs can participate in a Cascade
Cascade cascade = cascadeConnector.connect( flow1, flow2, flow3 );
cascade.complete();
Advanced Processing
• Stream Assertions
  • Unit and Regression tests for Flows
  • Planner can remove ‘strict’, ‘validating’, or all assertions
• Failure Traps
  • Catch data causing Operations or Assertions to fail
  • Allows processes to continue without data loss
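The idea behind a failure trap can be sketched in plain Java: records that make an operation throw are diverted to a side collection while the rest of the stream continues. This is not how Cascading implements traps; `TrapSketch` and `applyWithTrap` are hypothetical names, and the sketch assumes failures surface as runtime exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not part of Cascading: applies an operation to every
// record and diverts the records that make it fail into a trap list.
class TrapSketch {

    static <T, R> List<R> applyWithTrap(List<T> input, Function<T, R> operation, List<T> trap) {
        List<R> output = new ArrayList<>();
        for (T record : input) {
            try {
                output.add(operation.apply(record));
            } catch (RuntimeException e) {
                // Keep the offending record instead of failing the whole run.
                trap.add(record);
            }
        }
        return output;
    }
}
```

Parsing the strings "1", "x", and "3" as integers with this sketch produces the output 1 and 3 and leaves "x" in the trap, so nothing is lost and the bad record can be inspected later.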
• Partial Aggregation instead of Combiners
  • trade Memory for IO gains by caching values
Fields groupingFields = new Fields( "date" );
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size" );
assembly = new SumBy( assembly, groupingFields, valueField, sumField, long.class );
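The memory-for-IO trade can be illustrated with a plain-Java sketch of map-side partial aggregation using a bounded cache. This is not the SumBy implementation; `PartialSumSketch`, `partialSums`, and the least-recently-used eviction policy are assumptions made for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the SumBy implementation: sums values per key in a
// bounded in-memory cache and emits a partial sum whenever a key is evicted.
class PartialSumSketch {

    static List<Map.Entry<String, Long>> partialSums(List<Map.Entry<String, Long>> input, int capacity) {
        // Access-ordered map: iteration starts at the least recently used key.
        LinkedHashMap<String, Long> cache = new LinkedHashMap<>(16, 0.75f, true);
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> record : input) {
            cache.merge(record.getKey(), record.getValue(), Long::sum);
            if (cache.size() > capacity) {
                // Cache is full: emit and drop the least recently used key.
                Iterator<Map.Entry<String, Long>> it = cache.entrySet().iterator();
                Map.Entry<String, Long> eldest = it.next();
                emitted.add(new SimpleEntry<>(eldest.getKey(), eldest.getValue()));
                it.remove();
            }
        }
        // Flush whatever is still cached at the end of the input.
        for (Map.Entry<String, Long> rest : cache.entrySet()) {
            emitted.add(new SimpleEntry<>(rest.getKey(), rest.getValue()));
        }
        return emitted;
    }
}
```

The emitted partial sums still need a final sum per key on the reduce side, since an evicted key can reappear later. A larger capacity means fewer partial records are written and shuffled, at the cost of more memory.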
Monitoring
• Implement FlowListener interface
  • onStarting
  • onStopping
  • onCompleted
  • onThrowable
• Polling FlowStats
Flow ID: 756271765aa375773f9bbb5570de4d2a
StepStats Count: 2

cascading.flow.FlowStepJob$1: 1, Step{status=RUNNING, startTime=1297344994624}
Name: (1/2) ...SequenceFile[['host', 'offset', 'line']]"][akamaiPipe_whiteListPipe/52729/]
Status: RUNNING
Num Mappers: 2
Num Reducers: 1
  Task ID: task_201102101702_0002_m_000003
  Task ID: task_201102101702_0002_m_000000
  Task ID: task_201102101702_0002_m_000001
  Task ID: task_201102101702_0002_r_000000
  Task ID: task_201102101702_0002_m_000002

cascading.flow.FlowStepJob$1: 2, Step{status=PENDING, startTime=0}
Name: (2/2) Hfs["TextLine[['host', 'count']->[ALL]]"]["/user/alex/output"]"]
Status: PENDING
Num Mappers: 0
Num Reducers: 0
Testing
• Use ClusterTestCase if you want to launch an embedded Hadoop cluster inside your TestCase
• A few validation and Hadoop helper functions are provided
• Doesn’t support Hadoop 0.21 testing library
Cascading GUI
• Yahoo Pipes
  Pipes is a powerful composition tool to aggregate, manipulate, and mashup content from around the web.
• WireIt
  WireIt is an open-source JavaScript library to create web wirable interfaces for dataflow applications, visual programming languages, graphical modeling, or graph editors.
Live Demo
THANK YOU!