chicago hadoop users group: enterprise data workflows
TRANSCRIPT
Paco Nathan
Concurrent, Inc.
San Francisco, CA
@pacoid
Copyright @2013, Concurrent, Inc.
“Enterprise Data Workflows with Cascading”
zest.to/event63_77
2013-02-12
Tuesday, 12 February 13
You may not have heard about us much, but you use our API in lots of places: your bank, your airline, your hospital, your mobile device, your social network, etc.
Unstructured Data meets Enterprise Scale
• an example considered
• system integration: tearing down silos
• code samples
• data science perspectives: how we got here
• the workflow abstraction: many aspects of an app
• developer, analyst, scientist
• summary, references
Background: I’m a data scientist and engineering director who has spent the past decade building and leading Data teams which created large-scale apps.
This talk is about using Cascading and related DSLs to build Enterprise Data Workflows. Our emphasis is on leveraging the workflow abstraction for system integration, for mitigating complexity, and for producing simple, robust apps at scale. We’ll show a little something for the developers, the analysts, and the scientists in the room.
an example considered
[Diagram: word count workflow with elements Document Collection, Tokenize, Scrub token, Stop Word List, Regex token, HashJoin Left / RHS, GroupBy token, Count, Word Count; M and R mark the map and reduce phases]
Enterprise Data Workflows
Let’s consider the matter of handling Big Data from the perspective of building and maintaining Enterprise apps…
[Diagram: a Data Workflow on a Hadoop Cluster, connected through source taps, sink taps, and a trap tap to Customer profile DBs, Customer Prefs, Logs, Cache, Web App, Customers, Support, Reporting, Analytics Cubes, and Modeling (PMML)]
Enterprise Data Workflows
an example…
Apache Hadoop rarely ever gets used in isolation.
Enterprise Data Workflows
an example… the front end
Line-of-business (LOB) use cases drive the demand for Big Data apps.
Enterprise Data Workflows
an example… the back office
Enterprise organizations have seriously ginormous investments in existing back office practices: people, infrastructure, processes.
Enterprise Data Workflows
an example… the heavy lifting!
“Main Street” firms have invested in Hadoop to address Big Data needs, offsetting their rising costs for Enterprise licenses from SAS, Teradata, etc.
system integration: tearing down silos
Enterprise Data Workflows
The process of building Enterprise apps is largely about system integration and business process, meeting in the middle.
Cascading – definitions
• a pattern language for Enterprise Data Workflows
• simple to build, easy to test, robust in production
• design principles ⟹ ensure best practices at scale
A pattern language ensures that best practices are followed by an implementation.
In this case, parallelization of deterministic query plans for reliable, Enterprise-scale workflows on Hadoop, etc.
Cascading – usage
• Java API, Scala DSL Scalding, Clojure DSL Cascalog
• ASL 2 license, GitHub src, http://conjars.org
• 5+ yrs production use, multiple Enterprise verticals
More than a 5-year history of large-scale Enterprise deployments.
DSLs in Scala, Clojure, Jython, JRuby, Groovy, etc.
Maven repo for third-party contribs.
quotes…
“Cascading gives Java developers the ability to build Big Data applications on Hadoop using their existing skillset … Management can really go out and build a team around folks that are already very experienced with Java. Switching over to this is really a very short exercise.”
CIO, Thor Olavsrud, 2012-06-06
cio.com/article/707782/Ease_Big_Data_Hiring_Pain_With_Cascading
“Masks the complexity of MapReduce, simplifies the programming, and speeds you on your journey toward actionable analytics … A vast improvement over native MapReduce functions or Pig UDFs.”
2012 BOSSIE Awards, James Borck, 2012-09-18
infoworld.com/slideshow/65089
Industry analysts are picking up on the staffing costs related to Hadoop, “no free lunch”.
Cascading – deployments
• case studies: Twitter, Etsy, Climate Corp, Nokia, Factual, Williams-Sonoma, uSwitch, Airbnb, Square, Harvard, etc.
• partners: Amazon AWS, Microsoft Azure, Hortonworks, MapR, EMC, SpringSource, Cloudera
• OSS frameworks built atop by: Twitter, Etsy, eBay, Climate Corp, uSwitch, YieldBot, etc.
• use cases: ETL, anti-fraud, advertising, recommenders, retail pricing, eCRM, marketing funnel, search analytics, genomics, climatology, etc.
Several published case studies about Cascading, Scalding, Cascalog, etc. Wide range of use cases.
Significant investment by Twitter, Etsy, and other firms for OSS based on Cascading. Partnerships with all Hadoop vendors.
(Williams-Sonoma, Neiman Marcus)
concurrentinc.com/case-studies/upstream/ upstreamsoftware.com/blog/bid/86333/
(revenue team, publisher analytics)
concurrentinc.com/case-studies/twitter/ github.com/twitter/scalding/wiki
(infrastructure team)
concurrentinc.com/case-studies/airbnb/ gigaom.com/data/meet-the-combo-behind-etsy-airbnb-and-climate-corp-hadoop-jobs/
case studies…
Several customers using Cascading / Scalding / Cascalog have published case studies. Here are a few.
Cascading – taps
• taps integrate other data frameworks, as tuple streams
• these are “plumbing” endpoints in the pattern language
• sources (inputs), sinks (outputs), traps (exceptions)
• where schema and provenance get determined
• text delimited, JDBC, Memcached, HBase, Cassandra, MongoDB, etc.
• data serialization: Avro, Thrift, Kryo, JSON, etc.
• extend in ~4 lines of Java
Speaking of system integration, taps provide the simplest approach for integrating different frameworks.
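To make the source / sink / trap roles concrete, here is a minimal sketch in plain Java. It does not use the Cascading Tap API; the class `TapSketch`, the method `run`, and the sample data are all hypothetical names chosen for illustration. It assumes only that tuples flow from a source through an operation into a sink, and that a tuple whose processing fails is diverted to a trap rather than failing the whole flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for source, sink, and trap taps; not the Cascading Tap API.
final class TapSketch {

    // Runs each source tuple through the operation. Results go to the sink;
    // a tuple whose processing throws is diverted to the trap instead of
    // failing the whole flow.
    static <I, O> void run(Iterable<I> source, Function<I, O> operation,
                           List<O> sink, List<I> trap) {
        for (I tuple : source) {
            try {
                sink.add(operation.apply(tuple));
            } catch (RuntimeException e) {
                trap.add(tuple);
            }
        }
    }

    public static void main(String[] args) {
        List<String> source = List.of("42", "7", "n/a", "13");
        List<Integer> sink = new ArrayList<>();
        List<String> trap = new ArrayList<>();
        run(source, Integer::parseInt, sink, trap);
        System.out.println("sink=" + sink + " trap=" + trap);
    }
}
```

In this sketch the lists play the part of storage endpoints; in a real deployment those endpoints would be files, databases, or caches behind the tap abstraction.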
Cascading – topologies
• topologies execute workflows on clusters
• flow planner is much like a compiler for queries
• abstraction layers reduce training costs
• Hadoop (MapReduce jobs)
• local mode (dev/test or special config)
• in-memory data grids (real-time)
• flow planner can be extended to support other topologies
• blend flows from different topologies into one app
Another kind of integration involves apps which run partly on a Hadoop cluster, and partly somewhere else.
example topologies…
Here are some examples of topologies for distributed computing -- Apache Hadoop being the first supported by Cascading, followed by local mode, and now a tuple space (IMDG) flow planner in the works.
Several other widely used platforms would also be likely suspects for Cascading flow planners.
Cascading – ANSI SQL
• ANSI SQL parser/optimizer atop Cascading flow planner
• JDBC driver to integrate into existing tools and app servers
• surface a relational catalog over a collection of unstructured data
• launch a SQL shell prompt to run queries
• enable the analysts without retraining on Hadoop, etc.
• transparency for Support, Ops, Finance, et al.
• combine SQL flows with Scalding, Cascalog, etc.
• based on collab with Optiq – industry-proven code base
• keep the DBAs happy, and go home a hero!
Quite a number of projects have started out with Hadoop, then grafted a SQL-like syntax onto it. Somewhere.
We started out with a query planner used in Enterprise, then partnered with Optiq -- the team behind an Enterprise-proven code base for an ANSI SQL parser/optimizer.
In the sense that Splunk handles “machine data”, this SQL implementation provides “machine code”, as the lingua franca of Enterprise system integration.
abstraction    RDBMS                                    JVM Cluster
parser         ANSI SQL compliant parser                ANSI SQL compliant parser
optimizer      logical plan, optimized based on stats   logical plan, optimized based on stats
planner        physical plan                            API “plumbing”
machine data   query history, table stats               app history, tuple stats
topology       b-trees, etc.                            heterogenous, distributed: Hadoop, in-memory, etc.
visualization  ERD                                      flow diagram
schema         table schema                             tuple schema
catalog        relational catalog                       tap usage DB
provenance     (manual audit)                           data set producers/consumers
how to query…
When you peel back the onion skin on a SQL query, each of the abstraction layers used in an RDBMS has an analogue (or better) in the context of Enterprise Data Workflows running on JVM clusters.
Cascading – machine learning
• export predictive models as PMML
• Cascading compiles to JVM classes for parallelization
• migrate workloads: SAS, Microstrategy, Teradata, etc.
• great OSS tools: R, Weka, KNIME, RapidMiner, etc.
• run multiple models in parallel as customer experiments
• Random Forest, Logistic Regression, GLM, Assoc Rules, Decision Trees, K-Means, Hierarchical Clustering, etc.
• 2 lines of code required for integration
• integrate with other libraries: Matrix API, Algebird, etc.
• combine with other flows into one app: Java for ETL, Scala for data services, SQL for reporting, etc.
PMML has been around for a while, and export is supported by virtually every analytics platform, covering a wide variety of predictive modeling algorithms.
Cascading reads PMML, building out workflows under the hood which run efficiently in parallel.
Much cheaper than buying a SAS license for your 2000-node Hadoop cluster ;)
Five companies are collaborating on this open source project, https://github.com/Cascading/cascading.pattern
PMML support…
Here are just a few of the tools that people use to create predictive models for export as PMML.
Cascading – test-driven development
• assert patterns (regex) on the tuple streams
• trap edge cases as “data exceptions”
• adjust assert levels, like log4j levels
• TDD at scale:
1. start from raw inputs in the flow graph
2. define stream assertions for each stage of transforms
3. verify exceptions, code to eliminate them
4. rinse, lather, repeat…
5. when impl is complete, app has full test coverage
• TDD follows from Cascalog’s composable subqueries
• redirect traps in production to Ops, QA, Support, Audit, etc.
TDD is not usually high on the list when people start discussing Big Data apps.
Chris Wensel introduced into Cascading the notion of a “data exception”, and how to set stream assertion levels as part of the business logic of an application.
Moreover, the Cascalog language by Nathan Marz, Sam Ritchie, et al., arguably uses TDD as its methodology, in the transition from ad-hoc queries as logic predicates, then composing those predicates into large-scale apps.
Closely related to the “functional relational programming” paradigm from Moseley & Marks 2006: http://goo.gl/SKspn
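The idea of regex assertions on a tuple stream, with adjustable levels and a trap for “data exceptions”, can be sketched in plain Java as follows. This is not Cascading's assertion API: `StreamAssertionSketch`, its `Level` enum, `RegexAssertion`, and `check` are hypothetical names, and tuples are simplified to tab-delimited strings. The assumption is that an assertion runs only when the configured level is at least as strict as the assertion's own level, much as log4j filters messages by level.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of leveled regex stream assertions; not Cascading's assertion API.
final class StreamAssertionSketch {

    // Ordered from least to most strict; NONE switches all assertions off.
    enum Level { NONE, VALID, STRICT }

    static final class RegexAssertion {
        final Level level;
        final Pattern pattern;

        RegexAssertion(Level level, String regex) {
            this.level = level;
            this.pattern = Pattern.compile(regex);
        }
    }

    // Returns the tuples that satisfy every active assertion; tuples that fail
    // are added to the trap as "data exceptions" rather than aborting the run.
    static List<String> check(List<String> tuples, List<RegexAssertion> assertions,
                              Level configured, List<String> trap) {
        List<String> passed = new ArrayList<>();
        for (String tuple : tuples) {
            boolean ok = true;
            for (RegexAssertion a : assertions) {
                boolean active = a.level != Level.NONE && a.level.compareTo(configured) <= 0;
                if (active && !a.pattern.matcher(tuple).matches()) {
                    ok = false;
                    break;
                }
            }
            if (ok) {
                passed.add(tuple);
            } else {
                trap.add(tuple);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        List<RegexAssertion> assertions = List.of(
            new RegexAssertion(Level.VALID, "doc\\d+\\t.+"),
            new RegexAssertion(Level.STRICT, "[^\\t]+\\t[A-Za-z .,]+"));
        List<String> tuples = List.of(
            "doc01\tA rain shadow is a dry area.", "doc02\t12345", "oops");
        for (Level level : Level.values()) {
            List<String> trap = new ArrayList<>();
            List<String> passed = check(tuples, assertions, level, trap);
            System.out.println(level + ": passed=" + passed.size() + " trapped=" + trap.size());
        }
    }
}
```

Running the same data at each level shows the workflow described above: tighten the level during development to surface edge cases, then decide which assertions stay active in production and where the trapped tuples get routed.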
Cascading – API design principles
• specify what is required, not how it must be achieved
• provide the “glue” for system integration
• no surprises
• same JAR, any scale
• plan far ahead (before consuming cluster resources)
• fail the same way twice
Overview of the design principles embodied by Cascading as a pattern language…
Some aspects (Cascalog in particular) are closely related to “FRP” from Moseley/Marks 2006
code samples: Word Count
Enterprise Data Workflows
Let’s make this real, show some code…
the ubiquitous word count
definition: count how often each word appears in a collection of text documents
this simple program provides an excellent test case for parallel processing, since it:
‣ requires a minimal amount of code
‣ demonstrates use of both symbolic and numeric values
‣ shows a dependency graph of tuples as an abstraction
‣ is not many steps away from useful search indexing
‣ serves as a “Hello World” for Hadoop apps
any distributed computing framework which can run Word Count efficiently in parallel at scale can handle much larger and more interesting compute problems
word count – pseudocode
void map (String doc_id, String text):
  for each word w in segment(text):
    emit(w, "1");

void reduce (String word, Iterator partial_counts):
  int count = 0;
  for each pc in partial_counts:
    count += Int(pc);
  emit(word, String(count));
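As a rough, single-process illustration of the pseudocode above, the following plain Java sketch runs the same map, group, and reduce steps in memory. It uses neither Hadoop nor Cascading; `WordCountSketch` and its method names are hypothetical, the sample documents are made up, and the delimiter regex is an assumption modeled on the one used in the code samples in this talk (with a `+` added so that runs of delimiters do not produce empty tokens).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory sketch of the map/reduce pseudocode; not Hadoop or Cascading API.
final class WordCountSketch {

    // "map" phase: split one document into tokens and emit a (token, 1) pair per token.
    static List<Map.Entry<String, Integer>> map(String docId, String text) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String token : text.split("[ \\[\\](),.]+")) {
            if (!token.isEmpty()) {
                pairs.add(Map.entry(token, 1));
            }
        }
        return pairs;
    }

    // "reduce" phase: sum the partial counts collected for one token.
    static int reduce(String word, List<Integer> partialCounts) {
        int count = 0;
        for (int pc : partialCounts) {
            count += pc;
        }
        return count;
    }

    // Driver: groups the emitted pairs by token (the "shuffle"), then reduces each group.
    static Map<String, Integer> wordCount(Map<String, String> docs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (Map.Entry<String, Integer> pair : map(doc.getKey(), doc.getValue())) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<String, Integer> counts = new TreeMap<>();
        grouped.forEach((token, partials) -> counts.put(token, reduce(token, partials)));
        return counts;
    }

    public static void main(String[] args) {
        Map<String, String> docs = Map.of(
            "doc01", "A rain shadow is a dry area",
            "doc02", "a dry area (in the lee)");
        System.out.println(wordCount(docs));
    }
}
```

Counting here is case-sensitive and unscrubbed, so “A” and “a” are tallied separately.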
[Diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count, with map (M) and reduce (R) phases]
1 map, 1 reduce, 18 lines of code
gist.github.com/3900702
word count – flow diagram
cascading.org/category/impatient
word count – Cascading app
String docPath = args[ 0 ];
String wcPath = args[ 1 ];
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

// create source and sink taps
Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

// specify a regex to split "document" text lines into token stream
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
// only returns "token"
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

// determine the word counts
Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new GroupBy( wcPipe, token );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
  .setName( "wc" )
  .addSource( docPipe, docTap )
  .addTailSink( wcPipe, wcTap );

// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();
word count – flow plan
[Flow plan diagram, from [head] to [tail]:
  Hfs['TextDelimited[['doc_id', 'text']->[ALL]]']['data/rain.txt']']   (source tap)
  Each('token')[RegexSplitGenerator[decl:'token'][args:1]]             (map)
  GroupBy('wc')[by:['token']]
  Every('wc')[Count[decl:'count']]                                     (reduce)
  Hfs['TextDelimited[[UNKNOWN]->['token', 'count']]']['output/wc']']   (sink tap)]
import com.twitter.scalding._

class WordCount(args : Args) extends Job(args) {
  Tsv(args("doc"), ('doc_id, 'text), skipHeader = true)
    .read
    .flatMap('text -> 'token) { text : String => text.split("[ \\[\\]\\(\\),.]") }
    .groupBy('token) { _.size('count) }
    .write(Tsv(args("wc"), writeHeader = true))
}
word count – Scalding / Scala
github.com/twitter/scalding/wiki
‣ extends the Scala collections API, distributed lists become “pipes” backed by Cascading
‣ code is compact, easy to understand – very close to conceptual flow diagram
‣ functional programming is great for expressing complex workflows in MapReduce, etc.
‣ large-scale, complex problems can be handled in just a few lines of code
‣ significant investments by Twitter, Etsy, eBay, etc., in this open source project
‣ extensive libraries are available for linear algebra, abstract algebra, machine learning – e.g., “Matrix API”
‣ several large-scale apps in production deployments
‣ IMHO, especially great for data services at scale
word count – Scalding / Scala
Using a functional programming language to build flows works even better than trying to represent functional programming constructs within Java…
(ns impatient.core
  (:use [cascalog.api]
        [cascalog.more-taps :only (hfs-delimited)])
  (:require [clojure.string :as s]
            [cascalog.ops :as c])
  (:gen-class))

(defmapcatop split [line]
  "reads in a line of string and splits it by regex"
  (s/split line #"[\[\]\\\(\),.)\s]+"))

(defn -main [in out & args]
  (?<- (hfs-delimited out)
       [?word ?count]
       ((hfs-delimited in :skip-header? true) _ ?line)
       (split ?line :> ?word)
       (c/count ?count)))

; Paul Lam
; github.com/Quantisan/Impatient
word count – Cascalog / Clojure
github.com/nathanmarz/cascalog/wiki
‣ implements Datalog in Clojure, with predicates backed by Cascading
‣ a truly declarative language – whereas Scalding lacks that aspect of functional programming
‣ run ad-hoc queries from the Clojure REPL, approx. 10:1 code reduction compared with SQL
‣ composable subqueries, for test-driven development (TDD) at scale
‣ fault-tolerant workflows which are simple to follow
‣ same framework used from discovery through to production apps
‣ FRP mitigates the s/w engineering costs of Accidental Complexity
‣ focus on the process of structuring data; not un/structured
‣ Leiningen build: simple, no surprises, in Clojure itself
‣ has a learning curve, limited number of Clojure developers
‣ aggregators are the magic, those take effort to learn
word count – Cascalog / Clojure
data science perspectives: how we got here
Enterprise Data Workflows
Let’s examine an evolution of Data Science practice, subsequent to the 1997 Q3 inflection point which enabled huge ecommerce successes, and commercialized Big Data.
[Diagram: Stakeholder, Product (strategy), Engineering (requirements, optimized code), Web App, Customers (transactions), RDBMS, SQL Query result sets, BI Analysts, Excel pivot tables, PowerPoint slide decks]
circa 1996: pre- inflection point
Ah, teh olde days - Perl and C++ for CGI :)
Feedback loops shown in red represent data innovations at the time…
Characterized by slow, manual processes: data modeling / business intelligence; “throw it over the wall”… this thinking led to impossible silos
[Diagram: Stakeholder, Customers, Product, Engineering, UX, Web Apps, Middleware (servlets, models), customer transactions, RDBMS, SQL Query result sets, Logs (event history), DW, ETL, aggregation, dashboards, Algorithmic Modeling, recommenders + classifiers]
circa 2001: post- big ecommerce successes
Q3 1997: Greg Linden @ Amazon, Randy Shoup @ eBay -- independent teams arrived at the same conclusion: parallelize workloads onto clusters of commodity servers (Intel/Linux) to scale-out horizontally.
Google and Inktomi (YHOO Search) were working along the same lines.
MapReduce grew directly out of this effort. LinkedIn, Facebook, Twitter, Apple, etc., follow.
Algorithmic modeling, which leveraged machine data, allowed for Big Data to become monetized. REALLY monetized :)
Leo Breiman wrote an excellent paper in 2001, “Two Cultures”, chronicling this evolution and the sea change from data modeling (silos, manual process) to algorithmic modeling (machine data for automation/optimization)
MapReduce came from work in 2002. Google is now three generations beyond that -- while the Global 1000 struggles to rationalize Hadoop practices.
Google gets upset when people try to “open the kimono”; however, Twitter is in SF where that’s a national pastime :) To get an idea of what powers Google internally, check the open source projects: Scalding, Matrix API, Algebird, etc.
[Diagram: Use Cases Across Topologies. Workflow spanning RDBMS, Log Events, In-Memory Data Grid, Hadoop, etc., with near time, batch, and services paths; Web Apps, Mobile, etc. (transactions, content, social interactions); History; Data Products; Customers; Cluster Scheduler; Planner; Prod, Eng, DW, Ops; s/w dev, data science, discovery + modeling; dashboard metrics, business process, optimized capacity, taps; roles: Data Scientist, App Dev, Ops, Domain Expert; legend: introduced capability vs. existing SDLC]
circa 2013: clusters everywhere
Here’s what our more savvy customers are using for architecture and process today: traditional SDLC, but also Data Science inter-disciplinary teams.
Also, machine data (app history) driving planners and schedulers for advanced multi-tenant cluster computing fabric.
Not unlike a practice at LLL, where 4x more data gets collected about the machine than about the experiment.
asymptotically…
• smarter, more robust clusters
• increased leverage of machine data for automation and optimization
• DSLs focused on scalability, testability, reducing s/w engineering complexity
• increased use of “machine code”, who writes SQL directly?
• workflows incorporating more “moving parts”
• less about “bigness” of data, more about complexity of process
• greater instrumentation ⟹ even more machine data, increased feedback
[Diagram: Workflow, DSL, Planner/Optimizer, Cluster Scheduler, Cluster, App History]
Enterprise Data Workflows: more about “complex” process than about “big” data
by Leo Breiman
Statistical Modeling: The Two Cultures
Statistical Science, 2001
bit.ly/eUTh9L
also check out RStudio: rstudio.org/ rpubs.com/
references…
For a really great discussion about the fundamentals of Data Science and process for algorithmic modeling (analyzing the 1997 inflection point), refer back to Breiman 2001.
by DJ Patil
Data Jujitsu
O’Reilly, 2012
amazon.com/dp/B008HMN5BE
Building Data Science Teams
O’Reilly, 2011
amazon.com/dp/B005O4U3ZE
references…
In terms of building data products, see DJ Patil's mini-books on O'Reilly: Building Data Science Teams, Data Jujitsu.
the workflow abstraction: many aspects of an app
Enterprise Data Workflows
The workflow abstraction helps make Hadoop accessible to a broader audience of developers.
Let’s take a look at how organizations can leverage it in other important ways…
the workflow abstraction
Tuple Flows, Pipes, Taps, Filters, Joins, Traps, etc.
…in other words, “plumbing” as a pattern language for managing the complexity of Big Data in Enterprise apps on many levels
The workflow abstraction, a pattern language for building robust, scalable Enterprise apps, which works on many levels across an organization…
rather than arguing SQL vs. NoSQL…
this kind of work focuses on the process of structuring data
which must occur long before work on large-scale joins, visualizations, predictive models, etc.
so the process of structuring data is what we examine here:
i.e., how to build workflows for Big Data
thank you Dr. Codd
“A relational model of data for large shared data banks” dl.acm.org/citation.cfm?id=362685
Instead, in Data Science work we must focus on *the process of structuring data* that must happen before the large-scale joins, predictive models, visualizations, etc. The process of structuring data is what I will show here: how to build workflows from Big Data. Thank you, Dr. Codd.
workflow – abstraction layer
• Cascading initially grew from interaction with the Nutch project, before Hadoop had a name; API author Chris Wensel recognized that MapReduce would be too complex for substantial work in an Enterprise context
• 5+ years later, Enterprise app deployments on Hadoop are limited by staffing issues: difficulty of retraining staff, scarcity of Hadoop experts
• the pattern language provides a structured method for solving large, complex design problems where the syntax of the language promotes use of best practices – which addresses staffing issues
First and foremost, the workflow represents an abstraction layer to mitigate the complexity and costs of coding large apps directly in MapReduce.
workflow – literate collaboration
• provides an intuitive visual representation for apps: flow diagrams
• flow diagrams are quite valuable for cross-team collaboration
• this approach leverages literate programming methodology, especially in DSLs written in functional programming languages
• example: nearly 1:1 correspondence between function calls and flow diagram elements in Scalding
• example: expert developers on cascading-users email list use flow diagrams to help troubleshoot issues remotely
Formally speaking, the pattern language in Cascading gets leveraged as a visual representation used for literate programming.
Several good examples exist, but the phenomenon of different developers troubleshooting a program together over the “cascading-users” email list is most telling -- the expert developers generally ask a novice to provide a flow diagram first.
workflow – business process
• imposes a separation of concerns between the capture of business process requirements, and the implementation details (Hadoop, etc.)
• workflow orchestration evokes the notion of business process management for Enterprise apps (think BPM/BPEL)
• Cascalog leverages Datalog features to make business process executable: “specify what you require, not how to achieve it”
Business Stakeholder POV: business process management for workflow orchestration (think BPM/BPEL)
workflow – data architect
• represents a physical plan for large-scale data flow management
• tap schemes and tuple streams determine the relevant schema
• a producer/consumer graph of tap identifier URIs provides a view of data provenance
• cluster utilization vs. producer/consumer graph surfaces ROI for Hadoop-based data products
Data Architect POV: a physical plan for large-scale data flow management
workflow – system integration
• Cascading apps incorporate much more than just Hadoop jobs
• integration of other heterogenous data frameworks (taps) and compute platforms (topologies)
• integration of other paradigms via DSLs, ANSI SQL, PMML, etc.
• ultimately reduced/encapsulated as a single JAR file per app
Systems Integrator POV: system integration of heterogenous data sources and compute platforms
workflow – data scientist
• represents a directed, acyclic graph (DAG) on which we can apply some useful math…
• Amdahl’s Law, etc., to quantify the extent of parallelization
• query optimizer can leverage app history (machine data) to select alternative algorithms based on the shape of the data
• predictive modeling to estimate expected run times of a given app for a given size and shape of input data
Data Scientist POV: a directed, acyclic graph (DAG) on which we can apply Amdahl's Law, etc.
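For reference, Amdahl's Law bounds the speedup of a workload as 1 / ((1 − p) + p / n), where p is the fraction of run time that can be parallelized and n is the number of parallel workers. The sketch below is a hypothetical helper, not part of any Cascading API; the class and method names and the example value p = 0.95 are chosen purely for illustration.

```java
// Hypothetical helper for Amdahl's Law; names are illustrative, not Cascading API.
final class AmdahlSketch {

    // speedup = 1 / ((1 - p) + p / n), where p is the parallelizable fraction
    // of run time and n is the number of parallel workers.
    static double speedup(double parallelFraction, int workers) {
        if (parallelFraction < 0.0 || parallelFraction > 1.0 || workers < 1) {
            throw new IllegalArgumentException("need 0 <= p <= 1 and workers >= 1");
        }
        return 1.0 / ((1.0 - parallelFraction) + parallelFraction / workers);
    }

    // Limit of the speedup as the number of workers grows without bound.
    static double maxSpeedup(double parallelFraction) {
        return 1.0 / (1.0 - parallelFraction);
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 10, 100, 1000}) {
            System.out.printf("p=0.95, workers=%d -> speedup %.2f%n", n, speedup(0.95, n));
        }
        System.out.printf("limit for p=0.95: %.1f%n", maxSpeedup(0.95));
    }
}
```

The serial fraction dominates quickly: once the parallel part is spread thin, adding nodes barely moves the total, which is one way a DAG annotated with per-stage timings can inform capacity decisions.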
workflow – developer
• physical plan for a query avoids non-deterministic behavior – expensive when troubleshooting
• otherwise, edge cases can become nightmares on a large cluster
• “plan far ahead”: potential problems can be inferred at compile time or at flow planner stage… long before large, expensive resources start getting consumed… or worse, before the wrong results get propagated downstream
As an engineering manager who has tasked data scientists and Hadoop developers to build large-scale apps, I have seen a typical issue: everything works great on a laptop with a small set of data, but then the app falls over on the staging cluster -- causing staff to spend weeks debugging edge cases.
Non-deterministic behavior of Big Data frameworks creates enormous costs at scale.
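One way to picture “plan far ahead” is a check that runs over the declared flow before any data is read. The sketch below is an assumption about the general technique, not a description of how the Cascading flow planner is implemented: `PlanAheadSketch`, `Stage`, and `validate` are hypothetical names, and each stage simply declares which fields it requires and which it adds.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plan-time field check; stage and field names are illustrative, not Cascading API.
final class PlanAheadSketch {

    static final class Stage {
        final String name;
        final Set<String> requires;  // fields this stage reads
        final Set<String> declares;  // fields this stage adds to the stream

        Stage(String name, Set<String> requires, Set<String> declares) {
            this.name = name;
            this.requires = requires;
            this.declares = declares;
        }
    }

    // Walks the stages in order and fails fast, before any data is processed,
    // if a stage requires a field that neither the source nor an earlier stage provides.
    static void validate(Set<String> sourceFields, List<Stage> stages) {
        Set<String> available = new HashSet<>(sourceFields);
        for (Stage stage : stages) {
            for (String field : stage.requires) {
                if (!available.contains(field)) {
                    throw new IllegalStateException(
                        "stage '" + stage.name + "' requires missing field '" + field + "'");
                }
            }
            available.addAll(stage.declares);
        }
    }

    public static void main(String[] args) {
        Set<String> source = Set.of("doc_id", "text");
        List<Stage> stages = List.of(
            new Stage("tokenize", Set.of("text"), Set.of("token")),
            new Stage("count", Set.of("token"), Set.of("count")));
        validate(source, stages);
        System.out.println("plan is consistent");

        try {
            // A misspelled field name is caught at plan time, not hours into a cluster run.
            validate(source, List.of(new Stage("count", Set.of("tokens"), Set.of("count"))));
        } catch (IllegalStateException e) {
            System.out.println("plan rejected: " + e.getMessage());
        }
    }
}
```

The same failure happens on a laptop and on a large cluster, which is the point of “fail the same way twice”.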
workflow – operations
• “same JAR, any scale” allows for continuous integration practices – no need to change code or recompile a JAR
• flow diagrams annotated with metrics allow systems engineers to identify performance bottlenecks, model utilization rates, perform capacity planning, determine ROI on cluster infrastructure, etc.
Systems Engineer POV: workflow as a JAR file, has passed CI, available in a Maven repo
metrics on the DAG for Ops utilization analysis, capacity planning, ROI on infrastructure
workflow – transparency
• fully connected context for compiler optimization, exception handling, debug, config, scheduling, notifications, provenance, etc.
• this practice is in stark contrast to Big Data frameworks where developers cross multiple language boundaries to troubleshoot large-scale apps
• again, complexity is more the issue than “bigness” … and lack of transparency is what makes complexity so expensive in Big Data apps
An entire app compiles into a single JAR: fully connected context for compiler optimization, exception handling, debug, config, scheduling, notifications, provenance, etc.
for the analyst on your shopping list…
Enterprise Data Workflows
For the analysts in the audience -- also useful for Ops, Customer Support, Finance, etc.
ANSI SQL – integration
Analysts: using SQL data warehouses (MPP) today to run large-scale queries, but need to start leveraging Hadoop
Developers: Cascading users building Enterprise data workflows today on Hadoop
capabilities
• ANSI standard SQL parser and optimizer built on top of Cascading
• relational catalog view into large-scale unstructured data
• SQL shell to test and submit queries into Hadoop
• JDBC driver to integrate into existing tools and application servers
benefits
• use standard SQL to run queries over unstructured data on Hadoop
• integrate SQL queries into Enterprise data workflows
• don’t have to learn new syntax or modify queries
Cascading worked with another team which had substantial experience building an ANSI SQL compliant parser/optimizer, ready for integration with popular Enterprise relational frameworks
ANSI SQL – CSV data in local file system
The test database for MySQL is available for download from https://launchpad.net/test-db/
Here we have a bunch o’ CSV flat files in a directory in the local file system.
ANSI SQL – simple DDL overlay
Use the “lingual” command line interface to overlay DDL to describe the expected table schema.
ANSI SQL – shell prompt, catalog
Use the “lingual” SQL shell prompt to run SQL queries interactively, show catalog, etc.
ANSI SQL – queries
Here’s an example SQL query on that “employee” test database from MySQL.
ANSI SQL – JDBC driver
public void run() throws ClassNotFoundException, SQLException {
  Class.forName( "cascading.lingual.jdbc.Driver" );
  Connection connection = DriverManager.getConnection(
    "jdbc:lingual:local;schemas=src/main/resources/data/example" );
  Statement statement = connection.createStatement();
  ResultSet resultSet = statement.executeQuery(
    "select *\n"
      + "from \"EXAMPLE\".\"SALES_FACT_1997\" as s\n"
      + "join \"EXAMPLE\".\"EMPLOYEE\" as e\n"
      + "on e.\"EMPID\" = s.\"CUST_ID\"" );

  while( resultSet.next() ) {
    int n = resultSet.getMetaData().getColumnCount();
    StringBuilder builder = new StringBuilder();

    for( int i = 1; i <= n; i++ ) {
      builder.append( ( i > 1 ? "; " : "" )
        + resultSet.getMetaData().getColumnLabel( i )
        + "=" + resultSet.getObject( i ) );
    }

    System.out.println( builder );
  }

  resultSet.close();
  statement.close();
  connection.close();
}
Note that in this example the schema for the DDL has been derived directly from the CSV files.
In other words, point the JDBC connection at a directory of flat files and query as if they were already loaded into SQL.
ANSI SQL – JDBC result set
$ gradle clean jar
$ hadoop jar build/libs/lingual-examples-1.0.0-wip-dev.jar
CUST_ID=100; PROD_ID=10; EMPID=100; NAME=Bill
CUST_ID=150; PROD_ID=20; EMPID=150; NAME=Sebastian
Caveat: if you absolutely positively must have sub-second SQL query response for PB-scale data on a 1000+ node cluster… Good luck with that!! Call the MPP vendors.
This ANSI SQL library is primarily intended for batch workflows (high throughput, not low latency) for many under-represented use cases in Enterprise IT.
success
for the scientist
on your shopping list…
For the data scientists in the audience…
example model integration
• use customer order history as the training data set
• train a risk classifier for orders, using Random Forest
• export model from R to PMML
• build a Cascading app to execute the PMML model
• generate a pipeline from PMML description
• planner builds flow for a topology (Hadoop)
• compile app to a JAR file
• deploy the app at scale to calculate scores
[diagram labels: Cascading apps; risk classifier, dimension: per-order; risk classifier, dimension: customer 360; PMML model; analyst's laptop; data prep; detect fraudsters; predict model costs; customer transactions; score new orders; training data sets; batch workloads; real-time workloads; anomaly detection; segment customers; IMDG; Hadoop; partner data; DW; ETL; chargebacks, etc.; Customer DB; velocity metrics]
cascading.org/pattern
## train a RandomForest model
f <- as.formula("as.factor(label) ~ .")
fit <- randomForest(f, data_train, ntree=50)

## test the model on the holdout test set
print(fit$importance)
print(fit)

predicted <- predict(fit, data)
data$predicted <- predicted
confuse <- table(pred = predicted, true = data[,1])
print(confuse)

## export predicted labels to TSV
write.table(data, file=paste(dat_folder, "sample.tsv", sep="/"), quote=FALSE, sep="\t", row.names=FALSE)

## export RF model to PMML
saveXML(pmml(fit), file=paste(dat_folder, "sample.rf.xml", sep="/"))
model creation in R
<?xml version="1.0"?>
<PMML version="4.0" xmlns="http://www.dmg.org/PMML-4_0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.dmg.org/PMML-4_0 http://www.dmg.org/v4-0/pmml-4-0.xsd">
 <Header copyright="Copyright (c)2012 Concurrent, Inc." description="Random Forest Tree Model">
  <Extension name="user" value="ceteri" extender="Rattle/PMML"/>
  <Application name="Rattle/PMML" version="1.2.30"/>
  <Timestamp>2012-10-22 19:39:28</Timestamp>
 </Header>
 <DataDictionary numberOfFields="4">
  <DataField name="label" optype="categorical" dataType="string">
   <Value value="0"/>
   <Value value="1"/>
  </DataField>
  <DataField name="var0" optype="continuous" dataType="double"/>
  <DataField name="var1" optype="continuous" dataType="double"/>
  <DataField name="var2" optype="continuous" dataType="double"/>
 </DataDictionary>
 <MiningModel modelName="randomForest_Model" functionName="classification">
  <MiningSchema>
   <MiningField name="label" usageType="predicted"/>
   <MiningField name="var0" usageType="active"/>
   <MiningField name="var1" usageType="active"/>
   <MiningField name="var2" usageType="active"/>
  </MiningSchema>
  <Segmentation multipleModelMethod="majorityVote">
   <Segment id="1">
    <True/>
    <TreeModel modelName="randomForest_Model" functionName="classification" algorithmName="randomForest" splitCharacteristic="binarySplit">
     <MiningSchema>
      <MiningField name="label" usageType="predicted"/>
      <MiningField name="var0" usageType="active"/>
      <MiningField name="var1" usageType="active"/>
      <MiningField name="var2" usageType="active"/>
     </MiningSchema>
...
model exported as PMML
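The `multipleModelMethod="majorityVote"` attribute in the PMML above means that each tree segment casts one vote and the most common label wins. A minimal Java sketch of that voting rule follows. The class name `MajorityVoteSketch` and the tie-breaking behavior (the first label to reach the top count wins) are assumptions for illustration; this is not how Pattern or any particular PMML engine is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of majority voting across the trees of a forest.
final class MajorityVoteSketch {
    // Returns the label predicted by the most trees, or null for an empty forest.
    static String vote(String[] treeLabels) {
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String label : treeLabels) {
            int count = counts.merge(label, 1, Integer::sum);
            if (count > bestCount) { // strict ">" keeps the earlier leader on ties
                best = label;
                bestCount = count;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // three trees vote on one order: two say "1", one says "0"
        System.out.println(vote(new String[] { "1", "0", "1" }));
    }
}
```

The labels "0" and "1" match the two `Value` entries declared for the `label` field in the data dictionary.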
[flow diagram labels: Customer Orders; Classify; Scored Orders; GroupBy token; Count; PMML Model; Failure Traps; Assert; Confusion Matrix]
model run at scale as a Cascading app
cascading.org/pattern
public class Main {
  public static void main( String[] args ) {
    String pmmlPath = args[ 0 ];
    String ordersPath = args[ 1 ];
    String classifyPath = args[ 2 ];
    String trapPath = args[ 3 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps
    Tap ordersTap = new Hfs( new TextDelimited( true, "\t" ), ordersPath );
    Tap classifyTap = new Hfs( new TextDelimited( true, "\t" ), classifyPath );
    Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );

    // define a "Classifier" model from PMML to evaluate the orders
    ClassifierFunction classFunc = new ClassifierFunction( new Fields( "score" ), pmmlPath );
    Pipe classifyPipe = new Each( new Pipe( "classify" ), classFunc.getInputFields(), classFunc, Fields.ALL );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef().setName( "classify" )
      .addSource( classifyPipe, ordersTap )
      .addTrap( classifyPipe, trapTap )
      .addSink( classifyPipe, classifyTap );

    // write a DOT file and run the flow
    Flow classifyFlow = flowConnector.connect( flowDef );
    classifyFlow.writeDOT( "dot/classify.dot" );
    classifyFlow.complete();
  }
}
model run at scale as a Cascading app
# replace with your S3 bucket
BUCKET=temp.cascading.org/pattern
SINK=out
PMML=sample.rf.xml
DATA=sample.tsv

# clear previous output (required by Apache Hadoop)
s3cmd del -r s3://$BUCKET/$SINK

# load built JAR + input data
s3cmd put build/libs/pattern.jar s3://$BUCKET/
s3cmd put data/$PMML s3://$BUCKET/
s3cmd put data/$DATA s3://$BUCKET/

# launch cluster and run
elastic-mapreduce --create --name "RF" \
  --debug --enable-debugging --log-uri s3n://$BUCKET/logs \
  --jar s3n://$BUCKET/pattern.jar \
  --arg s3n://$BUCKET/$PMML \
  --arg s3n://$BUCKET/$DATA \
  --arg s3n://$BUCKET/$SINK/classify \
  --arg s3n://$BUCKET/$SINK/trap
app deployed in the AWS cloud
bash-3.2$ head output/classify/part-00000
label  var0  var1  var2  order_id  predicted  score
1      0     1     0     6f8e1014  1          1
0      0     0     1     6f8ea22e  0          0
1      0     1     0     6f8ea435  1          1
0      0     0     1     6f8ea5e1  0          0
1      0     1     0     6f8ea785  1          1
1      0     1     0     6f8ea91e  1          1
0      1     0     0     6f8eaaba  0          0
1      0     1     0     6f8eac54  1          1
0      1     1     0     6f8eade3  1          1
results
cascading.org/pattern
for the developer
on your shopping list…
For the J2EE / Scala / Clojure developers in the audience…
[flow diagram labels, grouped by branch. "gps": GPS logs, Geohash, Count gps_count, Max recent_visit. "road": Road Segments, Regex parse-road, Road Metadata, Join, Estimate albedo, Geohash, Estimate traffic. "tree": GIS export (src), Regex parse-gis, Regex parse-tree, Scrub species, Tree Metadata, Join, Estimate height, Geohash, Failure Traps. "shade": Join, Calculate distance, Filter height, Filter distance, Sum moment, Filter sum_moment. Output: Join, "reco".]
github.com/Cascading/CoPA
Cascalog app for a recommender system
gist.github.com/ceteri/4641263
A conceptual flow diagram for the entire batch workflow
City of Palo Alto recently began an Open Data initiative to give the local community greater visibility into how city government operates
intended to encourage students, entrepreneurs, local organizations, etc., to build new apps which contribute to the public good
paloalto.opendata.junar.com/dashboards/7576/geographic-information/
• trees, parks
• creek levels
• roads
• bike paths
• zoning
• library visits
• utility usage
• street sweeping
The City of Palo Alto has recently begun to support Open Data to give the local community greater visibility into how their city government functions.
This effort is intended to encourage students, entrepreneurs, local organizations, etc., to build new apps which contribute to the public good.
http://paloalto.opendata.junar.com/dashboards/7576/geographic-information/
process:
1. unstructured data about municipal infrastructure (GIS data: trees, roads, parks)
✚
2. unstructured data about where people like to walk (smartphone GPS logs)
✚
3. a wee bit o’ curated metadata
⇒
4. personalized recommendations:
“Find a shady spot on a summer day in which to walk near downtown Palo Alto. While on a long conference call. Sippin’ a latte or enjoying some fro-yo.”
We merge unstructured geo data about municipal infrastructure (GIS data: trees, roads, parks) + unstructured data about where people like to walk (smartphone GPS logs) + a little metadata (curated) => personalized recommendations:
"Find a shady spot on a summer day in which to walk near downtown Palo Alto. While on a long conference call. Sippin’ a latte or enjoying some fro-yo."
"Tree: 29 site 2 at 203 ADDISON AV, on ADDISON AV 44 from pl"," Private: -1 Tree ID: 29 Street_Name: ADDISON AV Situs Number: 203 Tree Site: 2 Species: Celtis australis Source: davey tree Protected: Designated: Heritage: Appraised Value: Hardscape: None Identifier: 40 Active Numeric: 1 Location Feature ID: 13872 Provisional: Install Date: ","37.4409634615283,-122.15648458861,0.0 ","Point"
"Wilkie Way from West Meadow Drive to Victoria Place"," Sequence: 20 Street_Name: Wilkie Way From Street PMMS: West Meadow Drive To Street PMMS: Victoria Place Street ID: 598 (Wilkie Wy, Palo Alto) From Street ID PMMS: 689 To Street ID PMMS: 567 Year Constructed: 1950 Traffic Count: 596 Traffic Index: residential local Traffic Class: local residential Traffic Date: 08/24/90 Paving Length: 208 Paving Width: 40 Paving Area: 8320 Surface Type: asphalt concrete Surface Thickness: 2.0 Base Type Pvmt: crusher run base Base Thickness: 6.0 Soil Class: 2 Soil Value: 15 Curb Type: Curb Thickness: Gutter Width: 36.0 Book: 22 Page: 1 District Number: 18 Land Use PMMS: 1 Overlay Year: 1990 Overlay Thickness: 1.5 Base Failure Year: 1990 Base Failure Thickness: 6 Surface Treatment Year: Surface Treatment Type: Alligator Severity: none Alligator Extent: 0 Block Severity: none Block Extent: 0 Longitude and Transverse Severity: none Longitude and Transverse Extent: 0 Ravelling Severity: none Ravelling Extent: 0 Ridability Severity: none Trench Severity: none Trench Extent: 0 Rutting Severity: none Rutting Extent: 0 Road Performance: UL (Urban Local) Bike Lane: 0 Bus Route: 0 Truck Route: 0 Remediation: Deduct Value: 100 Priority: Pavement Condition: excellent Street Cut Fee per SqFt: 10.00 Source Date: 6/10/2009 User Modified By: mnicols Identifier System: 21410 ","-122.1249640794,37.4155803115645,0.0 -122.124661859039,37.4154224594993,0.0 -122.124587720719,37.4153758330704,0.0 -122.12451895942,37.4153242300888,0.0 -122.124456098457,37.4152680432944,0.0 -122.124399616238,37.4152077003122,0.0 -122.124374937753,37.4151774433318,0.0 ","Line"
GIS export – raw, unstructured data
here’s what we have to work with -- raw GIS export as CSV, with plenty o’ errors too for good measure
this illustrates the quintessence of “unstructured data”
Alligator Severity! Rutting Extent!
ad-hoc queries ⇒ ⇒ logical predicates ⇒ Cascading flows
Leiningen REPL prompt
First we load `lein repl` to get an interactive prompt for Clojure… bring Cascalog libraries into Clojure… define functions to use… and execute queries
then we convert the queries into composable, logical propositions
curate valuable metadata
since we can find species and geolocation for each tree,
let’s add some metadata to infer other valuable data results, e.g., tree height
based on Wikipedia.org, Calflora.org, USDA.gov, etc.
Cascalog – an example
(defn get-trees [src trap tree_meta]
  "subquery to parse/filter the tree data"
  (<- [?blurb ?tree_id ?situs ?tree_site
       ?species ?wikipedia ?calflora ?avg_height
       ?tree_lat ?tree_lng ?tree_alt ?geohash]
      (src ?blurb ?misc ?geo ?kind)
      (re-matches #"^\s+Private.*Tree ID.*" ?misc)
      (parse-tree ?misc :> _ ?priv ?tree_id ?situs ?tree_site ?raw_species)
      ((c/comp s/trim s/lower-case) ?raw_species :> ?species)
      (tree_meta ?species ?wikipedia ?calflora ?min_height ?max_height)
      (avg ?min_height ?max_height :> ?avg_height)
      (geo-tree ?geo :> _ ?tree_lat ?tree_lng ?tree_alt)
      (read-string ?tree_lat :> ?lat)
      (read-string ?tree_lng :> ?lng)
      (geohash ?lat ?lng :> ?geohash)
      (:trap (hfs-textline trap))
      ))
Let's use Cascalog to begin our process of structuring that data
since the GIS export is vaguely in CSV format, here's a simple way to clean up the data
referring back to DJ Patil’s “Data Jujitsu”, that clean up usually accounts for 80% of project costs
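The parsing step inside `get-trees` can be pictured outside Cascalog as a plain regular-expression match over the second CSV field. The sketch below is in Java rather than Clojure. The class name `TreeRecordSketch`, the regex itself, and the choice to return `null` for rows that do not parse (the rows a trap would collect) are assumptions for illustration; the real `parse-tree` function from the app is not shown in these slides.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the parse-tree step; the regex is an assumption
// based on the field labels visible in the raw GIS export.
final class TreeRecordSketch {
    private static final Pattern TREE = Pattern.compile(
        "Tree ID:\\s+(\\d+)\\s+Street_Name:\\s+(.*?)\\s+Situs Number:\\s+(\\d*)"
        + "\\s+Tree Site:\\s+(\\d+)\\s+Species:\\s+(.*?)\\s+Source:");

    // Returns {tree_id, situs, tree_site, species}, or null when the row does not parse.
    static String[] parse(String misc) {
        Matcher m = TREE.matcher(misc);
        if (!m.find()) {
            return null;
        }
        // same scrub as the Cascalog query: trim, then lower-case the species name
        String species = m.group(5).trim().toLowerCase(Locale.ROOT);
        return new String[] { m.group(1), m.group(3), m.group(4), species };
    }

    public static void main(String[] args) {
        String misc = " Private: -1 Tree ID: 29 Street_Name: ADDISON AV Situs Number: 203"
            + " Tree Site: 2 Species: Celtis australis Source: davey tree";
        System.out.println(String.join(" | ", parse(misc)));
    }
}
```

Lower-casing the species matters because that field is the join key against the curated tree metadata.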
[flow diagram labels for the "tree" branch: GIS export (src); Regex parse-gis; Regex parse-tree; Scrub species; Tree Metadata; Join; Estimate height; Geohash; Failure Traps; tree]
GIS export ⇒ “trees” data product
?blurb       Tree: 412 site 1 at 115 HAWTHORNE AV, on HAWTHORNE AV 22 from pl
?tree_id     412
?situs       115
?tree_site   1
?species     liquidambar styraciflua
?wikipedia   http://en.wikipedia.org/wiki/Liquidambar_styraciflua
?calflora    http://calflora.org/cgi-bin/species_query.cgi?where-calrecnum=8598
?avg_height  27.5
?tree_lat    37.446001565119
?tree_lng    -122.167713417554
?tree_alt    0.0
?geohash     9q9jh0
Great, now we have a data product about trees in Palo Alto, which has been enriched by our process. BTW, those geolocation fields are especially important.
Also, here’s a conceptual flow diagram, showing a directed, acyclic graph (DAG) of data taps, tuple streams, operations, joins, assertions, aggregations, etc.
add to that some road data…
• traffic class (arterial, truck route, residential, etc.)
• traffic counts distribution
• surface type (asphalt, cement; age)
from which we derive estimators for noise, reflection, etc.
analysis and visualizations from RStudio:
* frequency of traffic classes
* density plot of traffic counts
recommender – data objects
each road in the GIS export is listed as a block between two cross roads, and each may have multiple road segments to represent turns:
-122.161776959558,37.4518836690781,0.0
-122.161390381489,37.4516410983794,0.0
-122.160786011735,37.4512589903357,0.0
-122.160531178368,37.4510977281699,0.0
( lat0, lng0, alt0 )
( lat1, lng1, alt1 )
( lat2, lng2, alt2 )
( lat3, lng3, alt3 )
NB: segments in the raw GIS have the order of geo coordinates scrambled: (lng, lat, alt)
Each road is listed in the GIS export as a block between two cross roads, and each may have multiple road segments to represent turns
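Since the road coordinates arrive as whitespace-separated `lng,lat,alt` triples, one early step is to split them and put latitude first. A minimal Java sketch, assuming that input layout; the class name `RoadSegmentSketch` and the `{lat, lng, alt}` output order are choices made for this illustration, not code from the app.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reorders the raw (lng, lat, alt) triples of one road.
final class RoadSegmentSketch {
    // Parses "lng,lat,alt lng,lat,alt ..." into a list of {lat, lng, alt} arrays.
    // Malformed input (empty string, missing fields) throws an unchecked exception.
    static List<double[]> parse(String geo) {
        List<double[]> points = new ArrayList<>();
        for (String triple : geo.trim().split("\\s+")) {
            String[] fields = triple.split(",");
            double lng = Double.parseDouble(fields[0]); // raw export lists longitude first
            double lat = Double.parseDouble(fields[1]);
            double alt = Double.parseDouble(fields[2]);
            points.add(new double[] { lat, lng, alt });
        }
        return points;
    }

    public static void main(String[] args) {
        String geo = " -122.161776959558,37.4518836690781,0.0 -122.161390381489,37.4516410983794,0.0 ";
        for (double[] p : parse(geo)) {
            System.out.println("lat=" + p[0] + " lng=" + p[1] + " alt=" + p[2]);
        }
    }
}
```

Each consecutive pair of points in the returned list then describes one road segment.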
9q9jh0
geohash with 6-digit resolution
approximates a 5-block square
centered lat: 37.445, lng: -122.162
recommender – spatial indexing
We use “geohash” codes for “cheap and dirty” geospatial indexing suited for parallel processing (Hadoop)
much more effective methods exist; however, this is simple to show
6-digit resolution on a geohash generates approximately a 5-block square
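For readers who have not met geohashes, here is a minimal Java sketch of the widely published encoding: alternately bisect the longitude and latitude ranges, collect one bit per bisection, and emit a base-32 character for every five bits. The class name `GeohashSketch` is made up for this example, and the app itself may well rely on an existing geohash library instead.

```java
// Hypothetical illustration of the standard geohash encoding.
final class GeohashSketch {
    private static final String BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz";

    static String encode(double lat, double lng, int length) {
        double latLo = -90.0, latHi = 90.0;
        double lngLo = -180.0, lngHi = 180.0;
        StringBuilder hash = new StringBuilder();
        boolean useLng = true; // bits alternate, starting with longitude
        int bitCount = 0;
        int value = 0;
        while (hash.length() < length) {
            if (useLng) {
                double mid = (lngLo + lngHi) / 2;
                if (lng >= mid) { value = (value << 1) | 1; lngLo = mid; }
                else { value = value << 1; lngHi = mid; }
            } else {
                double mid = (latLo + latHi) / 2;
                if (lat >= mid) { value = (value << 1) | 1; latLo = mid; }
                else { value = value << 1; latHi = mid; }
            }
            useLng = !useLng;
            if (++bitCount == 5) { // five bits make one base-32 character
                hash.append(BASE32.charAt(value));
                bitCount = 0;
                value = 0;
            }
        }
        return hash.toString();
    }

    public static void main(String[] args) {
        System.out.println(encode(37.445, -122.162, 6));
    }
}
```

Both the slide's center point (lat 37.445, lng -122.162) and the Hawthorne Ave tree from the “trees” data product encode to 9q9jh0 at six characters, so nearby records share a key that can drive a join or a GroupBy.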
9q9jh0
calculate a sum of moments for tree height × distance from road segment, as an estimator for shade:
∑( h·d )
also calculate estimators for traffic frequency and noise
recommender – estimators
Calculate a sum of moments for tree height × distance from center; approximate, but pretty good
also calculate estimators for traffic frequency and noise
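As a worked illustration of ∑( h·d ), the Java sketch below sums height × distance over the trees that fall near one road segment. The class name `ShadeEstimatorSketch`, the use of meters for both quantities, and the idea that distances have already been computed are all assumptions; the slides do not show how the app measures distance or which trees it keeps.

```java
// Hypothetical illustration of the sum-of-moments shade estimator.
final class ShadeEstimatorSketch {
    // heights[i] and distances[i] describe the same tree (units assumed: meters).
    static double sumOfMoments(double[] heights, double[] distances) {
        if (heights.length != distances.length) {
            throw new IllegalArgumentException("one distance is needed per tree height");
        }
        double sum = 0.0;
        for (int i = 0; i < heights.length; i++) {
            sum += heights[i] * distances[i]; // one moment per tree: h * d
        }
        return sum;
    }

    public static void main(String[] args) {
        double[] heights = { 10.0, 20.0 };
        double[] distances = { 2.0, 0.5 };
        // 10 * 2 + 20 * 0.5 = 30
        System.out.println(sumOfMoments(heights, distances));
    }
}
```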
recommender – personalization via GPS tracks
Here’s a Splunk screen showing GPS tracks log data from smartphones
recommenders combine multiple signals, generally via weighted averages, to rank personalized results:
• GPS of person ∩ road segment
• frequency and recency of visit
• traffic class and rate
• road albedo (sunlight reflection)
• tree shade estimator
adjusting the mix allows for further personalization at the end use
recommender – strategy
One approach to building commercial recommender systems is to take a vector of different preference metrics, combine them into a single sortable value, then rank the results before making personalized suggestions.
The resulting data in the "reco" output set produces exactly that.
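The "combine, then rank" strategy can be sketched in a few lines of Java. Everything named here is hypothetical: the class `RecoRankSketch`, the `Candidate` holder, the assumption that each signal has already been scaled to the range 0 to 1, and the example weights. The actual app computes its ranking in Cascalog, which is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration: weighted average of signals, then sort descending.
final class RecoRankSketch {
    static final class Candidate {
        final String name;
        final double[] signals; // assumed pre-scaled to [0, 1], higher is better

        Candidate(String name, double... signals) {
            this.name = name;
            this.signals = signals;
        }
    }

    // Weighted average: sum(w * s) / sum(w).
    static double score(double[] signals, double[] weights) {
        double weighted = 0.0;
        double total = 0.0;
        for (int i = 0; i < signals.length; i++) {
            weighted += weights[i] * signals[i];
            total += weights[i];
        }
        return weighted / total;
    }

    // Returns a new list with the best-scoring candidate first.
    static List<Candidate> rank(List<Candidate> candidates, double[] weights) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(Comparator.comparingDouble(
            (Candidate c) -> score(c.signals, weights)).reversed());
        return sorted;
    }

    public static void main(String[] args) {
        List<Candidate> candidates = new ArrayList<>();
        candidates.add(new Candidate("busy arterial", 1.0, 0.0)); // {recency, shade}
        candidates.add(new Candidate("shady side street", 0.0, 1.0));
        double[] weights = { 1.0, 3.0 }; // this user cares more about shade
        for (Candidate c : rank(candidates, weights)) {
            System.out.println(c.name + " " + score(c.signals, weights));
        }
    }
}
```

Changing the weights array is the "adjusting the mix" knob: the same signals produce a different ordering for a user who values recency over shade.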
‣ addr: 115 HAWTHORNE AVE
‣ lat/lng: 37.446, -122.168
‣ geohash: 9q9jh0
‣ tree: 413 site 2
‣ species: Liquidambar styraciflua
‣ est. height: 23 m
‣ shade metric: 4.363
‣ traffic: local residential, light traffic
‣ recent visit: 1972376952532
‣ a short walk from my train stop ✔
recommender – results
One of the top recommendations for me is about two blocks from my train stop, where a couple of really big American Sweetgum trees provide ample shade on a residential street with not much traffic
summary, references
Cascading – summary
‣ leverages a workflow abstraction for Enterprise apps
‣ provides a pattern language for system integration
‣ utilizes existing practices for JVM-based clusters
‣ addresses staffing bottlenecks due to Hadoop adoption
‣ manages complexity as data continues to scale massively
‣ reduces costs, while servicing risk-averse “conservatism”
In summary…
by Paco Nathan
Enterprise Data Workflows with Cascading
O’Reilly, 2013
amazon.com/dp/1449358721
Santa Clara, Feb 28, 1:30pm
strataconf.com/strata2013
references…
Some of this material comes from an upcoming O’Reilly book: “Enterprise Data Workflows with Cascading”
Also, come to Strata conference! I’ll be presenting related material at:
http://strataconf.com/strata2013/public/schedule/detail/27073
Santa Clara, Feb 28, 1:30pm
blog, community, code/wiki/gists, maven repo, products:
cascading.org
zest.to/group11
github.com/Cascading
conjars.org
goo.gl/KQtUL
concurrentinc.com
join us, we are hiring!
drill-down…
Copyright @2013, Concurrent, Inc.
Links to our open source projects, developer community, etc…
contact me @pacoid
http://concurrentinc.com/
(we're hiring too!)