Enterprise Data Workflows with Cascading
DESCRIPTION
Cascading meetup held jointly with Enterprise Big Data meetup at Tata Consultancy Services in Santa Clara on 2012-12-17
http://www.meetup.com/cascading/events/94079162/

TRANSCRIPT
Paco Nathan
Concurrent, Inc.
[email protected] · @pacoid
[flow diagram: Document Collection → Tokenize → Scrub token → HashJoin Left (RHS: Stop Word List → Regex token) → GroupBy token → Count → Word Count, with M (map) and R (reduce) phases]
Copyright © 2012, Concurrent, Inc.
Enterprise Data Workflows with Cascading
Monday, 17 December 12
Unstructured Data meets Enterprise Scale
1. Cascading API: a few facts & quotes
2. Example #1: distributed file copy
3. Example #2: word count
4. Pattern Language: workflow abstraction
5. Compare: Scalding, Cascalog, Hive, Pig
Cascading API: a few facts & quotes
Intro to Cascading
Enterprise apps, pre-Hadoop
[diagram: data sources → ETL (ops) → Data Warehouse → analyst → SQL queries, Analytics Tools, ad-hoc queries, dashboards; developer builds Apps against datasets; domain expertise drives priorities, insights, modeling, analysis]
the devil you know:
‣ “scale up” as needed – larger proprietary hardware
‣ data warehouse: e.g., Oracle, Teradata, etc. – expensive
‣ analytics: e.g., SAS, Microstrategy, etc. – expensive
‣ highly trained staff in specific roles – lots of “silos”
however, to be competitive now, the data rates must scale by orders of magnitude...
( alternatively, can we get hired onto the SAS sales team? )
Enterprise apps, pre-Hadoop
Apache Hadoop offers an attractive migration path:
‣ open source software – less expensive
‣ commodity hardware – less expensive
‣ fault tolerance for large-scale parallel workloads
‣ great use cases: Yahoo!, Facebook, Twitter, Amazon, Apple, etc.
‣ offload workflows from licensed platforms, based on “scale-out”
Enterprise apps, with Hadoop
Enterprise apps, with Hadoop
[diagram: Hadoop Cluster (job tracker, name node) — developer writes Java apps; analyst submits queries, models; ops handles ETL needs]
anything odd about that diagram?
‣ demands expert Hadoop developers
‣ experts are hard to find, expensive
‣ even harder to train from among existing staff
‣ early adopter abstractions are not suitable for Enterprise IT
‣ importantly: Hadoop is almost never used in isolation
Cascading API: purpose

‣ simplify data processing development and deployment
‣ improve application developer productivity
‣ enable data processing application manageability
Cascading API: a few facts
‣ Java open source project (ASL 2) using Git, Gradle, Maven, JUnit, etc.
‣ in production (~5 yrs) at hundreds of enterprise Hadoop deployments: Finance, Health Care, Transportation, other verticals
‣ studies published about large use cases: Twitter, Etsy, eBay, Airbnb, Square, Climate Corp, FlightCaster, Williams-Sonoma, Trulia, TeleNav
‣ partnerships and distribution with SpringSource, Amazon AWS, Microsoft Azure, Hortonworks, MapR, EMC
‣ several open source projects built atop, managed by Twitter, Etsy, eBay, etc., which provide substantial Machine Learning libraries
‣ DSLs available in Scala, Clojure, Python (Jython), Ruby (JRuby), Groovy
‣ data “taps” integrate popular data frameworks via JDBC, Memcached, HBase, plus serialization in Apache Thrift, Avro, Kryo, etc.
‣ entire app compiles into a single JAR: fully connected for compiler optimization, exception handling, debug, config, scheduling, notifications, provenance, etc.
Cascading API: a few quotes

“Cascading gives Java developers the ability to build Big Data applications on Hadoop using their existing skillset … Management can really go out and build a team around folks that are already very experienced with Java. Switching over to this is really a very short exercise.”
CIO, Thor Olavsrud, 2012-06-06
cio.com/article/707782/Ease_Big_Data_Hiring_Pain_With_Cascading

“Masks the complexity of MapReduce, simplifies the programming, and speeds you on your journey toward actionable analytics … A vast improvement over native MapReduce functions or Pig UDFs.”
2012 BOSSIE Awards, James Borck, 2012-09-18
infoworld.com/slideshow/65089

“Company’s promise to application developers is an opportunity to build and test applications on their desktops in the language of choice with familiar constructs and reusable components”
Dr. Dobb’s, Adrian Bridgwater, 2012-06-08
drdobbs.com/jvm/where-does-big-data-go-to-get-data-inten/240001759
Enterprise concerns

“Notes from the Mystery Machine Bus” by Steve Yegge, Google goo.gl/SeRZa

“conservative” | “liberal”
(mostly) Enterprise | (mostly) Start-Up
risk management | customer experiments
assurance | flexibility
well-defined schema | schema follows code
explicit configuration | convention
type-checking compiler | interpreted scripts
wants no surprises | wants no impediments
Java, Scala, Clojure, etc. | PHP, Ruby, Python, etc.
Cascading, Scalding, Cascalog, etc. | Hive, Pig, Hadoop Streaming, etc.
As Enterprise apps move into Hadoop and related BigData frameworks, risk profiles shift toward more conservative programming practices
Cascading provides a popular API – formally speaking, as a pattern language – for defining and managing Enterprise data workflows
Enterprise adoption
Migration of batch toolsets

task | Enterprise | Migration | Start-Ups
define pipelines | J2EE | Cascading | Pig
query data | SQL | Lingual | Hive
predictive models | SAS | Pattern | Mahout
Cascading API benefits:
‣ addresses staffing bottlenecks due to Hadoop adoption
‣ reduces costs, while servicing risk concerns and “conservatism”
‣ manages complexity as the data continues to scale massively
‣ provides a pattern language for system integration
‣ leverages a workflow abstraction for Enterprise apps
‣ utilizes existing practices for JVM-based clusters
Summary
Code Example #1: distributed file copy
Intro to Cascading
1: distributed file copy
[diagram: source tap → (M) → sink tap]
public class Main {
  public static void main( String[] args ) {
    String inPath = args[ 0 ];
    String outPath = args[ 1 ];

    Properties props = new Properties();
    AppProps.setApplicationJarClass( props, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( props );

    // create the source tap
    Tap inTap = new Hfs( new TextDelimited( true, "\t" ), inPath );

    // create the sink tap
    Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath );

    // specify a pipe to connect the taps
    Pipe copyPipe = new Pipe( "copy" );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef().setName( "copy" )
      .addSource( copyPipe, inTap )
      .addTailSink( copyPipe, outTap );

    // run the flow
    flowConnector.connect( flowDef ).complete();
  }
}

1 mapper, 0 reducers, 10 lines of code
1: distributed file copy
shown:
‣ a source tap – input data
‣ a sink tap – output data
‣ a pipe connecting a source to a sink
‣ simplest possible Cascading app
not shown:
‣ what kind of taps? and what size of input data set?
‣ could be: JDBC, HBase, Cassandra, XML, flat files, etc.
‣ what kind of topology? and what size of cluster?
‣ could be: Hadoop, in-memory, etc.
as system architects, we leverage patterns
principle: same JAR, any scale
Your Laptop: MBs of data; Hadoop standalone mode; passes unit tests, or not; runtime: seconds – minutes
Staging Cluster: GBs of data; EMR + 4 Spot Instances; CI shows red or green lights; runtime: minutes – hours
Production Cluster: TBs of data; EMR w/ 50 HPC Instances; Ops monitors results; runtime: hours – days
MegaCorp Enterprise IT: PBs of data; 1000+ node private cluster; EVP calls you when app fails; runtime: days+
troubleshooting at scale:
‣ physical plan for a query provides a deterministic strategy
‣ avoid non-deterministic behavior – expensive when troubleshooting
‣ otherwise, edge cases become nightmares on large clusters
‣ again, addresses “conservative” need for predictability
‣ a core value which is unique to Cascading
principle: fail the same way twice
flow planner per topology:
‣ leverage the flow graph (DAG)
‣ catch as many errors as possible before an app gets submitted
‣ potential problems caught at compile time or at flow planner stage
‣…long before large, expensive resources start getting consumed
‣…or worse, before the wrong results get propagated downstream
principle: plan ahead
Code Example #2: word count
Intro to Cascading
defined: count how often each word appears in a collection of text documents
a simple program provides a great test case for parallel processing, since it:
‣ requires a minimal amount of code
‣ demonstrates use of both symbolic and numeric values
‣ shows a dependency graph of tuples as an abstraction
‣ is not many steps away from useful search indexing
‣ serves as a “Hello World” for Hadoop apps
any distributed computing framework which runs Word Count efficiently in parallel at scale can handle much larger, more interesting compute problems
2: word count
2: word count
[diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count (M/R)]
1 mapper, 1 reducer, 18 lines of code — gist.github.com/3900702
String docPath = args[ 0 ];
String wcPath = args[ 1 ];
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

// create source and sink taps
Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

// specify a regex to split "document" text lines into token stream
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );

// only returns "token"
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

// determine the word counts
Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new GroupBy( wcPipe, token );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef().setName( "wc" )
  .addSource( docPipe, docTap )
  .addTailSink( wcPipe, wcTap );

// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();
2: word count
1 mapper, 1 reducer, 18 lines of code

[flow diagram from wc.dot: Hfs source tap ['doc_id', 'text'] → Each('token') RegexSplitGenerator → (map/reduce boundary) → GroupBy('wc') by ['token'] → Every('wc') Count → Hfs sink tap ['token', 'count']]
deltas between Example #1 and Example #2:
‣ defines source tap as a collection of text documents
‣ defines sink tap to produce word count tuples (desired end result)
‣ uses named fields, applying structure to unstructured data
‣ adds semantics to the workflow, specifying business logic
‣ inserts operations into the pipe: Tokenize, GroupBy, Count
‣ shows function and aggregation applied to data tuples in parallel
Pattern Language: the workflow abstraction
Intro to Cascading
enterprise data workflows
Tuples, Pipelines, Taps, Operations, Joins, Assertions, Traps, etc.
…in other words, “plumbing” as a pattern language for handling Big Data in Enterprise IT
pattern language

defined: a structured method for solving large, complex design problems, where the syntax of the language promotes the use of best practices
“plumbing” metaphor of pipes and operators in Cascading helps indicate: algorithms to be used at particular points, appropriate architectural trade-offs, frameworks which must be integrated, etc.
design patterns: originated in consensus negotiation for architecture, later used in software engineering
wikipedia.org/wiki/Pattern_language
‣ Business Stakeholder POV: business process management for workflow orchestration (think BPM/BPEL)
‣ Systems Integrator POV: system integration of heterogenous data sources and compute platforms
‣ Data Scientist POV: a directed, acyclic graph (DAG) on which we can apply Amdahl's Law, etc.
‣ Data Architect POV: a physical plan for large-scale data flow management
‣ Software Architect POV: a pattern language, similar to plumbing or circuit design
‣ App Developer POV: API bindings for Java, Scala, Clojure, Jython, JRuby, etc.
‣ Systems Engineer POV: a JAR file, has passed CI, available in a Maven repo
data workflows: team
data workflows: layers

[layer diagram, top to bottom:
‣ business process — domain expertise, business trade-offs, operating parameters, market position, etc.
‣ API language — Java, Scala, Clojure, Jython, JRuby, Groovy, etc. …envision whatever runs in a JVM
‣ optimize / schedule
‣ physical plan — “assembler” code
‣ topology — Apache Hadoop, in-memory local mode …envision GPUs, streaming, etc.
‣ machine data — Splunk, New Relic, Typesafe, Nagios, etc.
— annotated: major changes in technology now]
data workflows: example

[diagram: a Cascading app on a Hadoop cluster — source taps read Customer Profile DBs and web logs; a sink tap feeds a Memcached cluster serving a Recommender System to Customers via a web API; a trap tap routes bad records to Support for review]
data workflows: SQL vs. JVM

abstraction | SQL
parser | SQL parser
optimizer | logical plan, optimized based on stats
planner | physical plan
machine data | query history, table stats
topology | b-trees, etc.
visualization | ERD
schema | table schema
catalog | relational catalog
data workflows: SQL vs. JVM

abstraction | SQL | JVM
parser | SQL parser | SQL-92 compliant parser (in progress)
optimizer | logical plan, optimized based on stats | logical plan, optimized based on stats
planner | physical plan | API “plumbing”
machine data | query history, table stats | app history, tuple stats
topology | b-trees, etc. | heterogenous, distributed: Hadoop, in-memory, etc.
visualization | ERD | flow diagram
schema | table schema | tuple schema
catalog | relational catalog | tap usage DB
Cascading taxonomy

[diagram: a Cascading app, with an owner and published to a Maven repo, runs as an app instance; an app contains flows with source, sink, and trap taps; each flow compiles into steps, which a scheduler dispatches as slices of a given kind (mapper | reducer) onto a topology (hadoop | local)]
MapReduce architecture
[MapReduce architecture diagrams: Apache, Wikipedia]
‣ name node / data node
‣ job tracker / task tracker
‣ submit queue
‣ task slots
‣ HDFS
‣ distributed cache
If you were leading a team responsible for Enterprise apps:
‣ which of the previous two slides seems easier to understand?
‣ which is simpler to use for training and managing a team?
‣ which costs the most in the long run?
Summary
Compare & Contrast: other approaches
Intro to Cascading
wc: pseudocode
void map (String doc_id, String text):
  for each word w in segment(text):
    emit(w, "1");

void reduce (String word, Iterator partial_counts):
  int count = 0;
  for each pc in partial_counts:
    count += Int(pc);
  emit(word, String(count));
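The pseudocode maps directly onto a runnable sketch in plain Python (not Cascading; the `shuffle` helper here stands in for the framework's group-by-key step):

```python
from collections import defaultdict

def mapper(doc_id, text):
    # emit (word, 1) for each token in the document
    for word in text.split():
        yield word, 1

def shuffle(pairs):
    # group emitted values by key, as the MapReduce framework would
    groups = defaultdict(list)
    for word, count in pairs:
        groups[word].append(count)
    return groups

def reducer(word, partial_counts):
    # sum the partial counts for one word
    return word, sum(partial_counts)

docs = {"doc1": "rain rain go away", "doc2": "rain again"}
pairs = [p for doc_id, text in docs.items() for p in mapper(doc_id, text)]
counts = dict(reducer(w, pcs) for w, pcs in shuffle(pairs).items())
print(counts)  # {'rain': 3, 'go': 1, 'away': 1, 'again': 1}
```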
Scalding / Scala
// Sujit Pal
// sujitpal.blogspot.com/2012/08/scalding-for-impatient.html

package com.mycompany.impatient

import com.twitter.scalding._

class Part2(args : Args) extends Job(args) {
  val input = Tsv(args("input"), ('docId, 'text))
  val output = Tsv(args("output"))
  input.read.
    flatMap('text -> 'word) { text : String => text.split("""\s+""") }.
    groupBy('word) { group => group.size }.
    write(output)
}
github.com/twitter/scalding/wiki
notes:
‣ code is compact, easy to understand
‣ functional programming is great for expressing complex workflows in MapReduce, etc.
‣ very large-scale, complex problems can be handled in just a few lines of code
‣ many large-scale apps in production deployments
‣ significant investments by Twitter, Etsy, eBay, etc., in this open source project
‣ extensive libraries are available for linear algebra, machine learning – e.g., “Matrix API”
Scalding / Scala
Cascalog / Clojure
; Paul Lam
; github.com/Quantisan/Impatient

(ns impatient.core
  (:use [cascalog.api]
        [cascalog.more-taps :only (hfs-delimited)])
  (:require [clojure.string :as s]
            [cascalog.ops :as c])
  (:gen-class))

(defmapcatop split [line]
  "reads in a line of string and splits it by regex"
  (s/split line #"[\[\]\\\(\),.)\s]+"))

(defn -main [in out & args]
  (?<- (hfs-delimited out)
       [?word ?count]
       ((hfs-delimited in :skip-header? true) _ ?line)
       (split ?line :> ?word)
       (c/count ?count)))
Cascalog / Clojure
github.com/nathanmarz/cascalog/wiki
notes:
‣ code is compact, easy to understand
‣ functional programming is great for expressing complex workflows in MapReduce, etc.
‣ significant investments by Twitter, Climate Corp, etc., in this open source project
‣ can run queries from the Clojure REPL
‣ compelling for very large-scale use cases where code correctness can be verified before deployment
Apache Hive
-- Steve Severance
-- stackoverflow.com/questions/10039949/word-count-program-in-hive

CREATE TABLE input (line STRING);

LOAD DATA LOCAL INPATH 'input.tsv' OVERWRITE INTO TABLE input;

SELECT word, COUNT(*)
FROM input
LATERAL VIEW explode(split(line, ' ')) lTable AS word
GROUP BY word;
hive.apache.org
pro:
‣ most popular abstraction atop Apache Hadoop
‣ SQL-like language is syntactically familiar to most analysts
‣ simple to load large-scale unstructured data and run ad-hoc queries
con:
‣ not a relational engine, many surprises at scale
‣ difficult to represent complex workflows, ML algorithms, etc.
‣ one poorly-trained analyst can bottleneck an entire cluster
‣ app-level integration requires other coding, outside of script language
‣ logical planner mixed with physical planner; cannot collect app stats
‣ non-deterministic exec: number of mappers+reducers changes unexpectedly
‣ business logic must cross multiple language boundaries: difficult to troubleshoot, optimize, audit, handle exceptions, set notifications, etc.
Apache Pig
-- kudos to Dmitriy Ryaboy
docPipe = LOAD '$docPath' USING PigStorage('\t', 'tagsource') AS (doc_id, text);
docPipe = FILTER docPipe BY doc_id != 'doc_id';

-- specify regex to split "document" text lines into token stream
tokenPipe = FOREACH docPipe GENERATE doc_id, FLATTEN(TOKENIZE(text, ' [](),.')) AS token;
tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';

-- determine the word counts
tokenGroups = GROUP tokenPipe BY token;
wcPipe = FOREACH tokenGroups GENERATE group AS token, COUNT(tokenPipe) AS count;

-- output
STORE wcPipe INTO '$wcPath' USING PigStorage('\t', 'tagsource');
EXPLAIN -out dot/wc_pig.dot -dot wcPipe;
pig.apache.org
pro:
‣ easy to learn data manipulation language (DML)
‣ interactive prompt (Grunt) makes it simple to prototype apps
‣ extensibility through UDFs
con:
‣ not a full programming language; must extend via UDFs outside of language
‣ app-level integration requires other coding, outside of script language
‣ simple problems are simple to do; hard problems become quite complex
‣ difficult to parameterize scripts externally; must rewrite to change taps!
‣ logical planner mixed with physical planner; cannot collect app stats
‣ non-deterministic exec: number of mappers+reducers changes unexpectedly
‣ business logic must cross multiple language boundaries: difficult to troubleshoot, optimize, audit, handle exceptions, set notifications, etc.
Code Example #N: City of Palo Alto, etc.
Intro to Cascading
extend: wc + scrub + stop words
1 mapper, 1 reducer, 28+10 lines of code
[diagram: extended TF-IDF workflow — Document Collection → Tokenize → Scrub token → HashJoin Left (RHS: Stop Word List → Regex token); branches: CountBy doc_id, token → TF; Unique doc_id → Insert 1 → SumBy doc_id → D; Unique token → CountBy token → DF; CoGroup → ExprFunc tf-idf → Sort count → TF-IDF; M/R phases throughout]
extend: a simple search engine
10 mappers, 8 reducers, 68+14 lines of code
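The TF-IDF branch at the heart of this extended workflow computes tf(token, doc) × log(D / df(token)). A plain-Python sketch of that formula (not Cascading; the tiny corpus here is illustrative):

```python
import math
from collections import Counter

def tf_idf(docs):
    """Compute TF-IDF per (doc, token): tf(t, d) * log(D / df(t))."""
    d_count = len(docs)  # D: total number of documents
    # DF: number of documents in which each token appears
    df = Counter(t for tokens in docs.values() for t in set(tokens))
    scores = {}
    for doc_id, tokens in docs.items():
        tf = Counter(tokens)  # TF: token counts within this document
        for token, count in tf.items():
            scores[(doc_id, token)] = count * math.log(d_count / df[token])
    return scores

docs = {
    "doc1": ["rain", "shadow", "rain"],
    "doc2": ["rain", "dry"],
}
scores = tf_idf(docs)
# "rain" appears in every document, so its idf — log(2/2) — is zero
print(scores[("doc1", "rain")])    # 0.0
print(scores[("doc1", "shadow")])  # log(2) ≈ 0.693
```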
City of Palo Alto open data
github.com/Cascading/CoPA/wiki
‣ GIS export for parks, roads, trees (unstructured / open data)
‣ log files of personalized/frequented locations in Palo Alto via iPhone GPS tracks
‣ curated metadata, used to enrich the dataset
‣ could extend via mash-up with many available public data APIs
Enterprise-scale app: road albedo + tree species metadata + geospatial indexing
“Find a shady spot on a summer day to walk near downtown and take a call…”
[diagram: CoPA workflow — CoPA GIS export parsed (Regex parser, Regex filter) into tree, road, and park branches; tree branch: Scrub species, HashJoin Left with Tree Metadata (RHS), Geohash, GroupBy tree_name; road branch: HashJoin Left with Road Metadata (RHS), Estimate Albedo over Road Segments, Geohash; CoGroup of tree and road, Filter by tree_dist (Tree Distance), Checkpoint shade; GPS logs Geohashed and CoGrouped to produce reco; tsv checkpoints and Failure Traps throughout; M/R phases across the flow]
CoPA: log events
‣ addr: 115 HAWTHORNE AVE
‣ lat/lng: 37.446, -122.168
‣ geohash: 9q9jh0
‣ tree: 413 site 2
‣ species: Liquidambar styraciflua
‣ avg height 23 m
‣ road albedo: 0.12
‣ distance: 10 m
‣ a short walk from my train stop ✔
CoPA: results

[chart: density vs. avg_height, colored by count — Estimated Tree Height (meters)]
PMML: predictive modeling
Intro to Cascading
PMML model
cascading.pattern example:
1. use customer order history as the training data set
2. train a risk classifier for orders, using Random Forest
3. export model from R to PMML
4. build a Cascading app to execute the PMML model
4.1. generate a pipeline from PMML description
4.2. planner builds the flow for a topology (Hadoop)
4.3. compile app to a JAR file
5. deploy the app at scale to calculate scores
Cascading apps

[diagram: analyst's laptop — data prep and model training on training data sets drawn from customer transactions (Customer DB, partner data DW via ETL, chargebacks, etc.); the exported PMML model feeds two Cascading apps — a risk classifier per-order and a risk classifier for customer 360 — driving batch workloads on Hadoop (score new orders, predict model costs) and real-time workloads on an IMDG (detect fraudsters, anomaly detection, segment customers, velocity metrics)]

cascading.pattern
1: “orders” data set... train/test in R... exported as PMML
## requires the randomForest and pmml packages
library(randomForest)
library(pmml)

## train a RandomForest model
f <- as.formula("as.factor(label) ~ .")
fit <- randomForest(f, data_train, ntree=50)

## test the model on the holdout test set
print(fit$importance)
print(fit)

predicted <- predict(fit, data)
data$predicted <- predicted
confuse <- table(pred = predicted, true = data[,1])
print(confuse)

## export predicted labels to TSV
write.table(data, file=paste(dat_folder, "sample.tsv", sep="/"),
  quote=FALSE, sep="\t", row.names=FALSE)

## export RF model to PMML
saveXML(pmml(fit), file=paste(dat_folder, "sample.rf.xml", sep="/"))
R modeling
     MeanDecreaseGini
var0        0.6591701
var1       33.8625179
var2        8.0290020

OOB estimate of error rate: 13.83%
Confusion matrix:
   0  1 class.error
0 28  5   0.1515152
1  8 53   0.1311475
[1] "./data/sample.rf.xml"
R output
2: Cascading app takes PMML as a parameter...
PMML model

<?xml version="1.0"?>
<PMML version="4.0" xmlns="http://www.dmg.org/PMML-4_0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.dmg.org/PMML-4_0 http://www.dmg.org/v4-0/pmml-4-0.xsd">
 <Header copyright="Copyright (c)2012 Concurrent, Inc." description="Random Forest Tree Model">
  <Extension name="user" value="ceteri" extender="Rattle/PMML"/>
  <Application name="Rattle/PMML" version="1.2.30"/>
  <Timestamp>2012-10-22 19:39:28</Timestamp>
 </Header>
 <DataDictionary numberOfFields="4">
  <DataField name="label" optype="categorical" dataType="string">
   <Value value="0"/>
   <Value value="1"/>
  </DataField>
  <DataField name="var0" optype="continuous" dataType="double"/>
  <DataField name="var1" optype="continuous" dataType="double"/>
  <DataField name="var2" optype="continuous" dataType="double"/>
 </DataDictionary>
 <MiningModel modelName="randomForest_Model" functionName="classification">
  <MiningSchema>
   <MiningField name="label" usageType="predicted"/>
   <MiningField name="var0" usageType="active"/>
   <MiningField name="var1" usageType="active"/>
   <MiningField name="var2" usageType="active"/>
  </MiningSchema>
  <Segmentation multipleModelMethod="majorityVote">
   <Segment id="1">
    <True/>
    <TreeModel modelName="randomForest_Model" functionName="classification" algorithmName="randomForest" splitCharacteristic="binarySplit">
     <MiningSchema>
      <MiningField name="label" usageType="predicted"/>
      <MiningField name="var0" usageType="active"/>
      <MiningField name="var1" usageType="active"/>
      <MiningField name="var2" usageType="active"/>
     </MiningSchema>
...
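The Segmentation element above declares multipleModelMethod="majorityVote": each TreeModel segment votes, and the most common label wins. A minimal sketch of that combining step in plain Python (the stub "trees" here are hypothetical, not parsed from the PMML file):

```python
from collections import Counter

def majority_vote(segments, record):
    # score the record with every segment (tree), then take the most common label
    votes = Counter(tree(record) for tree in segments)
    return votes.most_common(1)[0][0]

# hypothetical stand-ins for the TreeModel segments in the PMML file
segments = [
    lambda r: "1" if r["var1"] > 0.5 else "0",
    lambda r: "1" if r["var2"] > 0.5 else "0",
    lambda r: "1" if r["var1"] + r["var2"] > 1.0 else "0",
]

record = {"var0": 0.1, "var1": 0.9, "var2": 0.2}
print(majority_vote(segments, record))  # "1" — two of the three trees vote 1
```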
public class Main {
  public static void main( String[] args ) {
    String pmmlPath = args[ 0 ];
    String ordersPath = args[ 1 ];
    String classifyPath = args[ 2 ];
    String trapPath = args[ 3 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps
    Tap ordersTap = new Hfs( new TextDelimited( true, "\t" ), ordersPath );
    Tap classifyTap = new Hfs( new TextDelimited( true, "\t" ), classifyPath );
    Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );

    // define a "Classifier" model from PMML to evaluate the orders
    Classifier classifier = new Classifier( pmmlPath );
    Pipe classifyPipe = new Each( new Pipe( "classify" ), classifier.getFields(),
        new ClassifierFunction( new Fields( "score" ), classifier ), Fields.ALL );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef().setName( "classify" )
      .addSource( classifyPipe, ordersTap )
      .addTrap( classifyPipe, trapTap )
      .addSink( classifyPipe, classifyTap );

    // write a DOT file and run the flow
    Flow classifyFlow = flowConnector.connect( flowDef );
    classifyFlow.writeDOT( "dot/classify.dot" );
    classifyFlow.complete();
  }
}
Cascading app
3: app deployed on a cluster to score customers at scale...
elastic-mapreduce --create --name "RF" \
  --jar s3n://temp.cascading.org/pattern/pattern.jar \
  --arg s3n://temp.cascading.org/pattern/sample.rf.xml \
  --arg s3n://temp.cascading.org/pattern/sample.tsv \
  --arg s3n://temp.cascading.org/pattern/out/classify \
  --arg s3n://temp.cascading.org/pattern/out/trap
deploy to cloud
aws.amazon.com/elasticmapreduce/
results

bash-3.2$ head output/classify/part-00000
label  var0  var1  var2  order_id  predicted  score
1      0     1     0     6f8e1014  1          1
0      0     0     1     6f8ea22e  0          0
1      0     1     0     6f8ea435  1          1
0      0     0     1     6f8ea5e1  0          0
1      0     1     0     6f8ea785  1          1
1      0     1     0     6f8ea91e  1          1
0      1     0     0     6f8eaaba  0          0
1      0     1     0     6f8eac54  1          1
0      1     1     0     6f8eade3  1          1
blog, code/wiki/gists, JARs, community, DevOps products:
cascading.org
github.com/Cascading
conjars.org
meetup.com/cascading
goo.gl/KQtUL
concurrentinc.com
drill-down
[email protected] · @pacoid
Copyright © 2012, Concurrent, Inc.