Boulder/Denver BigData, 2013-09-25: Cluster Computing with Apache Mesos and Cascading Paco Nathan @pacoid Chief Scientist, Mesosphere.io


TRANSCRIPT

Page 3: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Enterprise Data Workflows

middleware for Big Data applications is evolving, with commercial examples that include:

• Cascading, Lingual, Pattern, etc. – Concurrent

• ParAccel Big Data Analytics Platform – Actian

• Anaconda, supporting IPython Notebook, Pandas, Augustus, etc. – Continuum Analytics

[workflow diagram: data sources → ETL/data prep → predictive model → end uses]

Page 4

Anatomy of an Enterprise app

definition of a typical Enterprise workflow, which crosses multiple departments, languages, and technologies…

[workflow diagram: data sources → ETL/data prep → predictive model → end uses]

ANSI SQL for ETL

Page 5

Anatomy of an Enterprise app

[same workflow diagram]

J2EE for business logic

Page 6

Anatomy of an Enterprise app

[same workflow diagram]

SAS for predictive models

Page 7

Anatomy of an Enterprise app

[same workflow diagram]

SAS for predictive models + ANSI SQL for ETL: most of the licensing costs…

Page 8

Anatomy of an Enterprise app

[same workflow diagram]

J2EE for business logic: most of the project costs…

Page 9

[workflow diagram: data sources → ETL/data prep → predictive model → end uses, annotated with: Lingual: DW → ANSI SQL; Pattern: SAS, R, etc. → PMML; business logic in Java, Clojure, Scala, etc.; sink taps for Memcached, HBase, MongoDB, etc.; source taps for Cassandra, JDBC, Splunk, etc.]

Anatomy of an Enterprise app

Cascading allows multiple departments to combine their workflow components into an integrated app – one among many, typically – based on 100% open source

a compiler sees it all… one connected DAG:

• optimization

• troubleshooting

• exception handling

• notifications

cascading.org

Page 10

a compiler sees it all…

Anatomy of an Enterprise app

Cascading allows multiple departments to combine their workflow components into an integrated app – one among many, typically – based on 100% open source

FlowDef flowDef = FlowDef.flowDef()
  .setName( "etl" )
  .addSource( "example.employee", emplTap )
  .addSource( "example.sales", salesTap )
  .addSink( "results", resultsTap );

SQLPlanner sqlPlanner = new SQLPlanner()
  .setSql( sqlStatement );

flowDef.addAssemblyPlanner( sqlPlanner );

cascading.org

Page 11

a compiler sees it all…

Anatomy of an Enterprise app

FlowDef flowDef = FlowDef.flowDef()
  .setName( "classifier" )
  .addSource( "input", inputTap )
  .addSink( "classify", classifyTap );

PMMLPlanner pmmlPlanner = new PMMLPlanner()
  .setPMMLInput( new File( pmmlModel ) )
  .retainOnlyActiveIncomingFields();

flowDef.addAssemblyPlanner( pmmlPlanner );

Page 12

Cascading – functional programming

Key insight: MapReduce is based on functional programming – back to Lisp in the 1970s. Apache Hadoop use cases are mostly about data pipelines, which are functional in nature.

to ease staffing problems as “Main Street” Enterprise firms began to embrace Hadoop, Cascading was introduced in late 2007 as a new Java API to implement functional programming for large-scale data workflows:

• leverages the JVM and Java-based tools without any need to create new languages

• allows programmers who have J2EE expertise to leverage the economics of Hadoop clusters

Edgar Codd alluded to this (DSLs for structuring data) in his original paper about the relational model

Page 13

Cascading – functional programming

• Twitter, eBay, LinkedIn, Nokia, YieldBot, uSwitch, etc., have invested in open source projects atop Cascading – used for their large-scale production deployments

• new case studies for Cascading apps are mostly based on domain-specific languages (DSLs) in JVM languages which emphasize functional programming:

Cascalog in Clojure (2010)
Scalding in Scala (2012)

github.com/nathanmarz/cascalog/wiki
github.com/twitter/scalding/wiki

“Why Adopting the Declarative Programming Practices Will Improve Your Return from Technology”
Dan Woods, 2013-04-17, Forbes

forbes.com/sites/danwoods/2013/04/17/why-adopting-the-declarative-programming-practices-will-improve-your-return-from-technology/

Page 14

Functional Programming for Big Data

WordCount with token scrubbing…

Apache Hive: 52 lines HQL + 8 lines Python (UDF)

compared to

Scalding: 18 lines Scala/Cascading

functional programming languages help reduce software engineering costs at scale, over time

Page 15

Cascading – deployments

• case studies: Climate Corp, Twitter, Etsy, Williams-Sonoma, uSwitch, Airbnb, Nokia, YieldBot, Square, Harvard, Factual, etc.

• use cases: ETL, marketing funnel, anti-fraud, social media, retail pricing, search analytics, recommenders, eCRM, utility grids, telecom, genomics, climatology, agronomics, etc.

Page 16

Workflow Abstraction – pattern language

Cascading uses a “plumbing” metaphor in Java to define workflows out of familiar elements: Pipes, Taps, Tuple Flows, Filters, Joins, Traps, etc.

[flow diagram: Document Collection → Tokenize (Regex token) → HashJoin Left with Stop Word List (RHS) → Scrub token → GroupBy token → Count → Word Count; M/R marks the map/reduce boundary]

data is represented as flows of tuples

operations in the flows bring functional programming aspects into Java

A Pattern Language
Christopher Alexander, et al.
amazon.com/dp/0195019199
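The plumbing metaphor is easy to mimic outside Cascading. A minimal sketch in plain Python (illustrative only – these helper names are invented, not Cascading's API) shows tuples flowing through composable operations:

```python
# Toy sketch (not the Cascading API): a "pipe" is just a chain of
# operations applied to a stream of tuples, mirroring Each/GroupBy.
from itertools import groupby

def each(tuples, fn):
    """Apply a function to every tuple in the flow (like Cascading's Each)."""
    for t in tuples:
        yield from fn(t)

def group_by(tuples, key):
    """Group the tuple stream on a key (like GroupBy)."""
    for k, grp in groupby(sorted(tuples, key=key), key=key):
        yield k, list(grp)

# a tiny flow: split text lines into tokens, then count per token
lines = [("doc1", "rain shadow rain"), ("doc2", "shadow")]
tokens = each(lines, lambda t: ((tok,) for tok in t[1].split()))
counts = {k: len(g) for k, g in group_by(tokens, key=lambda t: t[0])}
print(counts)  # {'rain': 2, 'shadow': 2}
```

The point of the real API is the same composability, but planned onto Hadoop jobs rather than executed in-process.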

Page 17

Workflow Abstraction – literate programming

Cascading workflows generate their own visual documentation: flow diagrams

in formal terms, flow diagrams leverage a methodology called literate programming

provides intuitive, visual representations for apps – great for cross-team collaboration

[flow diagram: Document Collection → Tokenize (Regex token) → HashJoin Left with Stop Word List (RHS) → Scrub token → GroupBy token → Count → Word Count]

Literate Programming
Don Knuth
literateprogramming.com

Page 18

Workflow Abstraction – business process

following the essence of literate programming, Cascading workflows provide statements of business process

this recalls a sense of business process management for Enterprise apps (think BPM/BPEL for Big Data)

Cascading creates a separation of concerns between business process and implementation details (Hadoop, etc.)

this is especially apparent in large-scale Cascalog apps:

“Specify what you require, not how to achieve it.”

by virtue of the pattern language, the flow planner then determines how to translate business process into efficient, parallel jobs at scale

Page 19

void map (String doc_id, String text):
  for each word w in segment(text):
    emit(w, "1");

void reduce (String word, Iterator group):
  int count = 0;
  for each pc in group:
    count += Int(pc);
  emit(word, String(count));

The Ubiquitous Word Count

Definition:

this simple program provides an excellent test case for parallel processing:

• requires a minimal amount of code

• demonstrates use of both symbolic and numeric values

• shows a dependency graph of tuples as an abstraction

• is not many steps away from useful search indexing

• serves as a “Hello World” for Hadoop apps

a distributed computing framework that runs Word Count efficiently in parallel at scale can handle much larger and more interesting compute problems

count how often each word appears in a collection of text documents
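The map/reduce pseudocode above can be exercised directly in plain Python – no Hadoop involved – with the shuffle phase simulated by a dict of grouped values:

```python
# Runnable rendition of the Word Count map/reduce pseudocode.
from collections import defaultdict

def map_fn(doc_id, text):
    for w in text.split():          # stands in for segment(text)
        yield (w, "1")

def reduce_fn(word, group):
    count = sum(int(pc) for pc in group)
    yield (word, str(count))

docs = {"doc1": "a b a", "doc2": "b"}

groups = defaultdict(list)
for doc_id, text in docs.items():   # map phase
    for k, v in map_fn(doc_id, text):
        groups[k].append(v)         # simulated shuffle

result = dict(kv for word, vals in groups.items()   # reduce phase
                 for kv in reduce_fn(word, vals))
print(result)  # {'a': '2', 'b': '2'}
```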

Page 20

[conceptual flow diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count; 1 map, 1 reduce]

18 lines of code: gist.github.com/3900702

WordCount – conceptual flow diagram

cascading.org/category/impatient

Page 21

WordCount – Cascading app in Java

String docPath = args[ 0 ];
String wcPath = args[ 1 ];
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

// create source and sink taps
Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

// specify a regex to split "document" text lines into a token stream
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
// only returns "token"
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

// determine the word counts
Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new GroupBy( wcPipe, token );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
  .setName( "wc" )
  .addSource( docPipe, docTap )
  .addTailSink( wcPipe, wcTap );

// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();

[conceptual flow diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count]

Page 22

[generated flow diagram, map and reduce phases: Hfs source tap TextDelimited ['doc_id', 'text'] at 'data/rain.txt' → Each('token') RegexSplitGenerator [decl:'token'] → GroupBy('wc') [by:'token'] → Every('wc') Count [decl:'count'] → Hfs sink tap TextDelimited ['token', 'count'] at 'output/wc']

WordCount – generated flow diagram

Page 23

(ns impatient.core
  (:use [cascalog.api]
        [cascalog.more-taps :only (hfs-delimited)])
  (:require [clojure.string :as s]
            [cascalog.ops :as c])
  (:gen-class))

(defmapcatop split [line]
  "reads in a line of string and splits it by regex"
  (s/split line #"[\[\]\\\(\),.)\s]+"))

(defn -main [in out & args]
  (?<- (hfs-delimited out)
       [?word ?count]
       ((hfs-delimited in :skip-header? true) _ ?line)
       (split ?line :> ?word)
       (c/count ?count)))

; Paul Lam
; github.com/Quantisan/Impatient

WordCount – Cascalog / Clojure

[conceptual flow diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count]

Page 24

github.com/nathanmarz/cascalog/wiki

• implements Datalog in Clojure, with predicates backed by Cascading – for a highly declarative language

• run ad-hoc queries from the Clojure REPL – approx. 10:1 code reduction compared with SQL

• composable subqueries, used for test-driven development (TDD) practices at scale

• Leiningen build: simple, no surprises, in Clojure itself

• more new deployments than other Cascading DSLs – Climate Corp is largest use case: 90% Clojure/Cascalog

• has a learning curve, limited number of Clojure developers

• aggregators are the magic, and those take effort to learn

WordCount – Cascalog / Clojure

[conceptual flow diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count]

Page 25

import com.twitter.scalding._

class WordCount(args : Args) extends Job(args) {
  Tsv(args("doc"), ('doc_id, 'text), skipHeader = true)
    .read
    .flatMap('text -> 'token) { text : String =>
      text.split("[ \\[\\]\\(\\),.]")
    }
    .groupBy('token) { _.size('count) }
    .write(Tsv(args("wc"), writeHeader = true))
}

WordCount – Scalding / Scala

[conceptual flow diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count]

Page 26

github.com/twitter/scalding/wiki

• extends the Scala collections API so that distributed lists become “pipes” backed by Cascading

• code is compact, easy to understand

• nearly 1:1 between elements of conceptual flow diagram and function calls

• extensive libraries are available for linear algebra, abstract algebra, machine learning – e.g., Matrix API, Algebird, etc.

• significant investments by Twitter, Etsy, eBay, etc.

• great for data services at scale

• less learning curve than Cascalog

WordCount – Scalding / Scala

[conceptual flow diagram: Document Collection → Tokenize → GroupBy token → Count → Word Count]

Page 27

A Thought Exercise

Consider that when a company like Caterpillar moves into data science, they won’t be building the world’s next search engine or social network

They will be optimizing supply chain, optimizing fuel costs, automating data feedback loops integrated into their equipment…

Operations Research –crunching amazing amounts of data

$50B company, in a $250B market segment

Upcoming: tractors as drones – guided by complex, distributed data apps

Page 28

Alternatively…

climate.com

Page 29

Two Avenues to the App Layer…

[axis: scale ➞ complexity]

Enterprise: must contend with complexity at scale everyday…

incumbents extend current practices and infrastructure investments – using J2EE, ANSI SQL, SAS, etc. – to migrate workflows onto Apache Hadoop while leveraging existing staff

Start-ups: crave complexity and scale to become viable…

new ventures move into Enterprise space to compete using relatively lean staff, while leveraging sophisticated engineering practices, e.g., Cascalog and Scalding

Page 31

[diagram: Hadoop Cluster running a Data Workflow – source taps from customer profile DBs (Customer Prefs) and Logs, a trap tap, sink taps feeding a Cache for the Customers' Web App, Reporting, and Analytics Cubes, plus Modeling via PMML; teams involved: Customers, Support]

Lingual – ANSI SQL

• collab with Optiq – industry-proven code base

• ANSI SQL parser/optimizer atop Cascading flow planner

• JDBC driver to integrate into existing tools and app servers

• relational catalog over a collection of unstructured data

• SQL shell prompt to run queries

• enable analysts without retraining on Hadoop, etc.

• transparency for Support, Ops, Finance, et al.

a language for queries – not a database, but ANSI SQL as a DSL for workflows

Page 32

Lingual – CSV data in local file system

cascading.org/lingual

Page 33

Lingual – shell prompt, catalog

cascading.org/lingual

Page 34

Lingual – queries

cascading.org/lingual

Page 35

# load the JDBC package
library(RJDBC)

# set up the driver
drv <- JDBC("cascading.lingual.jdbc.Driver",
  "~/src/concur/lingual/lingual-local/build/libs/lingual-local-1.0.0-wip-dev-jdbc.jar")

# set up a database connection to a local repository
connection <- dbConnect(drv,
  "jdbc:lingual:local;catalog=~/src/concur/lingual/lingual-examples/tables;schema=EMPLOYEES")

# query the repository: in this case the MySQL sample database (CSV files)
df <- dbGetQuery(connection,
  "SELECT * FROM EMPLOYEES.EMPLOYEES WHERE FIRST_NAME = 'Gina'")
head(df)

# use R functions to summarize and visualize part of the data
df$hire_age <- as.integer(as.Date(df$HIRE_DATE) - as.Date(df$BIRTH_DATE)) / 365.25
summary(df$hire_age)

library(ggplot2)
m <- ggplot(df, aes(x=hire_age))
m <- m + ggtitle("Age at hire, people named Gina")
m + geom_histogram(binwidth=1, aes(y=..density.., fill=..count..)) + geom_density()

Lingual – connecting Hadoop and R

Page 36

> summary(df$hire_age)
   Min. 1st Qu.  Median    Mean 3rd Qu.    Max.
  20.86   27.89   31.70   31.61   35.01   43.92

Lingual – connecting Hadoop and R

cascading.org/lingual

Page 37

[same Hadoop Cluster workflow diagram as Page 31]

Pattern – model scoring

• migrate workloads: SAS, Teradata, etc., exporting predictive models as PMML

• great open source tools – R, Weka, KNIME, Matlab, RapidMiner, etc.

• integrate with other libraries –Matrix API, etc.

• leverage PMML as another kind of DSL

cascading.org/pattern

Page 38

• established XML standard for predictive model markup

• organized by Data Mining Group (DMG), since 1997 http://dmg.org/

• members: IBM, SAS, Visa, NASA, Equifax, Microstrategy, Microsoft, etc.

• PMML concepts for metadata, ensembles, etc., translate directly into Cascading tuple flows

“PMML is the leading standard for statistical and data mining models and supported by over 20 vendors and organizations. With PMML, it is easy to develop a model on one system using one application and deploy the model on another system using another application.”

PMML – standard

wikipedia.org/wiki/Predictive_Model_Markup_Language
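To make the standard concrete, a minimal hand-written PMML document for a single-predictor linear regression might look like the following. This is an illustrative sketch only – the element names follow PMML 4.1, but the model itself is invented:

```xml
<PMML xmlns="http://www.dmg.org/PMML-4_1" version="4.1">
  <Header description="illustrative example only"/>
  <DataDictionary numberOfFields="2">
    <DataField name="x" optype="continuous" dataType="double"/>
    <DataField name="y" optype="continuous" dataType="double"/>
  </DataDictionary>
  <RegressionModel modelName="toy" functionName="regression">
    <MiningSchema>
      <MiningField name="x"/>
      <MiningField name="y" usageType="predicted"/>
    </MiningSchema>
    <!-- encodes y = 1.0 + 2.0 * x -->
    <RegressionTable intercept="1.0">
      <NumericPredictor name="x" coefficient="2.0"/>
    </RegressionTable>
  </RegressionModel>
</PMML>
```

In practice, tools such as R's pmml package generate files like this from a fitted model, and a consumer such as Pattern evaluates them at scale.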

Page 39

PMML – vendor coverage

Page 40

• Association Rules: AssociationModel element

• Cluster Models: ClusteringModel element

• Decision Trees: TreeModel element

• Naïve Bayes Classifiers: NaiveBayesModel element

• Neural Networks: NeuralNetwork element

• Regression: RegressionModel and GeneralRegressionModel elements

• Rulesets: RuleSetModel element

• Sequences: SequenceModel element

• Support Vector Machines: SupportVectorMachineModel element

• Text Models: TextModel element

• Time Series: TimeSeriesModel element

PMML – model coverage

ibm.com/developerworks/industry/library/ind-PMML2/

Page 41

## train a RandomForest model
f <- as.formula("as.factor(label) ~ .")
fit <- randomForest(f, data_train, ntree=50)

## test the model on the holdout test set
print(fit$importance)
print(fit)

predicted <- predict(fit, data)
data$predicted <- predicted
confuse <- table(pred = predicted, true = data[,1])
print(confuse)

## export predicted labels to TSV
write.table(data, file=paste(dat_folder, "sample.tsv", sep="/"),
  quote=FALSE, sep="\t", row.names=FALSE)

## export RF model to PMML
saveXML(pmml(fit), file=paste(dat_folder, "sample.rf.xml", sep="/"))

Pattern – create a model in R

Page 42

public static void main( String[] args ) throws RuntimeException {
  String inputPath = args[ 0 ];
  String classifyPath = args[ 1 ];

  // set up the config properties
  Properties properties = new Properties();
  AppProps.setApplicationJarClass( properties, Main.class );
  HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

  // create source and sink taps
  Tap inputTap = new Hfs( new TextDelimited( true, "\t" ), inputPath );
  Tap classifyTap = new Hfs( new TextDelimited( true, "\t" ), classifyPath );

  // handle command line options
  OptionParser optParser = new OptionParser();
  optParser.accepts( "pmml" ).withRequiredArg();
  OptionSet options = optParser.parse( args );

  // connect the taps, pipes, etc., into a flow
  FlowDef flowDef = FlowDef.flowDef()
    .setName( "classify" )
    .addSource( "input", inputTap )
    .addSink( "classify", classifyTap );

  if( options.hasArgument( "pmml" ) ) {
    String pmmlPath = (String) options.valuesOf( "pmml" ).get( 0 );
    PMMLPlanner pmmlPlanner = new PMMLPlanner()
      .setPMMLInput( new File( pmmlPath ) )
      .retainOnlyActiveIncomingFields()
      .setDefaultPredictedField( new Fields( "predict", Double.class ) ); // default value if missing from the model
    flowDef.addAssemblyPlanner( pmmlPlanner );
  }

  // write a DOT file and run the flow
  Flow classifyFlow = flowConnector.connect( flowDef );
  classifyFlow.writeDOT( "dot/classify.dot" );
  classifyFlow.complete();
}

Pattern – score a model, within an app

Page 44

Q3 1997: inflection point

four independent teams were working toward horizontal scale-out of workflows based on commodity hardware

this effort prepared the way for huge Internet successes in the 1997 holiday season… AMZN, EBAY, Inktomi (YHOO Search), then GOOG

MapReduce and the Apache Hadoop open source stack emerged from this period

Page 45

[diagram: Customers → transactions → Web App → RDBMS; Stakeholders run SQL queries for result sets; BI Analysts deliver Excel pivot tables and PowerPoint slide decks; Product sets strategy, Engineering turns requirements into optimized code]

Circa 1996: pre-inflection point

Page 46

[same “Circa 1996: pre-inflection point” diagram, annotated:]

“throw it over the wall”

Page 47

[diagram: customer transactions → RDBMS → SQL queries/result sets; Logs → event history → aggregation → dashboards; Algorithmic Modeling feeds models and servlets into Middleware, powering recommenders + classifiers in Web Apps; DW/ETL serves Stakeholders; Product, Engineering, and UX collaborate]

Circa 2001: post- big ecommerce successes

Page 48

[same “Circa 2001” diagram, annotated:]

“data products”

Page 49

[diagram: Web Apps, Mobile, etc. deliver Data Products to Customers, generating transactions/content, social interactions, and Log Events; workflows span RDBMS, an In-Memory Data Grid, and Hadoop, etc., mixing near-time and batch services under a Cluster Scheduler and Planner; use cases across topologies – s/w dev, data science, discovery + modeling, dashboard metrics, business process, optimized capacity taps – involve Prod, Eng, Ops, DW, Data Scientist, App Dev, and Domain Expert roles, balancing introduced capability against the existing SDLC]

Circa 2013: clusters everywhere

Page 50

[same “Circa 2013” diagram, annotated:]

“optimize topologies”

Page 51

Amazon
“Early Amazon: Splitting the website” – Greg Linden
glinden.blogspot.com/2006/02/early-amazon-splitting-website.html

eBay
“The eBay Architecture” – Randy Shoup, Dan Pritchett
addsimplicity.com/adding_simplicity_an_engi/2006/11/you_scaled_your.html
addsimplicity.com.nyud.net:8080/downloads/eBaySDForum2006-11-29.pdf

Inktomi (YHOO Search)
“Inktomi’s Wild Ride” – Eric Brewer (0:05:31 ff)
youtu.be/E91oEn1bnXM

Google
“Underneath the Covers at Google” – Jeff Dean (0:06:54 ff)
youtu.be/qsan-GQaeyk
perspectives.mvdirona.com/2008/06/11/JeffDeanOnGoogleInfrastructure.aspx

MIT Media Lab
“Social Information Filtering for Music Recommendation” – Pattie Maes
pubs.media.mit.edu/pubs/papers/32paper.ps
ted.com/speakers/pattie_maes.html

Primary Sources

Page 52

Cluster Computing’s Dirty Little Secret

many of us make a good living by leveraging high ROI apps based on clusters, and so execs agree to build out more data centers…

clusters for Hadoop/HBase, for Storm, for MySQL, for Memcached, for Cassandra, for Nginx, etc.

this becomes expensive!

a single class of workloads on a given cluster is simpler to manage, but terrible for utilization… various notions of “cloud” help…

Cloudera, Hortonworks, probably EMC soon: sell a notion of “Hadoop as OS” ⇒ All your workloads are belong to us

Google Data Center, Fox News

~2002

Page 53

Three Laws, or more?

meanwhile, architectures evolve toward much, much larger data…

pistoncloud.com/ ...

Rich Freitas, IBM Research

Q: what disruptions in topologies + algorithms could this imply, given there’s no such thing as RAM anymore…

Page 54

Three Laws, or more?

meanwhile, architectures evolve toward much, much larger data…

pistoncloud.com/ ...

Rich Freitas, IBM Research

regardless of how architectures change, death and taxes will endure:

servers fail, data must move

Q: what disruptions in topologies + algorithms could this imply, given there’s no such thing as RAM anymore…

Page 55

The Modern Kernel: Top Linux Contributors…

Page 56

Beyond Hadoop

Hadoop – an open source solution for fault-tolerant parallel processing of batch jobs at scale, based on commodity hardware… however, other priorities have emerged for the analytics lifecycle:

• apps require integration beyond Hadoop

• multiple topologies, mixed workloads, multi-tenancy

• higher utilization

• lower latency

• highly-available, long running services

• more than “Just JVM” – e.g., Python growth

keep in mind the priority for multi-disciplinary efforts, to break down even more silos – well beyond the de facto “priesthood” of data engineering

Page 57

Beyond Hadoop

Google has been doing data center computing for years, to address the complexities of large-scale data workflows:

• leveraging the modern kernel: isolation in lieu of VMs

• “most (>80%) jobs are batch jobs, but the majority of resources (55–80%) are allocated to service jobs”

• mixed workloads, multi-tenancy

• relatively high utilization rates

• JVM? not so much…

• reality: scheduling batch is simple; scheduling services is hard/expensive

Page 59

“Return of the Borg”

Omega: flexible, scalable schedulers for large compute clusters
Malte Schwarzkopf, Andy Konwinski, Michael Abd-El-Malek, John Wilkes
eurosys2013.tudos.org/wp-content/uploads/2013/paper/Schwarzkopf.pdf

Page 60

Mesos – definitions

a common substrate for cluster computing

heterogeneous assets in your data center or cloud made available as a homogeneous set of resources

• top-level Apache project

• scalability to 10,000s of nodes

• obviates the need for virtual machines

• isolation (pluggable) for CPU, RAM, I/O, FS, etc.

• fault-tolerant replicated master using ZooKeeper

• multi-resource scheduling (memory and CPU aware)

• APIs in C++, Java, Python

• web UI for inspecting cluster state

• available for Linux, OpenSolaris, Mac OSX
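Behind these bullets is a two-level scheduling model: the Mesos master makes resource offers, and each framework decides whether to launch tasks on them. A toy Python simulation of that offer cycle (invented names – not the real Mesos API) illustrates the multi-resource matching:

```python
# Toy two-level scheduling sketch: the "master" publishes resource offers,
# each "framework" accepts the first offer that satisfies its needs.
# Both CPU and memory must fit (multi-resource scheduling).

offers = [
    {"slave": "s1", "cpus": 4, "mem": 8192},
    {"slave": "s2", "cpus": 2, "mem": 2048},
]

frameworks = [
    {"name": "hadoop", "needs": {"cpus": 4, "mem": 4096}},
    {"name": "memcached", "needs": {"cpus": 1, "mem": 2048}},
]

launched = []
for fw in frameworks:
    for offer in offers:
        need = fw["needs"]
        if offer["cpus"] >= need["cpus"] and offer["mem"] >= need["mem"]:
            launched.append((fw["name"], offer["slave"]))
            offer["cpus"] -= need["cpus"]   # resources consumed on that slave
            offer["mem"] -= need["mem"]
            break

print(launched)  # [('hadoop', 's1'), ('memcached', 's2')]
```

Note how memcached lands on s2: hadoop already consumed all of s1's CPU, so the remaining offer no longer satisfies it.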

Page 61

Mesos – architecture

[architecture diagram – Kernel: distributed resources (CPU, RAM, I/O, FS, rack locality, etc.) across the Cluster, plus a distributed file system (DFS); Frameworks: Marathon, Chronos, Hadoop, Spark, MPI, Storm, Kafka, MySQL, JBoss, Django, Rails, Shark, Impala, Scalding; Apps: services and batch Workloads in C++, JVM, Python, Ruby]

Page 62

Mesos – architecture

given use of Mesos as a Data Center OS kernel…

• Chronos provides complex scheduling capabilities, much like a distributed Unix “cron”

• Marathon provides highly-available, long-running services, much like a distributed Unix “init.d”

• next time you need to build a distributed app, consider using these as building blocks
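For instance, a long-running service goes to Marathon as a JSON app definition POSTed to its REST API. A sketch of building such a payload – the field names follow Marathon's app format, but the service itself is hypothetical:

```python
import json

# hypothetical service; Marathon app definitions use fields like these
app = {
    "id": "hello-service",
    "cmd": "python -m http.server 8080",
    "cpus": 0.5,
    "mem": 64,
    "instances": 2,
}

payload = json.dumps(app)
# would be POSTed to http://<marathon-host>:8080/v2/apps
print(payload)
```

Marathon then keeps the requested number of instances running, restarting tasks on other slaves when one fails.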

a major lesson learned from Spark:

• leveraging these kinds of building blocks, one can rebuild Hadoop 100x faster, in much less code

Page 63

Mesos – data center OS stack

[data center OS stack – Apps: HADOOP, STORM, CHRONOS, RAILS, JBOSS; Kernel: MESOS; OS services: CAPACITY PLANNING GUI, SECURITY, SMARTER SCHEDULING, TELEMETRY]

Page 64

Prior Practice: Dedicated Servers

DATACENTER

• low utilization rates

• longer time to ramp up new services

Page 65

Prior Practice: Virtualization

DATACENTER PROVISIONED VMS

• even more machines to manage

• substantial performance decrease due to virtualization

• VM licensing costs

Page 66

Prior Practice: Static Partitioning

DATACENTER STATIC PARTITIONING

• even more machines to manage

• substantial performance decrease due to virtualization

• VM licensing costs

• static partitioning limits elasticity

Page 67

MESOS

Mesos: One Large Pool Of Resources

DATACENTER

“We wanted people to be able to program for the data center just like they program for their laptop.”

Ben Hindman

Page 69: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

What are the costs of Single Tenancy?

[Charts: CPU load over time (0–100%) for Rails, Memcached, and Hadoop running on dedicated single-tenant clusters, each mostly idle, versus the combined CPU load (Rails, Memcached, Hadoop) when all three workloads share one cluster.]

Page 70: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

[Diagram: Mesos master servers (M) running Marathon, and Mesos slave servers (S) running Docker; container images are pulled from index.docker.io or an optional local Docker registry.]

Marathon can launch and monitor service containers from one or more Docker registries, using the Docker executor for Mesos


mesosphere.io/2013/09/26/docker-on-mesos/

Example: Docker on Mesos

Page 71: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Mesos Master Server

init
 |
 + mesos-master
 |
 + marathon

Mesos Slave Server

init
 |
 + docker
 |  |
 |  + lxc
 |     |
 |     + (user task, under container init system)
 |
 + mesos-slave
    |
    + /var/lib/mesos/executors/docker
       |
       + docker run …

The executor, monitored by the Mesos slave, delegates to the local Docker daemon for image discovery and management. The executor communicates with Marathon via the Mesos master and ensures that Docker enforces the specified resource limitations.
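As a sketch of what Marathon submits in this setup: the early Docker executor recognized a `container` field in the app definition, with a `docker:///` image URI and extra `docker run` options. Treat the exact field names and image below as illustrative assumptions rather than a definitive schema; the mesosphere.io blog post cited here has the authoritative details.

```python
import json

def docker_app(app_id, image, cmd, cpus=0.5, mem=256.0, instances=1):
    # Marathon app definition routed through the external Docker
    # executor (/var/lib/mesos/executors/docker in the process tree);
    # the executor hands the image off to the local Docker daemon.
    return {
        "id": app_id,
        "cmd": cmd,
        "cpus": cpus,
        "mem": mem,
        "instances": instances,
        "container": {
            "image": "docker:///" + image,  # pulled from the registry on demand
            "options": [],                  # extra flags passed to `docker run`
        },
    }

app = docker_app("redis-cache", "library/redis", "redis-server")
print(json.dumps(app, indent=2))
```

Because the slave only sees resource limits, not the image contents, the same offer cycle that places a Hadoop task can place this container.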

mesosphere.io/2013/09/26/docker-on-mesos/

Example: Docker on Mesos

Page 72: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Mesos Master Server

init
 |
 + mesos-master
 |
 + marathon

Mesos Slave Server

init
 |
 + docker
 |  |
 |  + lxc
 |     |
 |     + (user task, under container init system)
 |
 + mesos-slave
    |
    + /var/lib/mesos/executors/docker
       |
       + docker run …

DockerRegistry

When a user requests a container…

Mesos, LXC, and Docker are tied together for launch

[Launch sequence annotated as steps 1–8 in the original diagram.]

Example: Docker on Mesos

mesosphere.io/2013/09/26/docker-on-mesos/

Page 73: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Arguments for Data Center Computing

rather than running several specialized clusters, each at relatively low utilization rates, run many mixed workloads together on one shared cluster

obvious benefits are realized in terms of:

• scalability, elasticity, fault tolerance, performance, utilization

• reduced equipment cap-ex, Ops overhead, etc.

• reduced licensing, eliminating the need for VMs or potential vendor lock-in

subtle benefits – arguably, more important for Enterprise IT:

• reduced time for engineers to ramp up new services at scale

• reduced latency between batch and services, enabling new high-ROI use cases

• enables Dev/Test apps to run safely on a Production cluster

Page 74: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Deployments

Page 75: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Opposite Ends of the Spectrum, One Substrate

Built-in / bare metal

Hypervisors

Solaris Zones

Linux CGroups

Page 76: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Opposite Ends of the Spectrum, One Substrate

Request/Response … Batch

Page 77: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Case Study: Twitter (bare metal / on premise)

“Mesos is the cornerstone of our elastic compute infrastructure – it’s how we build all our new services and is critical for Twitter’s continued success at scale. It’s one of the primary keys to our data center efficiency.”

Chris Fry, SVP Engineering
blog.twitter.com/2013/mesos-graduates-from-apache-incubation

• key services run in production: analytics, typeahead, ads

• Twitter engineers rely on Mesos to build all new services

• instead of thinking about static machines, engineers think about resources like CPU, memory and disk

• allows services to scale and leverage a shared pool of servers across data centers efficiently

• reduces the time between prototyping and launching

Page 78: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Case Study: Airbnb (fungible cloud infrastructure)

“We think we might be pushing data science in the field of travel more so than anyone has ever done before… a smaller number of engineers can have higher impact through automation on Mesos.”

Mike Curtis, VP Engineering
gigaom.com/2013/07/29/airbnb-is-engineering-itself-into-a-data-driven...

• improves resource management and efficiency

• helps advance engineering strategy of building small teams that can move fast

• key to letting engineers make the most of AWS-based infrastructure beyond just Hadoop

• allowed company to migrate off Elastic MapReduce

• enables use of Hadoop along with Chronos, Spark, Storm, etc.

Page 79: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Media Coverage

Play Framework Grid Deployment with Mesos
James Ward, Flo Leibert, et al.
Typesafe blog (2013-09-19)
typesafe.com/blog/play-framework-grid...

Mesosphere Launches Marathon Framework
Adrian Bridgwater
Dr. Dobb’s (2013-09-18)
drdobbs.com/open-source/mesosphere...

New open source tech Marathon wants to make your data center run like Google’s
Derrick Harris
GigaOM (2013-09-04)
gigaom.com/2013/09/04/...

Running batch and long-running, highly available service jobs on the same cluster
Ben Lorica
O’Reilly (2013-09-01)
strata.oreilly.com/2013/09/...

Page 82: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading

Workflow

[Diagram: Web Apps, Mobile, etc. generate transactions, content, and social interactions from customers; data flows as Log Events into RDBMS, an In-Memory Data Grid, and Hadoop, etc., across near-time, batch, and services topologies; a Cluster Scheduler and Planner coordinate Prod, Eng, and DW clusters; Data Scientist, App Dev, Ops, and Domain Expert roles span s/w dev, data science (discovery + modeling), dashboard metrics, business process, and optimized capacity taps, bridging introduced capability with the existing SDLC to deliver Data Products.]

Circa 2013: clusters everywhere – Four-Part Harmony

Page 83: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading


Circa 2013: clusters everywhere – Four-Part Harmony

1. End Use Cases, the drivers

Page 84: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading


Circa 2013: clusters everywhere – Four-Part Harmony

2. A new kind of team process

Page 85: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading


Circa 2013: clusters everywhere – Four-Part Harmony

3. Abstraction layer as optimizing middleware, e.g., Cascading

Page 86: Boulder/Denver BigData: Cluster Computing with Apache Mesos and Cascading


Circa 2013: clusters everywhere – Four-Part Harmony

4. Data Center OS, e.g., Mesos