Enterprise Data Workflows with Cascading


DESCRIPTION

Cascading meetup held jointly with Enterprise Big Data meetup at Tata Consultancy Services in Santa Clara on 2012-12-17 http://www.meetup.com/cascading/events/94079162/

TRANSCRIPT

Page 1: Enterprise Data Workflows with Cascading

Paco Nathan, Concurrent, Inc.

[email protected] · @pacoid

[Workflow diagram: Document Collection → Tokenize → Regex token → Scrub token → HashJoin Left (Stop Word List as RHS) → GroupBy token → Count → Word Count; M = map phase, R = reduce phase]

Copyright © 2012, Concurrent, Inc.

Page 2: Enterprise Data Workflows with Cascading

Unstructured Data meets Enterprise Scale

1. Cascading API: a few facts & quotes

2. Example #1: distributed file copy

3. Example #2: word count

4. Pattern Language: workflow abstraction

5. Compare: Scalding, Cascalog, Hive, Pig


Page 3: Enterprise Data Workflows with Cascading

Cascading API: a few facts & quotes

[Workflow diagram: word count with scrub and stop-word join, as on page 1]

Intro to Cascading

Page 4: Enterprise Data Workflows with Cascading

Enterprise apps, pre-Hadoop

[Diagram: pre-Hadoop enterprise architecture; data sources feed ETL (ops) into a Data Warehouse; analysts run SQL queries and feed datasets to Analytics Tools for ad-hoc queries, dashboards, modeling, and analysis; developers build domain Apps; priorities flow in, insights flow out]

Page 5: Enterprise Data Workflows with Cascading

the devil you know:

‣ “scale up” as needed – larger proprietary hardware

‣ data warehouse: e.g., Oracle, Teradata, etc. – expensive

‣ analytics: e.g., SAS, Microstrategy, etc. – expensive

‣ highly trained staff in specific roles – lots of “silos”

however, to be competitive now, the data rates must scale by orders of magnitude...

( alternatively, can we get hired onto the SAS sales team? )

Enterprise apps, pre-Hadoop


Page 6: Enterprise Data Workflows with Cascading

Apache Hadoop offers an attractive migration path:

‣ open source software – less expensive

‣ commodity hardware – less expensive

‣ fault tolerance for large-scale parallel workloads

‣ great use cases: Yahoo!, Facebook, Twitter, Amazon, Apple, etc.

‣ offload workflows from licensed platforms, based on “scale-out”

Enterprise apps, with Hadoop


Page 7: Enterprise Data Workflows with Cascading

Enterprise apps, with Hadoop

[Diagram: Hadoop cluster (job tracker, name node) at the center; developers submit Java apps, analysts submit queries and models, ops handle ETL needs]

Page 8: Enterprise Data Workflows with Cascading

anything odd about that diagram?

‣ demands expert Hadoop developers

‣ experts are hard to find, expensive

‣ even harder to train from among existing staff

‣ early adopter abstractions are not suitable for Enterprise IT

‣ importantly: Hadoop is almost never used in isolation


Page 9: Enterprise Data Workflows with Cascading

Cascading API: purpose

‣ simplify data processing development and deployment

‣ improve application developer productivity

‣ enable data processing application manageability


Page 10: Enterprise Data Workflows with Cascading

Cascading API: a few facts

Java open source project (ASL 2) using Git, Gradle, Maven, JUnit, etc.

in production (~5 yrs) at hundreds of enterprise Hadoop deployments: Finance, Health Care, Transportation, other verticals

studies published about large use cases: Twitter, Etsy, eBay, Airbnb, Square, Climate Corp, FlightCaster, Williams-Sonoma, Trulia, TeleNav

partnerships and distribution with SpringSource, Amazon AWS, Microsoft Azure, Hortonworks, MapR, EMC

several open source projects built atop, managed by Twitter, Etsy, eBay, etc., which provide substantial Machine Learning libraries

DSLs available in Scala, Clojure, Python (Jython), Ruby (JRuby), Groovy

data “taps” integrate popular data frameworks via JDBC, Memcached, HBase, plus serialization in Apache Thrift, Avro, Kryo, etc.

entire app compiles into a single JAR: fully connected for compiler optimization, exception handling, debug, config, scheduling, notifications, provenance, etc.


Page 11: Enterprise Data Workflows with Cascading

Cascading API: a few quotes

“Cascading gives Java developers the ability to build Big Data applications on Hadoop using their existing skillset … Management can really go out and build a team around folks that are already very experienced with Java. Switching over to this is really a very short exercise.”

CIO, Thor Olavsrud, 2012-06-06: cio.com/article/707782/Ease_Big_Data_Hiring_Pain_With_Cascading

“Masks the complexity of MapReduce, simplifies the programming, and speeds you on your journey toward actionable analytics … A vast improvement over native MapReduce functions or Pig UDFs.”

2012 BOSSIE Awards, James Borck, 2012-09-18: infoworld.com/slideshow/65089

“Company’s promise to application developers is an opportunity to build and test applications on their desktops in the language of choice with familiar constructs and reusable components”

Dr. Dobb’s, Adrian Bridgwater, 2012-06-08: drdobbs.com/jvm/where-does-big-data-go-to-get-data-inten/240001759

Page 12: Enterprise Data Workflows with Cascading

Enterprise concerns

“Notes from the Mystery Machine Bus” by Steve Yegge, Google: goo.gl/SeRZa

“conservative” (mostly Enterprise)      “liberal” (mostly Start-Up)
risk management                         customer experiments
assurance                               flexibility
well-defined schema                     schema follows code
explicit configuration                  convention
type-checking compiler                  interpreted scripts
wants no surprises                      wants no impediments
Java, Scala, Clojure, etc.              PHP, Ruby, Python, etc.
Cascading, Scalding, Cascalog, etc.     Hive, Pig, Hadoop Streaming, etc.

Page 13: Enterprise Data Workflows with Cascading

As Enterprise apps move into Hadoop and related Big Data frameworks, risk profiles shift toward more conservative programming practices

Cascading provides a popular API – formally speaking, as a pattern language – for defining and managing Enterprise data workflows

Enterprise adoption


Page 14: Enterprise Data Workflows with Cascading

Migration of batch toolsets

                    Enterprise   Migration   Start-Ups
define pipelines    J2EE         Cascading   Pig
query data          SQL          Lingual     Hive
predictive models   SAS          Pattern     Mahout

Page 15: Enterprise Data Workflows with Cascading

Cascading API benefits:

‣ addresses staffing bottlenecks due to Hadoop adoption

‣ reduces costs, while servicing risk concerns and “conservatism”

‣ manages complexity as the data continues to scale massively

‣ provides a pattern language for system integration

‣ leverages a workflow abstraction for Enterprise apps

‣ utilizes existing practices for JVM-based clusters

Summary


Page 16: Enterprise Data Workflows with Cascading

Code Example #1: distributed file copy

[Workflow diagram: word count with scrub and stop-word join, as on page 1]

Intro to Cascading

Page 17: Enterprise Data Workflows with Cascading

1: distributed file copy

[Diagram: Source tap → M → Sink tap]

public class Main {
  public static void main( String[] args ) {
    String inPath = args[ 0 ];
    String outPath = args[ 1 ];

    Properties props = new Properties();
    AppProps.setApplicationJarClass( props, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( props );

    // create the source tap
    Tap inTap = new Hfs( new TextDelimited( true, "\t" ), inPath );

    // create the sink tap
    Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath );

    // specify a pipe to connect the taps
    Pipe copyPipe = new Pipe( "copy" );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef().setName( "copy" )
      .addSource( copyPipe, inTap )
      .addTailSink( copyPipe, outTap );

    // run the flow
    flowConnector.connect( flowDef ).complete();
  }
}

1 mapper, 0 reducers; 10 lines of code

Page 18: Enterprise Data Workflows with Cascading

1: distributed file copy

shown:

‣ a source tap – input data

‣ a sink tap – output data

‣ a pipe connecting a source to a sink

‣ simplest possible Cascading app

not shown:

‣ what kind of taps? and what size of input data set?

‣ could be: JDBC, HBase, Cassandra, XML, flat files, etc.

‣ what kind of topology? and what size of cluster?

‣ could be: Hadoop, in-memory, etc.

as system architects, we leverage patterns
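To make that concrete, here is a minimal sketch of the same copy flow bound to a different topology, assuming the Cascading 2.x local-mode classes (cascading.flow.local.LocalFlowConnector, cascading.tap.local.FileTap, and the local TextDelimited scheme); the pipe assembly is unchanged, only the taps and connector differ:

import java.util.Properties;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;

public class LocalMain {
  public static void main( String[] args ) {
    // same pipe assembly as the Hadoop version
    Pipe copyPipe = new Pipe( "copy" );

    // local filesystem taps stand in for the Hfs taps
    Tap inTap = new FileTap( new TextDelimited( true, "\t" ), args[ 0 ] );
    Tap outTap = new FileTap( new TextDelimited( true, "\t" ), args[ 1 ] );

    FlowDef flowDef = FlowDef.flowDef().setName( "copy" )
      .addSource( copyPipe, inTap )
      .addTailSink( copyPipe, outTap );

    // in-memory local planner: no cluster required, same app logic
    new LocalFlowConnector( new Properties() ).connect( flowDef ).complete();
  }
}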


Page 19: Enterprise Data Workflows with Cascading

principle: same JAR, any scale

Your Laptop: Mb’s of data; Hadoop standalone mode; passes unit tests, or not; runtime: seconds to minutes

Staging Cluster: Gb’s of data; EMR + 4 Spot Instances; CI shows red or green lights; runtime: minutes to hours

Production Cluster: Tb’s of data; EMR w/ 50 HPC Instances; Ops monitors results; runtime: hours to days

MegaCorp Enterprise IT: Pb’s of data; 1000+ node private cluster; EVP calls you when the app fails; runtime: days+

Page 20: Enterprise Data Workflows with Cascading

troubleshooting at scale:

‣ physical plan for a query provides a deterministic strategy

‣ avoid non-deterministic behavior – expensive when troubleshooting

‣ otherwise, edge cases become nightmares on large clusters

‣ again, addresses “conservative” need for predictability

‣ a core value which is unique to Cascading

principle: fail the same way twice


Page 21: Enterprise Data Workflows with Cascading

flow planner per topology:

‣ leverage the flow graph (DAG)

‣ catch as many errors as possible before an app gets submitted

‣ potential problems caught at compile time or at flow planner stage

‣…long before large, expensive resources start getting consumed

‣…or worse, before the wrong results get propagated downstream

principle: plan ahead
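As a minimal sketch of what “plan ahead” buys you (assuming Cascading 2.x, where the planner validates the flow graph inside connect() and throws cascading.flow.planner.PlannerException before any job is submitted):

// the planner validates the whole DAG at connect() time,
// before any cluster resources get consumed
try {
  Flow flow = flowConnector.connect( flowDef );
  flow.complete(); // only here does work reach the topology
} catch( PlannerException e ) {
  // e.g., a downstream pipe references a field no upstream pipe declares;
  // writeDOT() emits the element graph to help diagnose the failed plan
  e.writeDOT( "dot/failed-plan.dot" );
  throw e;
}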


Page 22: Enterprise Data Workflows with Cascading

Code Example #2: word count

[Workflow diagram: word count with scrub and stop-word join, as on page 1]

Intro to Cascading

Page 23: Enterprise Data Workflows with Cascading

defined: count how often each word appears in a collection of text documents

a simple program provides a great test case for parallel processing, since it:

‣ requires a minimal amount of code

‣ demonstrates use of both symbolic and numeric values

‣ shows a dependency graph of tuples as an abstraction

‣ is not many steps away from useful search indexing

‣ serves as a “Hello World” for Hadoop apps

any distributed computing framework which runs Word Count efficiently in parallel at scale can handle much larger, more interesting compute problems

2: word count


Page 24: Enterprise Data Workflows with Cascading

2: word count

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

1 mapper, 1 reducer; 18 lines of code (gist.github.com/3900702)

Page 25: Enterprise Data Workflows with Cascading

String docPath = args[ 0 ];
String wcPath = args[ 1 ];

Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

// create source and sink taps
Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

// specify a regex to split "document" text lines into token stream
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );

// only returns "token"
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

// determine the word counts
Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new GroupBy( wcPipe, token );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef().setName( "wc" )
  .addSource( docPipe, docTap )
  .addTailSink( wcPipe, wcTap );

// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();

2: word count

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Page 26: Enterprise Data Workflows with Cascading

2: word count

1 mapper, 1 reducer; 18 lines of code

[Flow diagram from wc.dot: Hfs source tap [‘doc_id’, ‘text’] at the head feeds Each(‘token’) RegexSplitGenerator in the map phase; GroupBy(‘wc’) by [‘token’] crosses into the reduce phase, where Every(‘wc’) Count emits [‘token’, ‘count’] to the Hfs sink tap at the tail]

Page 27: Enterprise Data Workflows with Cascading

deltas between Example #1 and Example #2:

‣ defines source tap as a collection of text documents

‣ defines sink tap to produce word count tuples (desired end result)

‣ uses named fields, applying structure to unstructured data

‣ adds semantics to the workflow, specifying business logic

‣ inserts operations into the pipe: Tokenize, GroupBy, Count

‣ shows function and aggregation applied to data tuples in parallel

2: word count

[Diagram: Source tap → M → Sink tap; Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Page 28: Enterprise Data Workflows with Cascading

Pattern Language: the workflow abstraction

[Workflow diagram: word count with scrub and stop-word join, as on page 1]

Intro to Cascading

Page 29: Enterprise Data Workflows with Cascading

enterprise data workflows

[Workflow diagram: word count with scrub and stop-word join, as on page 1]

Tuples, Pipelines, Taps, Operations, Joins, Assertions, Traps, etc.

…in other words, “plumbing” as a pattern language for handling Big Data in Enterprise IT


Page 30: Enterprise Data Workflows with Cascading

pattern language

defined: a structured method for solving large, complex design problems, where the syntax of the language promotes the use of best practices

“plumbing” metaphor of pipes and operators in Cascading helps indicate: algorithms to be used at particular points, appropriate architectural trade-offs, frameworks which must be integrated, etc.

design patterns: originated in consensus negotiation for architecture, later used in software engineering

wikipedia.org/wiki/Pattern_language


Page 31: Enterprise Data Workflows with Cascading

‣ Business Stakeholder POV: business process management for workflow orchestration (think BPM/BPEL)

‣ Systems Integrator POV: system integration of heterogeneous data sources and compute platforms

‣ Data Scientist POV: a directed, acyclic graph (DAG) on which we can apply Amdahl’s Law, etc.

‣ Data Architect POV: a physical plan for large-scale data flow management

‣ Software Architect POV: a pattern language, similar to plumbing or circuit design

‣ App Developer POV: API bindings for Java, Scala, Clojure, Jython, JRuby, etc.

‣ Systems Engineer POV: a JAR file, has passed CI, available in a Maven repo

[Workflow diagram: word count with scrub and stop-word join, as on page 1]

data workflows: team

Page 32: Enterprise Data Workflows with Cascading

data workflows: layers

[Diagram: a layered stack, from business process down through API language, optimize / schedule, physical plan, and “assembler” code to topology and machine data; major changes in technology now]

‣ business process: domain expertise, business trade-offs, operating parameters, market position, etc.

‣ API language: Java, Scala, Clojure, Jython, JRuby, Groovy, etc. … envision whatever runs in a JVM

‣ topology: Apache Hadoop, in-memory local mode … envision GPUs, streaming, etc.

‣ machine data: Splunk, New Relic, Typesafe, Nagios, etc.

Page 33: Enterprise Data Workflows with Cascading

[Diagram: a Cascading app running on a Hadoop cluster; source taps read web logs and Customer Profile DBs, the Recommender System flow writes through a sink tap to a Memcached cluster serving a web API to Customers, and a trap tap routes rejected records to Support review]

data workflows: example

Page 34: Enterprise Data Workflows with Cascading

data workflows: SQL vs. JVM

abstraction     SQL
parser          SQL parser
optimizer       logical plan, optimized based on stats
planner         physical plan
machine data    query history, table stats
topology        b-trees, etc.
visualization   ERD
schema          table schema
catalog         relational catalog

Page 35: Enterprise Data Workflows with Cascading

data workflows: SQL vs. JVM

abstraction     SQL                                      JVM
parser          SQL parser                               SQL-92 compliant parser (in progress)
optimizer       logical plan, optimized based on stats   logical plan, optimized based on stats
planner         physical plan                            API “plumbing”
machine data    query history, table stats               app history, tuple stats
topology        b-trees, etc.                            heterogeneous, distributed: Hadoop, in-memory, etc.
visualization   ERD                                      flow diagram
schema          table schema                             tuple schema
catalog         relational catalog                       tap usage DB

Page 36: Enterprise Data Workflows with Cascading

Cascading taxonomy

[Diagram: an owner publishes a Cascading app to a Maven repo; an app instance contains flows; each flow compiles into steps; steps run as slices of a given kind (mapper | reducer) under a scheduler, on a topology (hadoop | local), reading source taps and writing sink taps, with trap taps for failures]

Page 37: Enterprise Data Workflows with Cascading

MapReduce architecture

[Diagrams: MapReduce cluster architecture, from Apache and Wikipedia]

‣ name node / data node

‣ job tracker / task tracker

‣ submit queue

‣ task slots

‣ HDFS

‣ distributed cache


Page 38: Enterprise Data Workflows with Cascading

If you were leading a team responsible for Enterprise apps:

‣ which of the previous two slides seems easier to understand?

‣ which is simpler to use for training and managing a team?

‣ which costs the most in the long run?

Summary


Page 39: Enterprise Data Workflows with Cascading

Compare & Contrast: other approaches

[Workflow diagram: word count with scrub and stop-word join, as on page 1]

Intro to Cascading

Page 40: Enterprise Data Workflows with Cascading

wc: pseudocode

void map (String doc_id, String text):
  for each word w in segment(text):
    emit(w, "1");

void reduce (String word, Iterator partial_counts):
  int count = 0;
  for each pc in partial_counts:
    count += Int(pc);
  emit(word, String(count));

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Page 41: Enterprise Data Workflows with Cascading

Scalding / Scala

// Sujit Pal
// sujitpal.blogspot.com/2012/08/scalding-for-impatient.html

package com.mycompany.impatient

import com.twitter.scalding._

class Part2(args : Args) extends Job(args) {
  val input = Tsv(args("input"), ('docId, 'text))
  val output = Tsv(args("output"))

  input.read.
    flatMap('text -> 'word) { text : String => text.split("""\s+""") }.
    groupBy('word) { group => group.size }.
    write(output)
}

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Page 42: Enterprise Data Workflows with Cascading

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

github.com/twitter/scalding/wiki

notes:

‣ code is compact, easy to understand

‣ functional programming is great for expressing complex workflows in MapReduce, etc.

‣ very large-scale, complex problems can be handled in just a few lines of code

‣ many large-scale apps in production deployments

‣ significant investments by Twitter, Etsy, eBay, etc., in this open source project

‣ extensive libraries are available for linear algebra, machine learning – e.g., “Matrix API”

Scalding / Scala


Page 43: Enterprise Data Workflows with Cascading

Cascalog / Clojure

; Paul Lam
; github.com/Quantisan/Impatient

(ns impatient.core
  (:use [cascalog.api]
        [cascalog.more-taps :only (hfs-delimited)])
  (:require [clojure.string :as s]
            [cascalog.ops :as c])
  (:gen-class))

(defmapcatop split [line]
  "reads in a line of string and splits it by regex"
  (s/split line #"[\[\]\\\(\),.)\s]+"))

(defn -main [in out & args]
  (?<- (hfs-delimited out)
       [?word ?count]
       ((hfs-delimited in :skip-header? true) _ ?line)
       (split ?line :> ?word)
       (c/count ?count)))

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Page 44: Enterprise Data Workflows with Cascading

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Cascalog / Clojure: github.com/nathanmarz/cascalog/wiki

notes:

‣ code is compact, easy to understand

‣ functional programming is great for expressing complex workflows in MapReduce, etc.

‣ significant investments by Twitter, Climate Corp, etc., in this open source project

‣ can run queries from the Clojure REPL

‣ compelling for very large-scale use cases where code correctness can be verified before deployment


Page 45: Enterprise Data Workflows with Cascading

Apache Hive

-- Steve Severance
-- stackoverflow.com/questions/10039949/word-count-program-in-hive

CREATE TABLE input (line STRING);

LOAD DATA LOCAL INPATH 'input.tsv' OVERWRITE INTO TABLE input;

SELECT word, COUNT(*)
FROM input
  LATERAL VIEW explode(split(line, ' ')) lTable AS word
GROUP BY word;

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Page 46: Enterprise Data Workflows with Cascading

hive.apache.org

pro:

‣ most popular abstraction atop Apache Hadoop

‣ SQL-like language is syntactically familiar to most analysts

‣ simple to load large-scale unstructured data and run ad-hoc queries

con:

‣ not a relational engine, many surprises at scale

‣ difficult to represent complex workflows, ML algorithms, etc.

‣ one poorly-trained analyst can bottleneck an entire cluster

‣ app-level integration requires other coding, outside of script language

‣ logical planner mixed with physical planner; cannot collect app stats

‣ non-deterministic exec: number of mappers+reducers changes unexpectedly

‣ business logic must cross multiple language boundaries: difficult to troubleshoot, optimize, audit, handle exceptions, set notifications, etc.

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Apache Hive

Page 47: Enterprise Data Workflows with Cascading

Apache Pig

-- kudos to Dmitriy Ryaboy

docPipe = LOAD '$docPath' USING PigStorage('\t', 'tagsource') AS (doc_id, text);
docPipe = FILTER docPipe BY doc_id != 'doc_id';

-- specify a regex to split "document" text lines into token stream
tokenPipe = FOREACH docPipe GENERATE doc_id, FLATTEN(TOKENIZE(text, ' [](),.')) AS token;
tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';

-- determine the word counts
tokenGroups = GROUP tokenPipe BY token;
wcPipe = FOREACH tokenGroups GENERATE group AS token, COUNT(tokenPipe) AS count;

-- output
STORE wcPipe INTO '$wcPath' USING PigStorage('\t', 'tagsource');
EXPLAIN -out dot/wc_pig.dot -dot wcPipe;

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Page 48: Enterprise Data Workflows with Cascading

pig.apache.org

pro:

‣ easy to learn data manipulation language (DML)

‣ interactive prompt (Grunt) makes it simple to prototype apps

‣ extensibility through UDFs

con:

‣ not a full programming language; must extend via UDFs outside of the language

‣ app-level integration requires other coding, outside of script language

‣ simple problems are simple to do; hard problems become quite complex

‣ difficult to parameterize scripts externally; must rewrite to change taps!

‣ logical planner mixed with physical planner; cannot collect app stats

‣ non-deterministic exec: number of mappers+reducers changes unexpectedly

‣ business logic must cross multiple language boundaries: difficult to troubleshoot, optimize, audit, handle exceptions, set notifications, etc.

[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; M/R]

Apache Pig

Page 49: Enterprise Data Workflows with Cascading

Code Example #N: City of Palo Alto, etc.

[Workflow diagram: word count with scrub and stop-word join, as on page 1]

Intro to Cascading

Page 50: Enterprise Data Workflows with Cascading

extend: wc + scrub + stop words

[Workflow diagram: Document Collection → Tokenize → Regex token → Scrub token → HashJoin Left (Stop Word List as RHS) → GroupBy token → Count → Word Count; M/R]

1 mapper, 1 reducer; 28+10 lines of code (a sketch of the added join follows)
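A sketch of the added join, following the companion “Impatient” tutorial (names such as stopPath and the custom ScrubFunction come from that tutorial and are assumptions here):

// read the stop word list as the right-hand side of the join
Fields stop = new Fields( "stop" );
Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath );
Pipe stopPipe = new Pipe( "stop" );

// scrub each token (custom Function: lowercase, trim punctuation, etc.)
Pipe tokenPipe = new Each( docPipe, token, new ScrubFunction( token ), Fields.RESULTS );

// left join against the stop word list; HashJoin replicates the small RHS
// to every mapper, so the join itself needs no reduce step
tokenPipe = new HashJoin( tokenPipe, token, stopPipe, stop, new LeftJoin() );

// tokens to keep have an empty "stop" field after the left join
tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) );
tokenPipe = new Retain( tokenPipe, token );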


Page 51: Enterprise Data Workflows with Cascading

extend: a simple search engine

[Workflow diagram (TF-IDF): Document Collection → Tokenize → Regex token → Scrub token → HashJoin Left (Stop Word List as RHS); the token stream branches into TF (CountBy doc_id, token), D (Unique doc_id → Insert 1 → SumBy doc_id), and DF (Unique token → CountBy token); the branches recombine via HashJoin and CoGroup (RHS), ExprFunc tf-idf computes the score, and Sort count orders the Word Count output; M/R stages throughout]

10 mappers, 8 reducers; 68+14 lines of code (a sketch of the tf-idf scoring step follows)

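As a sketch of the tf-idf scoring step at the tail of this workflow (field names tf_count, df_count, n_docs follow the companion “Impatient” tutorial and are assumptions here; ExpressionFunction compiles a Janino expression over the tuple fields):

// compute tf-idf once the TF, DF, and D branches have been joined
Fields tfidfExpr = new Fields( "tfidf" );
String expression =
  "(double) tf_count * Math.log( (double) n_docs / ( 1.0 + df_count ) )";
ExpressionFunction tfidfFunc =
  new ExpressionFunction( tfidfExpr, expression, Double.class );
tfidfPipe = new Each( tfidfPipe, Fields.ALL, tfidfFunc, Fields.ALL );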

Page 52: Enterprise Data Workflows with Cascading

City of Palo Alto open data

github.com/Cascading/CoPA/wiki

‣ GIS export for parks, roads, trees (unstructured / open data)

‣ log files of personalized/frequented locations in Palo Alto via iPhone GPS tracks

‣ curated metadata, used to enrich the dataset

‣ could extend via mash-up with many available public data APIs

Enterprise-scale app: road albedo + tree species metadata + geospatial indexing

“Find a shady spot on a summer day to walk near downtown and take a call…”

[Workflow diagram: the CoPA GIS export is parsed (Regex parser) and filtered (Regex filter) into tsv checkpoints for road, park, and tree branches; tree records get Scrub species and a HashJoin Left with Tree Metadata (RHS), road records get Estimate Albedo and a HashJoin Left with Road Metadata (RHS); Geohash indexes Road Segments and trees for a CoGroup, Tree Distance is filtered (tree_dist), and a shade checkpoint joins Geohashed GPS logs via CoGroup to produce reco; GroupBy tree_name; Failure Traps capture bad records; M/R stages throughout]

Page 53: Enterprise Data Workflows with Cascading

CoPA: log events


Page 54: Enterprise Data Workflows with Cascading

‣ addr: 115 HAWTHORNE AVE

‣ lat/lng: 37.446, -122.168

‣ geohash: 9q9jh0

‣ tree: 413 site 2

‣ species: Liquidambar styraciflua

‣ avg height 23 m

‣ road albedo: 0.12

‣ distance: 10 m

‣ a short walk from my train stop ✔

[Chart: density plot of Estimated Tree Height (meters); avg_height 0 to 50 on the x-axis, density 0.00 to 0.12 on the y-axis, point counts 0 to 300]

CoPA: results

Page 55: Enterprise Data Workflows with Cascading

PMML: predictive modeling

[Workflow diagram: word count with scrub and stop-word join, as on page 1]

Intro to Cascading

Page 56: Enterprise Data Workflows with Cascading

PMML model


Page 57: Enterprise Data Workflows with Cascading

cascading.pattern example:

1. use customer order history as the training data set

2. train a risk classifier for orders, using Random Forest

3. export model from R to PMML

4. build a Cascading app to execute the PMML model

4.1. generate a pipeline from PMML description

4.2. planner builds the flow for a topology (Hadoop)

4.3. compile app to a JAR file

5. deploy the app at scale to calculate scores


Page 58: Enterprise Data Workflows with Cascading

Cascading apps

[Diagram: an analyst’s laptop handles data prep, building training data sets from customer transactions, partner data, DW/ETL feeds, chargebacks, etc., and produces the PMML model; Cascading apps then run a risk classifier along two dimensions: per-order, as batch workloads on Hadoop (score new orders, predict model costs, segment customers), and customer 360, as real-time workloads on an IMDG (detect fraudsters, anomaly detection, velocity metrics); a Customer DB backs both]

cascading.pattern

Page 59: Enterprise Data Workflows with Cascading

1: “orders” data set … train/test in R … exported as PMML

Page 60: Enterprise Data Workflows with Cascading

## train a RandomForest model
f <- as.formula("as.factor(label) ~ .")
fit <- randomForest(f, data_train, ntree=50)

## test the model on the holdout test set
print(fit$importance)
print(fit)

predicted <- predict(fit, data)
data$predicted <- predicted
confuse <- table(pred = predicted, true = data[,1])
print(confuse)

## export predicted labels to TSV
write.table(data, file=paste(dat_folder, "sample.tsv", sep="/"), quote=FALSE, sep="\t", row.names=FALSE)

## export RF model to PMML
saveXML(pmml(fit), file=paste(dat_folder, "sample.rf.xml", sep="/"))

R modeling

Page 61: Enterprise Data Workflows with Cascading

     MeanDecreaseGini
var0        0.6591701
var1       33.8625179
var2        8.0290020

OOB estimate of error rate: 13.83%
Confusion matrix:
   0  1 class.error
0 28  5   0.1515152
1  8 53   0.1311475

[1] "./data/sample.rf.xml"

R output

Page 62: Enterprise Data Workflows with Cascading

2: Cascading app takes PMML as a parameter...

Page 63: Enterprise Data Workflows with Cascading

PMML model

<?xml version="1.0"?>
<PMML version="4.0" xmlns="http://www.dmg.org/PMML-4_0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.dmg.org/PMML-4_0 http://www.dmg.org/v4-0/pmml-4-0.xsd">
 <Header copyright="Copyright (c)2012 Concurrent, Inc." description="Random Forest Tree Model">
  <Extension name="user" value="ceteri" extender="Rattle/PMML"/>
  <Application name="Rattle/PMML" version="1.2.30"/>
  <Timestamp>2012-10-22 19:39:28</Timestamp>
 </Header>
 <DataDictionary numberOfFields="4">
  <DataField name="label" optype="categorical" dataType="string">
   <Value value="0"/>
   <Value value="1"/>
  </DataField>
  <DataField name="var0" optype="continuous" dataType="double"/>
  <DataField name="var1" optype="continuous" dataType="double"/>
  <DataField name="var2" optype="continuous" dataType="double"/>
 </DataDictionary>
 <MiningModel modelName="randomForest_Model" functionName="classification">
  <MiningSchema>
   <MiningField name="label" usageType="predicted"/>
   <MiningField name="var0" usageType="active"/>
   <MiningField name="var1" usageType="active"/>
   <MiningField name="var2" usageType="active"/>
  </MiningSchema>
  <Segmentation multipleModelMethod="majorityVote">
   <Segment id="1">
    <True/>
    <TreeModel modelName="randomForest_Model" functionName="classification"
        algorithmName="randomForest" splitCharacteristic="binarySplit">
     <MiningSchema>
      <MiningField name="label" usageType="predicted"/>
      <MiningField name="var0" usageType="active"/>
      <MiningField name="var1" usageType="active"/>
      <MiningField name="var2" usageType="active"/>
     </MiningSchema>
...

Page 64: Enterprise Data Workflows with Cascading

public class Main {
  public static void main( String[] args ) {
    String pmmlPath = args[ 0 ];
    String ordersPath = args[ 1 ];
    String classifyPath = args[ 2 ];
    String trapPath = args[ 3 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps
    Tap ordersTap = new Hfs( new TextDelimited( true, "\t" ), ordersPath );
    Tap classifyTap = new Hfs( new TextDelimited( true, "\t" ), classifyPath );
    Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );

    // define a "Classifier" model from PMML to evaluate the orders
    Classifier classifier = new Classifier( pmmlPath );
    Pipe classifyPipe = new Each( new Pipe( "classify" ), classifier.getFields(),
      new ClassifierFunction( new Fields( "score" ), classifier ), Fields.ALL );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef().setName( "classify" )
      .addSource( classifyPipe, ordersTap )
      .addTrap( classifyPipe, trapTap )
      .addSink( classifyPipe, classifyTap );

    // write a DOT file and run the flow
    Flow classifyFlow = flowConnector.connect( flowDef );
    classifyFlow.writeDOT( "dot/classify.dot" );
    classifyFlow.complete();
  }
}

Cascading app

Page 65: Enterprise Data Workflows with Cascading

3: app deployed on a cluster to score customers at scale...

Page 66: Enterprise Data Workflows with Cascading

elastic-mapreduce --create --name "RF" \
  --jar s3n://temp.cascading.org/pattern/pattern.jar \
  --arg s3n://temp.cascading.org/pattern/sample.rf.xml \
  --arg s3n://temp.cascading.org/pattern/sample.tsv \
  --arg s3n://temp.cascading.org/pattern/out/classify \
  --arg s3n://temp.cascading.org/pattern/out/trap

deploy to cloud

aws.amazon.com/elasticmapreduce/


Page 67: Enterprise Data Workflows with Cascading

results

bash-3.2$ head output/classify/part-00000
label  var0  var1  var2  order_id  predicted  score
1      0     1     0     6f8e1014  1          1
0      0     0     1     6f8ea22e  0          0
1      0     1     0     6f8ea435  1          1
0      0     0     1     6f8ea5e1  0          0
1      0     1     0     6f8ea785  1          1
1      0     1     0     6f8ea91e  1          1
0      1     0     0     6f8eaaba  0          0
1      0     1     0     6f8eac54  1          1
0      1     1     0     6f8eade3  1          1

Page 68: Enterprise Data Workflows with Cascading

blog, code/wiki/gists, JARs, community, DevOps products:

cascading.org

github.com/Cascading

conjars.org

meetup.com/cascading

goo.gl/KQtUL

concurrentinc.com

drill-down

[email protected] · @pacoid

Copyright © 2012, Concurrent, Inc.