Chicago Hadoop Users Group: Enterprise Data Workflows


Page 1: Chicago Hadoop Users Group: Enterprise Data Workflows

Paco Nathan · Concurrent, Inc. · San Francisco, CA · @pacoid

Copyright © 2013, Concurrent, Inc.

“Enterprise Data Workflows with Cascading”

zest.to/event63_77 · 2013-02-12

You may not have heard about us much, but you use our API in lots of places: your bank, your airline, your hospital, your mobile device, your social network, etc.

Page 2: Chicago Hadoop Users Group: Enterprise Data Workflows

Unstructured Data meets Enterprise Scale

• an example considered

• system integration: tearing down silos

• code samples

• data science perspectives: how we got here

• the workflow abstraction: many aspects of an app

• developer, analyst, scientist

• summary, references

Background: I'm a data scientist and an engineering director; I've spent the past decade building and leading data teams that created large-scale apps.

This talk is about using Cascading and related DSLs to build Enterprise Data Workflows. Our emphasis is on leveraging the workflow abstraction for system integration, for mitigating complexity, and for producing simple, robust apps at scale. We'll show a little something for the developers, the analysts, and the scientists in the room.

Page 3: Chicago Hadoop Users Group: Enterprise Data Workflows

an example considered

[flow diagram: Document Collection → Tokenize → Scrub token → HashJoin Left (RHS: Stop Word List → Regex token) → GroupBy token → Count → Word Count, with map (M) and reduce (R) phases marked; labeled "Enterprise Data Workflows"]

Let's consider the matter of handling Big Data from the perspective of building and maintaining Enterprise apps…

Page 4: Chicago Hadoop Users Group: Enterprise Data Workflows

[architecture diagram: a Data Workflow running on a Hadoop Cluster, with source taps reading customer profile DBs (Customer Prefs) and logs, plus a sink tap and a trap tap; downstream consumers include a Cache behind the Customers Support Web App, Reporting, Analytics Cubes, and Modeling via PMML; labeled "Enterprise Data Workflows"]

an example…

Apache Hadoop rarely ever gets used in isolation.

Page 5: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as on the previous slide]

an example… the front end

LOB use cases drive the demand for Big Data apps.

Page 6: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

an example… the back office

Enterprise organizations have seriously ginormous investments in existing back office practices: people, infrastructure, processes.

Page 7: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

an example… the heavy lifting!

"Main Street" firms have invested in Hadoop to address Big Data needs, off-setting their rising costs for Enterprise licenses from SAS, Teradata, etc.

Page 8: Chicago Hadoop Users Group: Enterprise Data Workflows

system integration: tearing down silos

[Word Count flow diagram, as on slide 3; labeled "Enterprise Data Workflows"]

The process of building Enterprise apps is largely about system integration and business process, meeting in the middle.

Page 9: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

Cascading – definitions

• a pattern language for Enterprise Data Workflows

• simple to build, easy to test, robust in production

• design principles ⟹ ensure best practices at scale

A pattern language ensures that best practices are followed by an implementation.

In this case, parallelization of deterministic query plans for reliable, Enterprise-scale workflows on Hadoop, etc.

Page 10: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

Cascading – usage

• Java API, Scala DSL Scalding, Clojure DSL Cascalog

• ASL 2 license, GitHub src, http://conjars.org

• 5+ yrs production use, multiple Enterprise verticals

More than 5 years of large-scale Enterprise deployments. DSLs in Scala, Clojure, Jython, JRuby, Groovy, etc. Maven repo for third-party contribs.

Page 11: Chicago Hadoop Users Group: Enterprise Data Workflows

quotes…

“Cascading gives Java developers the ability to build Big Data applications on Hadoop using their existing skillset … Management can really go out and build a team around folks that are already very experienced with Java. Switching over to this is really a very short exercise.”

CIO, Thor Olavsrud, 2012-06-06
cio.com/article/707782/Ease_Big_Data_Hiring_Pain_With_Cascading

“Masks the complexity of MapReduce, simplifies the programming, and speeds you on your journey toward actionable analytics … A vast improvement over native MapReduce functions or Pig UDFs.”

2012 BOSSIE Awards, James Borck, 2012-09-18
infoworld.com/slideshow/65089

Industry analysts are picking up on the staffing costs related to Hadoop, "no free lunch".

Page 12: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

Cascading – deployments

• case studies: Twitter, Etsy, Climate Corp, Nokia, Factual, Williams-Sonoma, uSwitch, Airbnb, Square, Harvard, etc.

• partners: Amazon AWS, Microsoft Azure, Hortonworks, MapR, EMC, SpringSource, Cloudera

• OSS frameworks built atop by: Twitter, Etsy, eBay, Climate Corp, uSwitch, YieldBot, etc.

• use cases: ETL, anti-fraud, advertising, recommenders, retail pricing, eCRM, marketing funnel, search analytics, genomics, climatology, etc.

Several published case studies about Cascading, Scalding, Cascalog, etc. Wide range of use cases.

Significant investment by Twitter, Etsy, and other firms for OSS based on Cascading. Partnerships with all Hadoop vendors.

Page 13: Chicago Hadoop Users Group: Enterprise Data Workflows

(Williams-Sonoma, Neiman Marcus)

concurrentinc.com/case-studies/upstream/ upstreamsoftware.com/blog/bid/86333/

(revenue team, publisher analytics)

concurrentinc.com/case-studies/twitter/ github.com/twitter/scalding/wiki

(infrastructure team)

concurrentinc.com/case-studies/airbnb/ gigaom.com/data/meet-the-combo-behind-etsy-airbnb-and-climate-corp-hadoop-jobs/

case studies…

Several customers using Cascading / Scalding / Cascalog have published case studies. Here are a few.

Page 14: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

Cascading – taps

• taps integrate other data frameworks, as tuple streams

• these are “plumbing” endpoints in the pattern language

• sources (inputs), sinks (outputs), traps (exceptions)

• where schema and provenance get determined

• text delimited, JDBC, Memcached, HBase, Cassandra, MongoDB, etc.

• data serialization: Avro, Thrift, Kryo, JSON, etc.

• extend in ~4 lines of Java

Speaking of system integration, taps provide the simplest approach for integrating different frameworks.
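As a minimal sketch (drawn from the Word Count and classifier samples later in this deck, so the class names are Cascading's own while the path variables are placeholders), a source, a sink, and a trap are each a single line of Java, and the scheme passed to the tap determines the tuple schema at that endpoint:

// source: TSV with a header row, read from docPath
Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
// sink: results written as TSV to wcPath
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );
// trap: tuples which fail an operation land here as "data exceptions"
Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );

FlowDef flowDef = FlowDef.flowDef()
  .addSource( docPipe, docTap )
  .addTrap( docPipe, trapTap )
  .addTailSink( wcPipe, wcTap );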

Page 15: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

Cascading – topologies

• topologies execute workflows on clusters

• flow planner is much like a compiler for queries

• abstraction layers reduce training costs

• Hadoop (MapReduce jobs)

• local mode (dev/test or special config)

• in-memory data grids (real-time)

• flow planner can be extended to support other topologies

• blend flows from different topologies into one app

Another kind of integration involves apps which run partly on a Hadoop cluster, and partly somewhere else.
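As a rough sketch of the "same JAR, any scale" idea (assuming Cascading 2's local-mode planner; in practice the taps and schemes also need local-mode equivalents), the topology is chosen by which flow connector plans the same pipe assembly:

// pick the planner at launch time; the pipe assembly stays the same
FlowConnector connector = runLocally
  ? new LocalFlowConnector( properties )    // dev/test on a laptop, no cluster required
  : new HadoopFlowConnector( properties );  // compiles the flow into MapReduce jobs

Flow flow = connector.connect( flowDef );
flow.complete();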

Page 16: Chicago Hadoop Users Group: Enterprise Data Workflows

example topologies…

Here are some examples of topologies for distributed computing -- Apache Hadoop being the first supported by Cascading, followed by local mode, and now a tuple space (IMDG) flow planner in the works.

Several other widely used platforms would also be likely suspects for Cascading flow planners.

Page 17: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

Cascading – ANSI SQL

• ANSI SQL parser/optimizer atop Cascading flow planner

• JDBC driver to integrate into existing tools and app servers

• surface a relational catalog over a collection of unstructured data

• launch a SQL shell prompt to run queries

• enable the analysts without retraining on Hadoop, etc.

• transparency for Support, Ops, Finance, et al.

• combine SQL flows with Scalding, Cascalog, etc.

• based on collab with Optiq – industry-proven code base

• keep the DBAs happy, and go home a hero!

Quite a number of projects have started out with Hadoop, then grafted a SQL-like syntax onto it somewhere.

We started out with a query planner used in Enterprise, then partnered with Optiq -- the team behind an Enterprise-proven code base for an ANSI SQL parser/optimizer.

In the sense that Splunk handles “machine data”, this SQL implementation provides “machine code”, as the lingua franca of Enterprise system integration.

Page 18: Chicago Hadoop Users Group: Enterprise Data Workflows

how to query…

abstraction   | RDBMS                                  | JVM Cluster
------------- | -------------------------------------- | ----------------------------------------------------
parser        | ANSI SQL compliant parser              | ANSI SQL compliant parser
optimizer     | logical plan, optimized based on stats | logical plan, optimized based on stats
planner       | physical plan                          | API "plumbing"
machine data  | query history, table stats             | app history, tuple stats
topology      | b-trees, etc.                          | heterogeneous, distributed: Hadoop, in-memory, etc.
visualization | ERD                                    | flow diagram
schema        | table schema                           | tuple schema
catalog       | relational catalog                     | tap usage DB
provenance    | (manual audit)                         | data set producers/consumers

When you peel back the onion skin on a SQL query, each of the abstraction layers used in an RDBMS has an analogue (or better) in the context of Enterprise Data Workflows running on JVM clusters.

Page 19: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

Cascading – machine learning

• export predictive models as PMML

• Cascading compiles to JVM classes for parallelization

• migrate workloads: SAS, Microstrategy, Teradata, etc.

• great OSS tools: R, Weka, KNIME, RapidMiner, etc.

• run multiple models in parallel as customer experiments

• Random Forest, Logistic Regression, GLM, Assoc Rules, Decision Trees, K-Means, Hierarchical Clustering, etc.

• 2 lines of code required for integration

• integrate with other libraries: Matrix API, Algebird, etc.

• combine with other flows into one app: Java for ETL, Scala for data services, SQL for reporting, etc.

PMML has been around for a while, and export is supported by virtually every analytics platform, covering a wide variety of predictive modeling algorithms.

Cascading reads PMML, building out workflows under the hood which run efficiently in parallel.

Much cheaper than buying a SAS license for your 2000-node Hadoop cluster ;)

Five companies are collaborating on this open source project, https://github.com/Cascading/cascading.pattern
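In practice those "2 lines" look roughly like the classifier pipe from the cascading.pattern example shown later in this deck (slide 66):

// read the PMML file, wrap it as a Cascading function, then apply it per tuple
ClassifierFunction classFunc = new ClassifierFunction( new Fields( "score" ), pmmlPath );
Pipe classifyPipe = new Each( new Pipe( "classify" ), classFunc.getInputFields(), classFunc, Fields.ALL );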

Page 20: Chicago Hadoop Users Group: Enterprise Data Workflows

PMML support…

Here are just a few of the tools that people use to create predictive models for export as PMML.

Page 21: Chicago Hadoop Users Group: Enterprise Data Workflows

[Enterprise Data Workflow architecture diagram, as above]

Cascading – test-driven development

• assert patterns (regex) on the tuple streams

• trap edge cases as “data exceptions”

• adjust assert levels, like log4j levels

• TDD at scale:

1. start from raw inputs in the flow graph

2. define stream assertions for each stage of transforms

3. verify exceptions, code to eliminate them

4. rinse, lather, repeat…

5. when impl is complete, app has full test coverage

• TDD follows from Cascalog's composable subqueries

• redirect traps in production to Ops, QA, Support, Audit, etc.

TDD is not usually high on the list when people start discussing Big Data apps.

Chris Wensel introduced into Cascading the notion of a “data exception”, and how to set stream assertion levels as part of the business logic of an application.

Moreover, the Cascalog language by Nathan Marz, Sam Ritchie, et al., arguably uses TDD as its methodology, in the transition from ad-hoc queries as logic predicates, then composing those predicates into large-scale apps.
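As a sketch of how a stream assertion plus a trap look in Java (the regex and the pipe/tap names here are illustrative, not from a specific app; the assertion classes are Cascading's):

// assert that every "token" matches the expected pattern; the assertion level
// can be dialed down at plan time, much like log4j levels
AssertMatches assertMatches = new AssertMatches( "^[\\w\\s]+$" );
docPipe = new Each( docPipe, AssertionLevel.STRICT, assertMatches );

// tuples which throw "data exceptions" in other operations get captured by the
// trap tap instead of killing the job; in production, point the trap at Ops/QA/Audit
FlowDef flowDef = FlowDef.flowDef()
  .addSource( docPipe, docTap )
  .addTrap( docPipe, trapTap )
  .addTailSink( wcPipe, wcTap );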

Page 22: Chicago Hadoop Users Group: Enterprise Data Workflows

Closely related to the "functional relational programming" paradigm from Moseley & Marks 2006: http://goo.gl/SKspn

Cascading – API design principles

• specify what is required, not how it must be achieved

• provide the “glue” for system integration

• no surprises

• same JAR, any scale

• plan far ahead (before consuming cluster resources)

• fail the same way twice

Overview of the design principles embodied by Cascading as a pattern language…

Some aspects (Cascalog in particular) are closely related to “FRP” from Moseley/Marks 2006

Page 23: Chicago Hadoop Users Group: Enterprise Data Workflows

code samples: Word Count

[Word Count flow diagram, as above; labeled "Enterprise Data Workflows"]

Let's make this real, show some code…

Page 24: Chicago Hadoop Users Group: Enterprise Data Workflows

the ubiquitous word count

definition: count how often each word appears in a collection of text documents

this simple program provides an excellent test case for parallel processing, since it:

‣ requires a minimal amount of code

‣ demonstrates use of both symbolic and numeric values

‣ shows a dependency graph of tuples as an abstraction

‣ is not many steps away from useful search indexing

‣ serves as a “Hello World” for Hadoop apps

any distributed computing framework which can run Word Count efficiently in parallel at scale can handle much larger and more interesting compute problems


Page 25: Chicago Hadoop Users Group: Enterprise Data Workflows

word count – pseudocode

void map (String doc_id, String text):
  for each word w in segment(text):
    emit(w, "1");

void reduce (String word, Iterator partial_counts):
  int count = 0;
  for each pc in partial_counts:
    count += Int(pc);
  emit(word, String(count));


Page 26: Chicago Hadoop Users Group: Enterprise Data Workflows

[flow diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; 1 map + 1 reduce, 18 lines of code]

gist.github.com/3900702

word count – flow diagram

cascading.org/category/impatient


Page 27: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

word count – Cascading app

String docPath = args[ 0 ];
String wcPath = args[ 1 ];
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

// create source and sink taps
Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

// specify a regex to split "document" text lines into a token stream
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );

// only returns "token"
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

// determine the word counts
Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new GroupBy( wcPipe, token );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
  .setName( "wc" )
  .addSource( docPipe, docTap )
  .addTailSink( wcPipe, wcTap );

// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();


Page 28: Chicago Hadoop Users Group: Enterprise Data Workflows

word count – flow plan

[flow plan diagram, as generated by writeDOT: the map phase reads Hfs['TextDelimited[['doc_id','text']]']['data/rain.txt'], applies Each('token')[RegexSplitGenerator[decl:'token']] and GroupBy('wc')[by:['token']]; the reduce phase applies Every('wc')[Count[decl:'count']] and writes Hfs['TextDelimited[[UNKNOWN]->['token','count']]']['output/wc'], tracing the tuple fields from [head] to [tail]]


Page 29: Chicago Hadoop Users Group: Enterprise Data Workflows

import com.twitter.scalding._

class WordCount(args : Args) extends Job(args) {
  Tsv(args("doc"), ('doc_id, 'text), skipHeader = true)
    .read
    .flatMap('text -> 'token) { text : String => text.split("[ \\[\\]\\(\\),.]") }
    .groupBy('token) { _.size('count) }
    .write(Tsv(args("wc"), writeHeader = true))
}

[Word Count flow diagram, as above]

word count – Scalding / Scala


Page 30: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

github.com/twitter/scalding/wiki

‣ extends the Scala collections API, distributed lists become “pipes” backed by Cascading

‣ code is compact, easy to understand – very close to conceptual flow diagram

‣ functional programming is great for expressing complex workflows in MapReduce, etc.

‣ large-scale, complex problems can be handled in just a few lines of code

‣ significant investments by Twitter, Etsy, eBay, etc., in this open source project

‣ extensive libraries are available for linear algebra, abstract algebra, machine learning – e.g., “Matrix API”

‣ several large-scale apps in production deployments

‣ IMHO, especially great for data services at scale

word count – Scalding / Scala

Using a functional programming language to build flows works even better than trying to represent functional programming constructs within Java…

Page 31: Chicago Hadoop Users Group: Enterprise Data Workflows

(ns impatient.core
  (:use [cascalog.api]
        [cascalog.more-taps :only (hfs-delimited)])
  (:require [clojure.string :as s]
            [cascalog.ops :as c])
  (:gen-class))

(defmapcatop split [line]
  "reads in a line of string and splits it by regex"
  (s/split line #"[\[\]\\\(\),.)\s]+"))

(defn -main [in out & args]
  (?<- (hfs-delimited out)
       [?word ?count]
       ((hfs-delimited in :skip-header? true) _ ?line)
       (split ?line :> ?word)
       (c/count ?count)))

; Paul Lam; github.com/Quantisan/Impatient

[Word Count flow diagram, as above]

word count – Cascalog / Clojure


Page 32: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

github.com/nathanmarz/cascalog/wiki

‣ implements Datalog in Clojure, with predicates backed by Cascading

‣ a truly declarative language – whereas Scalding lacks that aspect of functional programming

‣ run ad-hoc queries from the Clojure REPL, approx. 10:1 code reduction compared with SQL

‣ composable subqueries, for test-driven development (TDD) at scale

‣ fault-tolerant workflows which are simple to follow

‣ same framework used from discovery through to production apps

‣ FRP mitigates the s/w engineering costs of Accidental Complexity

‣ focus on the process of structuring data; not un/structured

‣ Leiningen build: simple, no surprises, in Clojure itself

‣ has a learning curve, limited number of Clojure developers

‣ aggregators are the magic, those take effort to learn

word count – Cascalog / Clojure


Page 33: Chicago Hadoop Users Group: Enterprise Data Workflows

data science perspectives: how we got here

[Word Count flow diagram, as above; labeled "Enterprise Data Workflows"]

Let's examine an evolution of Data Science practice, subsequent to the 1997 Q3 inflection point which enabled huge ecommerce successes and commercialized Big Data.

Page 34: Chicago Hadoop Users Group: Enterprise Data Workflows

[architecture diagram, circa 1996: Customers → Web App → transactions → RDBMS; BI Analysts run SQL queries and deliver result sets to Stakeholders as Excel pivot tables and PowerPoint slide decks; Product sets strategy, Engineering turns requirements into optimized code]

circa 1996: pre-inflection point

Ah, teh olde days - Perl and C++ for CGI :)

Feedback loops shown in red represent data innovations at the time…

Characterized by slow, manual processes: data modeling / business intelligence; "throw it over the wall"… this thinking led to impossible silos.

Page 35: Chicago Hadoop Users Group: Enterprise Data Workflows

[architecture diagram, circa 2001: Customers → Web Apps → customer transactions → RDBMS, with Logs capturing event history; Algorithmic Modeling feeds recommenders + classifiers back into the Web Apps through Middleware (servlets, models); aggregation drives dashboards for Product, Engineering, UX, and Stakeholders; DW + ETL off the RDBMS]

circa 2001: post big ecommerce successes

Q3 1997: Greg Linden @ Amazon, Randy Shoup @ eBay -- independent teams arrived at the same conclusion: parallelize workloads onto clusters of commodity servers (Intel/Linux) to scale-out horizontally. Google and Inktomi (YHOO Search) were working along the same lines.

MapReduce grew directly out of this effort. LinkedIn, Facebook, Twitter, Apple, etc., follow. Algorithmic modeling, which leveraged machine data, allowed for Big Data to become monetized. REALLY monetized :)

Leo Breiman wrote an excellent paper in 2001, “Two Cultures”, chronicling this evolution and the sea change from data modeling (silos, manual process) to algorithmic modeling (machine data for automation/optimization)

MapReduce came from work in 2002. Google is now three generations beyond that -- while the Global 1000 struggles to rationalize Hadoop practices.

Google gets upset when people try to “open the kimono”; however, Twitter is in SF where that’s a national pastime :) To get an idea of what powers Google internally, check the open source projects: Scalding, Matrix API, Algebird, etc.

Page 36: Chicago Hadoop Users Group: Enterprise Data Workflows

[architecture diagram, circa 2013: Customers interact with Web Apps, Mobile, etc., generating transactions, content, and social interactions; Log Events and History feed RDBMS, an In-Memory Data Grid, Hadoop, etc., as use cases across topologies; a Cluster Scheduler and Planner, driven by app history ("machine data"), provide optimized capacity for the taps; Data Workflows produce Data Products, dashboard metrics, and services for near-time and batch use; roles shown include Data Scientist, App Dev, Ops, and Domain Expert, spanning s/w dev, data science, discovery + modeling, and business process, across both the existing SDLC and the newly introduced capability]

circa 2013: clusters everywhere

Here's what our more savvy customers are using for architecture and process today: traditional SDLC, but also Data Science inter-disciplinary teams. Also, machine data (app history) driving planners and schedulers for an advanced multi-tenant cluster computing fabric.

Not unlike a practice at LLL, where 4x more data gets collected about the machine than about the experiment.

Page 37: Chicago Hadoop Users Group: Enterprise Data Workflows

asymptotically…

• smarter, more robust clusters

• increased leverage of machine data for automation and optimization

• DSLs focused on scalability, testability, reducing s/w engineering complexity

• increased use of “machine code”, who writes SQL directly?

• workflows incorporating more “moving parts”

• less about “bigness” of data,more about complexity of process

• greater instrumentation ⟹ even more machine data, increased feedback

[diagram: Workflow, DSL, Planner/Optimizer, Cluster Scheduler, Cluster, and App History, with the feedback loop among them]

Enterprise Data Workflows: more about "complex" process than about "big" data.

Page 38: Chicago Hadoop Users Group: Enterprise Data Workflows

by Leo Breiman

Statistical Modeling: The Two Cultures
Statistical Science, 2001

bit.ly/eUTh9L

also check out RStudio: rstudio.org / rpubs.com

references…

For a really great discussion about the fundamentals of Data Science and process for algorithmic modeling (analyzing the 1997 inflection point), refer back to Breiman 2001.

Page 39: Chicago Hadoop Users Group: Enterprise Data Workflows

by DJ Patil

Data Jujitsu
O'Reilly, 2012

amazon.com/dp/B008HMN5BE

Building Data Science Teams
O'Reilly, 2011

amazon.com/dp/B005O4U3ZE

references…

In terms of building data products, see DJ Patil's mini-books on O'Reilly: "Building Data Science Teams" and "Data Jujitsu".

Page 40: Chicago Hadoop Users Group: Enterprise Data Workflows

the workflow abstraction: many aspects of an app

[Word Count flow diagram, as above; labeled "Enterprise Data Workflows"]

The workflow abstraction helps make Hadoop accessible to a broader audience of developers.

Let’s take a look at how organizations can leverage it in other important ways…

Page 41: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

the workflow abstraction

Tuple Flows, Pipes, Taps, Filters, Joins, Traps, etc.

…in other words, “plumbing” as a pattern language for managing the complexity of Big Data in Enterprise apps on many levels

The workflow abstraction, a pattern language for building robust, scalable Enterprise apps, which works on many levels across an organization…

Page 42: Chicago Hadoop Users Group: Enterprise Data Workflows

rather than arguing SQL vs. NoSQL…

this kind of work focuses on the process of structuring data

which must occur long before work on large-scale joins, visualizations, predictive models, etc.

so the process of structuring data is what we examine here:

i.e., how to build workflows for Big Data

thank you Dr. Codd

“A relational model of data for large shared data banks” dl.acm.org/citation.cfm?id=362685

Instead, in Data Science work we must focus on *the process of structuring data*, which must happen before the large-scale joins, predictive models, visualizations, etc. The process of structuring data is what I will show here: how to build workflows from Big Data. Thank you, Dr. Codd.

Page 43: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

workflow – abstraction layer

• Cascading initially grew from interaction with the Nutch project, before Hadoop had a name; API author Chris Wensel recognized that MapReduce would be too complex for substantial work in an Enterprise context

• 5+ years later, Enterprise app deployments on Hadoop are limited by staffing issues: difficulty of retraining staff, scarcity of Hadoop experts

• the pattern language provides a structured method for solving large, complex design problems where the syntax of the language promotes use of best practices – which addresses staffing issues

First and foremost, the workflow represents an abstraction layer to mitigate the complexity and costs of coding large apps directly in MapReduce.

Page 44: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

workflow – literate collaboration

• provides an intuitive visual representation for apps: flow diagrams

• flow diagrams are quite valuable for cross-team collaboration

• this approach leverages literate programming methodology, especially in DSLs written in functional programming languages

• example: nearly 1:1 correspondence between function calls and flow diagram elements in Scalding

• example: expert developers on cascading-users email list use flow diagrams to help troubleshoot issues remotely

Formally speaking, the pattern language in Cascading gets leveraged as a visual representation used for literate programming.

Several good examples exist, but the phenomenon of different developers troubleshooting a program together over the “cascading-users” email list is most telling -- the expert developers generally ask a novice to provide a flow diagram first

Page 45: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

workflow – business process

• imposes a separation of concerns between the capture of business process requirements, and the implementation details (Hadoop, etc.)

• workflow orchestration evokes the notion of business process management for Enterprise apps (think BPM/BPEL)

• Cascalog leverages Datalog features to make business process executable: "specify what you require, not how to achieve it"

Business Stakeholder POV: business process management for workflow orchestration (think BPM/BPEL).

Page 46: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

workflow – data architect

• represents a physical plan for large-scale data flow management

• tap schemes and tuple streams determine the relevant schema

• a producer/consumer graph of tap identifier URIs provides a view of data provenance

• cluster utilization vs. producer/consumer graph surfaces ROI for Hadoop-based data products

Data Architect POV: a physical plan for large-scale data flow management.

Page 47: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

workflow – system integration

• Cascading apps incorporate much more than just Hadoop jobs

• integration of other heterogeneous data frameworks (taps) and compute platforms (topologies)

• integration of other paradigms via DSLs, ANSI SQL, PMML, etc.

• ultimately reduced/encapsulated as a single JAR file per app

Systems Integrator POV: system integration of heterogeneous data sources and compute platforms.

Page 48: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

workflow – data scientist

• represents a directed, acyclic graph (DAG) on which we can apply some useful math…

• Amdahl’s Law, etc., to quantify the extent of parallelization

• query optimizer can leverage app history (machine data) to select alternative algorithms based on the shape of the data

• predictive modeling to estimate expected run times of a given app for a given size and shape of input data

Data Scientist POV: a directed, acyclic graph (DAG) on which we can apply Amdahl's Law, etc.
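For reference (the standard statement of Amdahl's Law, not anything Cascading-specific): if a fraction P of the workflow can be parallelized across N workers, the best-case speedup is S(N) = 1 / ( (1 - P) + P/N ), so the serial portion (1 - P) of the DAG bounds what a bigger cluster can buy.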

Page 49: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

workflow – developer

• physical plan for a query avoids non-deterministic behavior – expensive when troubleshooting

• otherwise, edge cases can become nightmares on a large cluster

• “plan far ahead”: potential problems can be inferred at compile time or at flow planner stage…long before large, expensive resources start getting consumed…or worse, before the wrong results get propagated downstream

As an engineering manager who has tasked data scientists and Hadoop developers to build large-scale apps, a typical issue is that everything works great on a laptop with a small set of data, but then the app falls over on the staging cluster -- causing staff to spend weeks debugging edge cases.

Non-deterministic behavior of Big Data frameworks creates enormous costs at scale.

Page 50: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

workflow – operations

• "same JAR, any scale" allows for continuous integration practices – no need to change code or recompile a JAR

• flow diagrams annotated with metrics allow systems engineers to identify performance bottlenecks, model utilization rates, perform capacity planning, determine ROI on cluster infrastructure, etc.

Systems Engineer POV: workflow as a JAR file, has passed CI, available in a Maven repo.

metrics on the DAG for Ops utilization analysis, capacity planning, ROI on infrastructure

Page 51: Chicago Hadoop Users Group: Enterprise Data Workflows

[Word Count flow diagram, as above]

workflow – transparency

• fully connected context for compiler optimization, exception handling, debug, config, scheduling, notifications, provenance, etc.

• this practice is in stark contrast to Big Data frameworks where developers cross multiple language boundaries to troubleshoot large-scale apps

• again, complexity is more the issue than "bigness" … and lack of transparency is what makes complexity so expensive in Big Data apps

An entire app compiles into a single JAR: fully connected context for compiler optimization, exception handling, debug, config, scheduling, notifications, provenance, etc.

Page 52: Chicago Hadoop Users Group: Enterprise Data Workflows

for the analyst on your shopping list…

[Word Count flow diagram, as above; labeled "Enterprise Data Workflows"]

For the analysts in the audience -- also useful for Ops, Customer Support, Finance, etc.

Page 53: Chicago Hadoop Users Group: Enterprise Data Workflows

ANSI SQL – integration

Analysts: using SQL data warehouses (MPP) today to run large-scale queries, but need to start leveraging Hadoop

Developers: Cascading users building Enterprise data workflows today on Hadoop

capabilities

• ANSI standard SQL parser and optimizer built on top of Cascading

• relational catalog view into large-scale unstructured data

• SQL shell to test and submit queries into Hadoop

• JDBC driver to integrate into existing tools and application servers

benefits

• use standard SQL to run queries over unstructured data on Hadoop

• integrate SQL queries into Enterprise data workflows

• don’t have to learn new syntax or modify queries

Cascading worked with another team which had substantial experience building an ANSI SQL compliant parser/optimizer, ready for integration with popular Enterprise relational frameworks.

Page 54: Chicago Hadoop Users Group: Enterprise Data Workflows

ANSI SQL – CSV data in local file system

The test database for MySQL is available for download from https://launchpad.net/test-db/

Here we have a bunch o’ CSV flat files in a directory in the local file system.

Page 55: Chicago Hadoop Users Group: Enterprise Data Workflows

ANSI SQL – simple DDL overlay

Use the "lingual" command line interface to overlay DDL to describe the expected table schema.

Page 56: Chicago Hadoop Users Group: Enterprise Data Workflows

ANSI SQL – shell prompt, catalog

Use the "lingual" SQL shell prompt to run SQL queries interactively, show catalog, etc.

Page 57: Chicago Hadoop Users Group: Enterprise Data Workflows

ANSI SQL – queries

Here's an example SQL query on that "employee" test database from MySQL.

Page 58: Chicago Hadoop Users Group: Enterprise Data Workflows

ANSI SQL – JDBC driver

public void run() throws ClassNotFoundException, SQLException {
  Class.forName( "cascading.lingual.jdbc.Driver" );
  Connection connection = DriverManager.getConnection(
      "jdbc:lingual:local;schemas=src/main/resources/data/example" );
  Statement statement = connection.createStatement();

  ResultSet resultSet = statement.executeQuery(
      "select *\n"
    + "from \"EXAMPLE\".\"SALES_FACT_1997\" as s\n"
    + "join \"EXAMPLE\".\"EMPLOYEE\" as e\n"
    + "on e.\"EMPID\" = s.\"CUST_ID\"" );

  while( resultSet.next() ) {
    int n = resultSet.getMetaData().getColumnCount();
    StringBuilder builder = new StringBuilder();

    for( int i = 1; i <= n; i++ ) {
      builder.append( ( i > 1 ? "; " : "" )
        + resultSet.getMetaData().getColumnLabel( i )
        + "=" + resultSet.getObject( i ) );
    }

    System.out.println( builder );
  }

  resultSet.close();
  statement.close();
  connection.close();
}

Note that in this example the schema for the DDL has been derived directly from the CSV files.

In other words, point the JDBC connection at a directory of flat files and query as if they were already loaded into SQL.

Page 59: Chicago Hadoop Users Group: Enterprise Data Workflows

ANSI SQL – JDBC result set

$ gradle clean jar
$ hadoop jar build/libs/lingual-examples–1.0.0-wip-dev.jar
CUST_ID=100; PROD_ID=10; EMPID=100; NAME=Bill
CUST_ID=150; PROD_ID=20; EMPID=150; NAME=Sebastian

Caveat: if you absolutely positively must have sub-second SQL query response for Pb-scale data on a 1000+ node cluster… Good luck with that!! Call the MPP vendors.

This ANSI SQL library is primarily intended for batch workflows – high throughput, not low-latency – for many under-represented use cases in Enterprise IT.

success

Page 60: Chicago Hadoop Users Group: Enterprise Data Workflows

for the scientist on your shopping list…

[Word Count flow diagram, as above; labeled "Enterprise Data Workflows"]

For the data scientists in the audience…

Page 61: Chicago Hadoop Users Group: Enterprise Data Workflows

example model integration

• use customer order history as the training data set

• train a risk classifier for orders, using Random Forest

• export model from R to PMML

• build a Cascading app to execute the PMML model

• generate a pipeline from PMML description

• planner builds flow for a topology (Hadoop)

• compile app to a JAR file

• deploy the app at scale to calculate scores

[diagram: on the analyst's laptop, data prep over customer transactions and partner data (DW, ETL, chargebacks, Customer DB) produces training data sets and a PMML model; Cascading apps then run risk classifiers (per-order and customer-360 dimensions) as batch workloads on Hadoop to score new orders, detect fraudsters, predict model costs, segment customers, and compute velocity metrics, plus real-time workloads such as anomaly detection on an IMDG]

cascading.org/pattern


Page 62: Chicago Hadoop Users Group: Enterprise Data Workflows

example model integration

[model-integration diagram, as on the previous slide]


Page 63: Chicago Hadoop Users Group: Enterprise Data Workflows

## train a RandomForest model

f <- as.formula("as.factor(label) ~ .")
fit <- randomForest(f, data_train, ntree=50)

## test the model on the holdout test set

print(fit$importance)
print(fit)

predicted <- predict(fit, data)
data$predicted <- predicted
confuse <- table(pred = predicted, true = data[,1])
print(confuse)

## export predicted labels to TSV

write.table(data, file=paste(dat_folder, "sample.tsv", sep="/"), quote=FALSE, sep="\t", row.names=FALSE)

## export RF model to PMML

saveXML(pmml(fit), file=paste(dat_folder, "sample.rf.xml", sep="/"))

model creation in R


Page 64: Chicago Hadoop Users Group: Enterprise Data Workflows

<?xml version="1.0"?>
<PMML version="4.0" xmlns="http://www.dmg.org/PMML-4_0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.dmg.org/PMML-4_0 http://www.dmg.org/v4-0/pmml-4-0.xsd">
 <Header copyright="Copyright (c)2012 Concurrent, Inc." description="Random Forest Tree Model">
  <Extension name="user" value="ceteri" extender="Rattle/PMML"/>
  <Application name="Rattle/PMML" version="1.2.30"/>
  <Timestamp>2012-10-22 19:39:28</Timestamp>
 </Header>
 <DataDictionary numberOfFields="4">
  <DataField name="label" optype="categorical" dataType="string">
   <Value value="0"/>
   <Value value="1"/>
  </DataField>
  <DataField name="var0" optype="continuous" dataType="double"/>
  <DataField name="var1" optype="continuous" dataType="double"/>
  <DataField name="var2" optype="continuous" dataType="double"/>
 </DataDictionary>
 <MiningModel modelName="randomForest_Model" functionName="classification">
  <MiningSchema>
   <MiningField name="label" usageType="predicted"/>
   <MiningField name="var0" usageType="active"/>
   <MiningField name="var1" usageType="active"/>
   <MiningField name="var2" usageType="active"/>
  </MiningSchema>
  <Segmentation multipleModelMethod="majorityVote">
   <Segment id="1">
    <True/>
    <TreeModel modelName="randomForest_Model" functionName="classification"
        algorithmName="randomForest" splitCharacteristic="binarySplit">
     <MiningSchema>
      <MiningField name="label" usageType="predicted"/>
      <MiningField name="var0" usageType="active"/>
      <MiningField name="var1" usageType="active"/>
      <MiningField name="var2" usageType="active"/>
     </MiningSchema>
...

model exported as PMML


Page 65: Chicago Hadoop Users Group: Enterprise Data Workflows

[flow diagram: Customer Orders → Classify (PMML Model) → Assert → GroupBy token → Count → Scored Orders and Confusion Matrix, with Failure Traps and the M/R boundary marked]

model run at scale as a Cascading app

cascading.org/pattern


Page 66: Chicago Hadoop Users Group: Enterprise Data Workflows

public class Main {
  public static void main( String[] args ) {
    String pmmlPath = args[ 0 ];
    String ordersPath = args[ 1 ];
    String classifyPath = args[ 2 ];
    String trapPath = args[ 3 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps
    Tap ordersTap = new Hfs( new TextDelimited( true, "\t" ), ordersPath );
    Tap classifyTap = new Hfs( new TextDelimited( true, "\t" ), classifyPath );
    Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );

    // define a "Classifier" model from PMML to evaluate the orders
    ClassifierFunction classFunc = new ClassifierFunction( new Fields( "score" ), pmmlPath );
    Pipe classifyPipe = new Each( new Pipe( "classify" ), classFunc.getInputFields(), classFunc, Fields.ALL );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef().setName( "classify" )
      .addSource( classifyPipe, ordersTap )
      .addTrap( classifyPipe, trapTap )
      .addSink( classifyPipe, classifyTap );

    // write a DOT file and run the flow
    Flow classifyFlow = flowConnector.connect( flowDef );
    classifyFlow.writeDOT( "dot/classify.dot" );
    classifyFlow.complete();
  }
}

model run at scale as a Cascading app


Page 67: Chicago Hadoop Users Group: Enterprise Data Workflows

# replace with your S3 bucket
BUCKET=temp.cascading.org/pattern
SINK=out
PMML=sample.rf.xml
DATA=sample.tsv

# clear previous output (required by Apache Hadoop)
s3cmd del -r s3://$BUCKET/$SINK

# load built JAR + input data
s3cmd put build/libs/pattern.jar s3://$BUCKET/
s3cmd put data/$PMML s3://$BUCKET/
s3cmd put data/$DATA s3://$BUCKET/

# launch cluster and run
elastic-mapreduce --create --name "RF" \
 --debug --enable-debugging --log-uri s3n://$BUCKET/logs \
 --jar s3n://$BUCKET/pattern.jar \
 --arg s3n://$BUCKET/$PMML \
 --arg s3n://$BUCKET/$DATA \
 --arg s3n://$BUCKET/$SINK/classify \
 --arg s3n://$BUCKET/$SINK/trap

app deployed in the AWS cloud


Page 68: Chicago Hadoop Users Group: Enterprise Data Workflows

bash-3.2$ head output/classify/part-00000
label  var0  var1  var2  order_id  predicted  score
1      0     1     0     6f8e1014  1          1
0      0     0     1     6f8ea22e  0          0
1      0     1     0     6f8ea435  1          1
0      0     0     1     6f8ea5e1  0          0
1      0     1     0     6f8ea785  1          1
1      0     1     0     6f8ea91e  1          1
0      1     0     0     6f8eaaba  0          0
1      0     1     0     6f8eac54  1          1
0      1     1     0     6f8eade3  1          1

results

cascading.org/pattern


Page 69: Chicago Hadoop Users Group: Enterprise Data Workflows

for the developer on your shopping list…

[Word Count flow diagram, as above; labeled "Enterprise Data Workflows"]

For the J2EE / Scala / Clojure developers in the audience…

Page 70: Chicago Hadoop Users Group: Enterprise Data Workflows

[conceptual flow diagram for the full batch workflow: GIS export → Regex parse-gis → src; tree branch: Regex parse-tree → Scrub species → Join with Tree Metadata → Estimate height → Geohash; road branch: Regex parse-road → Join with Road Metadata → Estimate albedo → Geohash → Estimate traffic; GPS logs → Geohash → Count gps_count / Max recent_visit; downstream joins with filters on height, distance, and sum of moments (shade) produce the final "reco" output, with Failure Traps and M/R phases marked]

github.com/Cascading/CoPA

Cascalog app for a recommender system

gist.github.com/ceteri/4641263

A conceptual flow diagram for the entire batch workflow.

Page 71: Chicago Hadoop Users Group: Enterprise Data Workflows

City of Palo Alto recently began an Open Data initiative to give the local community greater visibility into how city government operates

intended to encourage students, entrepreneurs, local organizations, etc., to build new apps which contribute to the public good

paloalto.opendata.junar.com/dashboards/7576/geographic-information/

• trees, parks

• creek levels

• roads

• bike paths

• zoning

• library visits

• utility usage

• street sweeping

The City of Palo Alto has recently begun to support Open Data to give the local community greater visibility into how their city government functions.

This effort is intended to encourage students, entrepreneurs, local organizations, etc., to build new apps which contribute to the public good.

http://paloalto.opendata.junar.com/dashboards/7576/geographic-information/

Page 72: Chicago Hadoop Users Group: Enterprise Data Workflows

process:

1. unstructured data about municipal infrastructure (GIS data: trees, roads, parks)

2. unstructured data about where people like to walk (smartphone GPS logs)

3. a wee bit o' curated metadata

⇒ 4. personalized recommendations:

[recommender workflow flow diagram, as on slide 70]

“Find a shady spot on a summer day in which to walk near downtown Palo Alto. While on a long conference call. Sippin’ a latte or enjoying some fro-yo.”

We merge unstructured geo data about municipal infrastructure (GIS data: trees, roads, parks) + unstructured data about where people like to walk (smartphone GPS logs) + a little metadata (curated) => personalized recommendations:

"Find a shady spot on a summer day in which to walk near downtown Palo Alto. While on a long conference call. Sippin’ a latte or enjoying some fro-yo."

Page 73: Chicago Hadoop Users Group: Enterprise Data Workflows

"Tree: 29 site 2 at 203 ADDISON AV, on ADDISON AV 44 from pl"," Private: -1 Tree ID: 29 Street_Name: ADDISON AV Situs Number: 203 Tree Site: 2 Species: Celtis australis Source: davey tree Protected: Designated: Heritage: Appraised Value: Hardscape: None Identifier: 40 Active Numeric: 1 Location Feature ID: 13872 Provisional: Install Date: ","37.4409634615283,-122.15648458861,0.0 ","Point""Wilkie Way from West Meadow Drive to Victoria Place"," Sequence: 20 Street_Name: Wilkie Way From Street PMMS: West Meadow Drive To Street PMMS: Victoria Place Street ID: 598 (Wilkie Wy, Palo Alto) From Street ID PMMS: 689 To Street ID PMMS: 567 Year Constructed: 1950 Traffic Count: 596 Traffic Index: residential local Traffic Class: local residential Traffic Date: 08/24/90 Paving Length: 208 Paving Width: 40 Paving Area: 8320 Surface Type: asphalt concrete Surface Thickness: 2.0 Base Type Pvmt: crusher run base Base Thickness: 6.0 Soil Class: 2 Soil Value: 15 Curb Type: Curb Thickness: Gutter Width: 36.0 Book: 22 Page: 1 District Number: 18 Land Use PMMS: 1 Overlay Year: 1990 Overlay Thickness: 1.5 Base Failure Year: 1990 Base Failure Thickness: 6 Surface Treatment Year: Surface Treatment Type: Alligator Severity: none Alligator Extent: 0 Block Severity: none Block Extent: 0 Longitude and Transverse Severity: none Longitude and Transverse Extent: 0 Ravelling Severity: none Ravelling Extent: 0 Ridability Severity: none Trench Severity: none Trench Extent: 0 Rutting Severity: none Rutting Extent: 0 Road Performance: UL (Urban Local) Bike Lane: 0 Bus Route: 0 Truck Route: 0 Remediation: Deduct Value: 100 Priority: Pavement Condition: excellent Street Cut Fee per SqFt: 10.00 Source Date: 6/10/2009 User Modified By: mnicols Identifier System: 21410 ","-122.1249640794,37.4155803115645,0.0 -122.124661859039,37.4154224594993,0.0 -122.124587720719,37.4153758330704,0.0 -122.12451895942,37.4153242300888,0.0 -122.124456098457,37.4152680432944,0.0 -122.124399616238,37.4152077003122,0.0 -122.124374937753,37.4151774433318,0.0 ","Line"

GIS export – raw, unstructured data

Here's what we have to work with -- raw GIS export as CSV, with plenty o' errors too for good measure.

This illustrates the quintessence of "unstructured data".

Alligator Severity! Rutting Extent!

Page 74: Chicago Hadoop Users Group: Enterprise Data Workflows

ad-hoc queries ⇒ logical predicates ⇒ Cascading flows

Leiningen REPL prompt

First we load `lein repl` to get an interactive prompt for Clojure… bring Cascalog libraries into Clojure… define functions to use… and execute queries.

Then we convert the queries into composable, logical propositions.

Page 75: Chicago Hadoop Users Group: Enterprise Data Workflows

curate valuable metadata

since we can find species and geolocation for each tree,

let’s add some metadata to infer other valuable data results, e.g., tree height

based on Wikipedia.org, Calflora.org, USDA.gov, etc.

Page 76: Chicago Hadoop Users Group: Enterprise Data Workflows

Cascalog – an example

(defn get-trees [src trap tree_meta]
  "subquery to parse/filter the tree data"
  (<- [?blurb ?tree_id ?situs ?tree_site ?species ?wikipedia ?calflora
       ?avg_height ?tree_lat ?tree_lng ?tree_alt ?geohash]
      (src ?blurb ?misc ?geo ?kind)
      (re-matches #"^\s+Private.*Tree ID.*" ?misc)
      (parse-tree ?misc :> _ ?priv ?tree_id ?situs ?tree_site ?raw_species)
      ((c/comp s/trim s/lower-case) ?raw_species :> ?species)
      (tree_meta ?species ?wikipedia ?calflora ?min_height ?max_height)
      (avg ?min_height ?max_height :> ?avg_height)
      (geo-tree ?geo :> _ ?tree_lat ?tree_lng ?tree_alt)
      (read-string ?tree_lat :> ?lat)
      (read-string ?tree_lng :> ?lng)
      (geohash ?lat ?lng :> ?geohash)
      (:trap (hfs-textline trap))
      ))

Let's use Cascalog to begin our process of structuring that data.

since the GIS export is vaguely in CSV format, here's a simple way to clean up the data

referring back to DJ Patil’s “Data Jujitsu”, that clean up usually accounts for 80% of project costs

Page 77: Chicago Hadoop Users Group: Enterprise Data Workflows

[flow diagram: GIS export → Regex parse-gis → src → Regex parse-tree → Scrub species → Join with Tree Metadata → Estimate height → Geohash → "tree" data product, with Failure Traps; map (M) phases marked]

GIS export ⇒ “trees” data product

?blurb       Tree: 412 site 1 at 115 HAWTHORNE AV, on HAWTHORNE AV 22 from pl
?tree_id     412
?situs       115
?tree_site   1
?species     liquidambar styraciflua
?wikipedia   http://en.wikipedia.org/wiki/Liquidambar_styraciflua
?calflora    http://calflora.org/cgi-bin/species_query.cgi?where-calrecnum=8598
?avg_height  27.5
?tree_lat    37.446001565119
?tree_lng    -122.167713417554
?tree_alt    0.0
?geohash     9q9jh0

Great, now we have a data product about trees in Palo Alto, which has been enriched by our process. BTW, those geolocation fields are especially important.

Also, here’s a conceptual flow diagram, showing a directed, acyclic graph (DAG) of data taps, tuple streams, operations, joins, assertions, aggregations, etc.

Page 78: Chicago Hadoop Users Group: Enterprise Data Workflows

add to that some road data…

• traffic class (arterial, truck route, residential, etc.)

• traffic counts distribution

• surface type (asphalt, cement; age)

from which we derive estimators for noise, reflection, etc.

Analysis and visualizations from RStudio:

* frequency of traffic classes * density plot of traffic counts

Page 79: Chicago Hadoop Users Group: Enterprise Data Workflows

recommender – data objects

each road in the GIS export is listed as a block between two cross roads, and each may have multiple road segments to represent turns:

" -122.161776959558,37.4518836690781,0.0 " -122.161390381489,37.4516410983794,0.0 " -122.160786011735,37.4512589903357,0.0 " -122.160531178368,37.4510977281699,0.0

( lat0, lng0, alt0 )

( lat1, lng1, alt1 )

( lat2, lng2, alt2 )

( lat3, lng3, alt3 )

NB: segments in the raw GIS have the order of geo coordinates scrambled: (lng, lat, alt)

Each road is listed in the GIS export as a block between two cross roads, and each may have multiple road segments to represent turns.

Page 80: Chicago Hadoop Users Group: Enterprise Data Workflows

9q9jh0

geohash with 6-digit resolution

approximates a 5-block square

centered lat: 37.445, lng: -122.162

recommender – spatial indexing

We use "geohash" codes for "cheap and dirty" geospatial indexing suited for parallel processing (Hadoop).

much more effective methods exist; however, this is simple to show

6-digit resolution on a geohash generates approximately a 5-block square

Page 81: Chicago Hadoop Users Group: Enterprise Data Workflows

9q9jh0

calculate a sum of moments for tree height × distance from road segment, as an estimator for shade:

∑( h·d )

also calculate estimators for traffic frequency and noise

recommender – estimators

Calculate a sum of moments for tree height × distance from center; approximate, but pretty good.

also calculate estimators for traffic frequency and noise
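As an illustrative sketch only (the helper names and the tree tuple layout here are assumptions, not taken from the CoPA source), the estimator is just a sum of height × distance moments over the trees indexed under the same geohash as the road segment:

import java.util.List;

class ShadeEstimator {
  // ∑( h·d ): sum of tree-height × distance moments for one road segment midpoint
  static double sumMoments( double segLat, double segLng, List<double[]> trees ) {
    double sum = 0.0;
    for( double[] t : trees ) {               // t = { treeLat, treeLng, avgHeight }
      double d = haversineMeters( segLat, segLng, t[ 0 ], t[ 1 ] );
      sum += t[ 2 ] * d;                      // moment: height × distance
    }
    return sum;
  }

  // standard haversine great-circle distance, in meters
  static double haversineMeters( double lat1, double lng1, double lat2, double lng2 ) {
    double dLat = Math.toRadians( lat2 - lat1 );
    double dLng = Math.toRadians( lng2 - lng1 );
    double a = Math.pow( Math.sin( dLat / 2 ), 2 )
             + Math.cos( Math.toRadians( lat1 ) ) * Math.cos( Math.toRadians( lat2 ) )
             * Math.pow( Math.sin( dLng / 2 ), 2 );
    return 2 * 6371000.0 * Math.asin( Math.sqrt( a ) );  // mean Earth radius ≈ 6371 km
  }
}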

Page 82: Chicago Hadoop Users Group: Enterprise Data Workflows

recommender – personalization via GPS tracks

Here's a Splunk screen showing GPS tracks log data from smartphones.

Page 83: Chicago Hadoop Users Group: Enterprise Data Workflows

recommenders combine multiple signals, generally via weighted averages, to rank personalized results:

•GPS of person ∩ road segment

• frequency and recency of visit

• traffic class and rate

• road albedo (sunlight reflection)

• tree shade estimator

adjusting the mix allows for further personalization at the end use

recommender – strategy

One approach to building commercial recommender systems is to take a vector of different preference metrics, combine them into a single sortable value, then rank the results before making personalized suggestions.

The resulting data in the "reco" output set produces exactly that.
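A minimal sketch of that combination step (the weights and signal names below are illustrative assumptions, not values from the CoPA app):

// combine normalized per-segment signals into one sortable score, then rank descending
static double recoScore( double visitRecency, double visitFrequency,
                         double trafficRate, double albedo, double shade ) {
  return 0.25 * visitRecency      // prefer segments the person visits, and recently
       + 0.15 * visitFrequency
       - 0.20 * trafficRate       // penalize busy roads
       - 0.15 * albedo            // penalize sun-baked, reflective pavement
       + 0.25 * shade;            // reward the tree-shade estimator
}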

Page 84: Chicago Hadoop Users Group: Enterprise Data Workflows

‣ addr: 115 HAWTHORNE AVE
‣ lat/lng: 37.446, -122.168
‣ geohash: 9q9jh0
‣ tree: 413 site 2
‣ species: Liquidambar styraciflua
‣ est. height: 23 m
‣ shade metric: 4.363
‣ traffic: local residential, light traffic
‣ recent visit: 1972376952532
‣ a short walk from my train stop ✔

recommender – results

One of the top recommendations for me is about two blocks from my train stop, where a couple of really big American Sweetgum trees provide ample shade on a residential street with not much traffic.

Page 85: Chicago Hadoop Users Group: Enterprise Data Workflows

summary, references

[Word Count flow diagram, as above; labeled "Enterprise Data Workflows"]


Page 86: Chicago Hadoop Users Group: Enterprise Data Workflows

Cascading – summary

‣ leverages a workflow abstraction for Enterprise apps

‣ provides a pattern language for system integration

‣ utilizes existing practices for JVM-based clusters

‣ addresses staffing bottlenecks due to Hadoop adoption

‣ manages complexity as data continues to scale massively

‣ reduces costs, while servicing risk-averse “conservatism”

In summary…

Page 87: Chicago Hadoop Users Group: Enterprise Data Workflows

by Paco Nathan

Enterprise Data Workflows with Cascading

O'Reilly, 2013
amazon.com/dp/1449358721

Santa Clara, Feb 28, 1:30pm
strataconf.com/strata2013

references…

Some of this material comes from an upcoming O'Reilly book: "Enterprise Data Workflows with Cascading".

Also, come to the Strata conference! I'll be presenting related material at http://strataconf.com/strata2013/public/schedule/detail/27073 (Santa Clara, Feb 28, 1:30pm).

Page 88: Chicago Hadoop Users Group: Enterprise Data Workflows

blog, community, code/wiki/gists, maven repo, products:

cascading.org

zest.to/group11

github.com/Cascading

conjars.org

goo.gl/KQtUL

concurrentinc.com

join us, we are hiring!

drill-down…

Copyright © 2013, Concurrent, Inc.

Links to our open source projects, developer community, etc…

contact me @pacoid
http://concurrentinc.com/
(we're hiring too!)