Hadoop: The Default Machine Learning Platform?


DESCRIPTION

Apache Hadoop, since its humble beginnings as an execution engine for web crawling and building search indexes, has matured into a general-purpose distributed application platform and data store. Large Scale Machine Learning (LSML) techniques and algorithms have proved quite tricky for Hadoop to handle, ever since we started offering Hadoop as a service at Yahoo in 2006. In this talk, I will discuss early experiments implementing LSML algorithms on Hadoop at Yahoo. I will describe how they changed Hadoop, and led to a generalization of the Hadoop platform to accommodate programming paradigms other than MapReduce. I will unveil some of our recent efforts to incorporate diverse LSML runtimes into Hadoop, evolving it to become *THE* LSML platform. I will also make a case for an industry-standard LSML benchmark, based on common deep analytics pipelines that utilize LSML workloads.

TRANSCRIPT

Hadoop: The Default Machine Learning Platform?

Milind Bhandarkar
Chief Scientist, Pivotal

@techmilind

Wednesday, December 18, 2013

About Me

• http://www.linkedin.com/in/milindb

• Founding member of Hadoop team at Yahoo! [2005-2010]

• Contributor to Apache Hadoop since v0.1

• Built and led Grid Solutions Team at Yahoo! [2007-2010]

• Parallel Programming Paradigms [1989-today] (PhD cs.illinois.edu)

• Center for Development of Advanced Computing (C-DAC), National Center for Supercomputing Applications (NCSA), Center for Simulation of Advanced Rockets, Siebel Systems (acquired by Oracle), Pathscale Inc. (acquired by QLogic), Yahoo!, LinkedIn, and Pivotal (formerly Greenplum)


Acknowledgements

• Developers of various Open-Source and Proprietary Data Platforms

• Ex-Colleagues at Yahoo! Research

• Colleagues at Data Science Team, Pivotal

• Vijay Narayanan, Microsoft


Kryptonite: First Hadoop Cluster At Yahoo!


M45: Academic Collaboration


Analytics Workbench


[Diagram: Hadoop 1.0 — separate siloed clusters for BATCH, INTERACTIVE, and ONLINE workloads, each running its own MapReduce over its own HDFS]

Hadoop 1.0
(Image Courtesy Arun Murthy, Hortonworks)

MapReduce 1.0

(Image Courtesy Arun Murthy, Hortonworks)



ML in MapReduce: Why?

• High data throughput: 100 TB/hr using 500 mappers

• Framework provides fault tolerance

• Monitors and restarts tasks on other machines should one of the machines fail

• Excels in counting patterns over data records

• Built on relatively cheap, commodity hardware

• Large volumes of data already stored on Hadoop clusters running MapReduce
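The "counting patterns over data records" strength above is the classic word-count shape of a MapReduce job. As a minimal sketch, the following plain Python simulates the map, shuffle, and reduce phases locally; all function names here are illustrative, not part of the Hadoop API:

```python
from collections import defaultdict

def mapper(record):
    # Emit a <pattern, 1> pair for every token in the record.
    for token in record.split():
        yield token, 1

def reducer(key, values):
    # Sum the partial counts for one pattern.
    return key, sum(values)

def run_mapreduce(records):
    # Shuffle phase: group mapper output by key, then reduce each group.
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return dict(reducer(k, vs) for k, vs in groups.items())

counts = run_mapreduce(["a b a", "b c"])
# counts == {"a": 2, "b": 2, "c": 1}
```

In a real Hadoop job the shuffle is done by the framework across machines; only the mapper and reducer bodies are user code.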


ML in MapReduce: Why?

• Learning can become limited by computation time and not data volume

• With large enough data and number of machines

• Reduces the need to down-sample data

• More accurate parameter estimates compared to learning on a single machine for the same amount of time


Learning Models in MapReduce

• Data-parallel algorithms are the best fit for MapReduce implementations

• Not necessarily the optimal implementation for a specific algorithm

• Specialized non-MapReduce implementations exist for some algorithms, and may perform better

• MapReduce may not be the appropriate framework for exact solutions of non-data-parallel (sequential) algorithms

• Approximate solutions using MapReduce may be good enough


Types of Learning in MapReduce

• Parallel training of multiple models

• Train either in Mappers or Reducers

• Ensemble training methods

• Train multiple models and combine them

• Distributed learning algorithms

• Learn using both Mappers and Reducers


Parallel Training of Multiple Models

• Train multiple models simultaneously, using a learning algorithm whose models can be trained in memory

• Useful when individual models are trained on a subset, a filtered version, or a modification of the raw data

• Train 1 model in each reducer

• Map:

• Input: All data

• Filters subset of data relevant for each model training

• Output: <model_index, subset of data for training this model>

• Reduce

• Train model on data corresponding to that model_index
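The map-routes-data, reduce-trains-one-model recipe above can be sketched in plain Python. Here the "training" is a deliberately trivial stand-in (fitting a one-parameter model, the mean); a real job would run any in-memory learner in the reducer. All names are illustrative:

```python
from collections import defaultdict

def mapper(record):
    # Hypothetical routing: each record carries the index of the model
    # it should contribute to, plus a numeric feature value.
    model_index, value = record
    yield model_index, value

def reducer(model_index, values):
    # Stand-in "training": fit a trivial one-parameter model (the mean).
    model = sum(values) / len(values)
    return model_index, model

def train_models(records):
    # Shuffle by model_index, then train one model per reducer group.
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return dict(reducer(k, vs) for k, vs in groups.items())

models = train_models([(0, 1.0), (0, 3.0), (1, 10.0)])
# models == {0: 2.0, 1: 10.0}
```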


Distributed Learning Algorithms

• Suitable for learning algorithms that are

• Compute-Intensive per data record

• One or few iterations for learning

• Do not transfer much data between iterations

• Typical algorithms

• Fit the Statistical query model (SQM)

• Divide and conquer
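Algorithms that fit the statistical query model reduce learning to summing small sufficient statistics over records, which divides cleanly across mappers. As a minimal sketch (not any library's API), the following computes a global mean and variance by combining per-partition statistics, so only small tuples cross the network:

```python
def map_stats(partition):
    # Per-partition sufficient statistics: (count, sum, sum of squares).
    n = len(partition)
    s = sum(partition)
    ss = sum(x * x for x in partition)
    return n, s, ss

def reduce_stats(stats):
    # Combine the partial statistics from all partitions.
    n = sum(t[0] for t in stats)
    s = sum(t[1] for t in stats)
    ss = sum(t[2] for t in stats)
    mean = s / n
    variance = ss / n - mean * mean
    return mean, variance

partitions = [[1.0, 2.0], [3.0, 4.0]]
mean, var = reduce_stats([map_stats(p) for p in partitions])
# mean == 2.5, var == 1.25
```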


k-Means Clustering

• Choose k samples as initial cluster centroids

• In each MapReduce Iteration:

• Assign membership of each point to closest cluster

• Re-compute new cluster centroids using assigned members

• Control program to

• Initialize the centroids

• e.g., randomly, or by an initial clustering on a sample

• Run the MapReduce iterations

• Determine stopping criterion



k-Means Clustering in MapReduce

• Map

• Input data points: x_i

• Input cluster centroids: c_i

• Assign each data point to closest cluster

• Output: <index of closest centroid, x_i>

• Reduce

• Compute new centroids for each cluster
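The map and reduce steps above can be sketched as a single local iteration in plain Python (illustrative only, not the Hadoop API): the map side assigns each point to its closest centroid, and the reduce side averages each cluster's members into a new centroid.

```python
from collections import defaultdict

def closest(point, centroids):
    # Index of the nearest centroid under squared Euclidean distance.
    return min(range(len(centroids)),
               key=lambda j: sum((p - c) ** 2
                                 for p, c in zip(point, centroids[j])))

def kmeans_iteration(points, centroids):
    # Map: emit <cluster_index, point>; Reduce: average assigned points.
    groups = defaultdict(list)
    for x in points:
        groups[closest(x, centroids)].append(x)
    new_centroids = list(centroids)
    for j, members in groups.items():
        new_centroids[j] = tuple(sum(dim) / len(members)
                                 for dim in zip(*members))
    return new_centroids

points = [(0.0, 0.0), (1.0, 0.0), (10.0, 10.0)]
centroids = kmeans_iteration(points, [(0.0, 0.0), (9.0, 9.0)])
# centroids == [(0.5, 0.0), (10.0, 10.0)]
```

The outer control program described earlier would call this repeatedly until the centroids stop moving.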


Complexity of k-Means Clustering

• Each point is compared with each cluster centroid

• Complexity = N*K*O(d) where O(d) is the complexity of the distance metric

• Typical Euclidean distance is not a cheap operation

• Can reduce complexity using an initial canopy clustering to partition data cheaply


Apache Mahout

• Goal

• Create scalable, machine learning algorithms under the Apache license

• Contains both:

• Hadoop implementations of algorithms that scale linearly with data.

• Fast sequential (non MapReduce) algorithms

• Wiki:

• https://cwiki.apache.org/confluence/display/MAHOUT/Mahout+Wiki


Algorithms in Mahout

• Classification:

• Logistic Regression, Naïve Bayes, Complementary Naïve Bayes, Random Forests

• Clustering

• K-means, Fuzzy k-means, Canopy, Mean-shift clustering, Dirichlet Process clustering, Latent Dirichlet allocation, Spectral clustering

• Parallel FP growth

• Item based recommendations

• Stochastic Gradient Descent (sequential)


Challenges for ML with MapReduce

• MapReduce is optimized for large batch data processing

• Assumes data parallelism

• Ideal for shared-nothing computing

• Many learning algorithms are iterative

• Incur significant overheads per iteration


Challenges (contd.)

• Multiple scans of the same data

• Typically once per iteration: High I/O overhead reading data into mappers per iteration

• In some algorithms static data is read into mappers in each iteration

• e.g. input data in k-means clustering.


Challenges (contd.)

• Need a separate controller outside the framework to:

• Coordinate the multiple MapReduce jobs for each iteration

• Perform some computations between iterations and at the end

• Measure and implement the stopping criterion


Challenges (contd.)

• Incur multiple task initialization overheads

• Setup and tear down mapper and reducer tasks per iteration

• Transfer/shuffle static data between mapper and reducer repeatedly

• Intermediate data is transferred through index/data files on local disks of mappers and pulled by reducers


Challenges (contd.)

• Blocking architecture

• Reducers cannot start until all map tasks complete

• Availability of nodes in a shared environment

• Wait for mapper and reducer nodes to become available in each iteration in a shared computing cluster


Iterative Algorithms in MapReduce

Overhead per Iteration:

• Job setup

• Data loading

• Disk I/O

Enhancements to MapReduce

• Many proposals to overcome these challenges

• All try to retain the core strengths of data partitioning and fault tolerance of Hadoop to various degrees

• Proposed enhancements and alternatives to Hadoop

• Worker/Aggregator framework, HaLoop, MapReduce Online, iMapReduce, Spark, Twister, Hadoop ML, ...


Worker/Aggregator

Advantages:

• Schedule once per job

• Data stays in memory

• P2P communication

HaLoop

• Programming model and architecture for iterations

• New APIs to express iterations in the framework

• Loop-aware task scheduling

• Physically co-locate tasks that use the same data in different iterations

• Remember association between data and node

• Assign task to node that uses data cached in that node


HaLoop (contd.)

• Caching for loop-invariant data:

• Detect invariants in first iteration, cache on local disk to reduce I/O and shuffling cost in subsequent iterations

• Cache for Mapper inputs, Reducer Inputs, Reducer outputs

• Caching to support fixpoint evaluation:

• Avoids the need for a dedicated MR step on each iteration


Spark

• Open Source Cluster Computing model:

• Different from MapReduce, but retains some basic character

• Optimized for:

• Iterative computations

• Applies to many learning algorithms

• Interactive data mining

• Load data once into multiple mappers and run multiple queries


Spark (contd.)

• Programming model using working sets

• applications reuse intermediate results in multiple parallel operations

• preserves the fault tolerance of MapReduce

• Supports

• Parallel loops over distributed datasets

• Loads data into memory for (re)use in multiple iterations

• Access to shared variables accessible from multiple machines

• Implemented in Scala

• www.spark-project.org


Hadoop 2.0

(Image Courtesy Arun Murthy, Hortonworks)

[Diagram: HADOOP 1.0 — HDFS (redundant, reliable storage) with MapReduce providing both cluster resource management and data processing; HADOOP 2.0 — HDFS2 (redundant, reliable storage), YARN (cluster resource management), and multiple execution engines (MapReduce batch, Tez interactive, streaming, services) on top]

Applications Run Natively in Hadoop

• HDFS2 (redundant, reliable storage)

• YARN (cluster resource management)

• BATCH (MapReduce)

• INTERACTIVE (Tez)

• STREAMING (Storm, S4)

• GRAPH (Giraph)

• IN-MEMORY (Spark)

• HPC MPI (OpenMPI)

• ONLINE (HBase)

• OTHER (Search) (Weave)

YARN Platform

(Image Courtesy Arun Murthy, Hortonworks)


[Diagram: YARN architecture — a ResourceManager (with Scheduler) coordinates NodeManagers on each node; per-application ApplicationMasters (AM1, AM2) run in containers and manage their own containers (1.1-1.3, 2.1-2.3) across nodes, serving client requests]

YARN Architecture

(Image Courtesy Arun Murthy, Hortonworks)


YARN

• Yet Another Resource Negotiator

• Resource Manager

• Node Managers

• Application Masters

• Specific to paradigm, e.g. MR Application master (aka JobTracker)


MPP SQL On Hadoop


SQL-on-Hadoop

• Pivotal HAWQ

• Cloudera Impala, Facebook Presto, Apache Drill, Cascading Lingual, Optiq, Hortonworks Stinger

• Hadapt, Jethrodata, IBM BigSQL, Microsoft PolyBase

• More to come...


[Diagram: HAWQ architecture — HAWQ & HDFS master servers perform planning & dispatch; segment servers perform query execution; all connected by a network interconnect over storage (HDFS, HBase, ...)]

[Diagram: HAWQ on HDFS deployment — the master host issues meta ops to the Namenode; segment hosts across racks (Rack1, Rack2) co-locate multiple HAWQ segments with HDFS Datanodes for local read/write, with block replication across racks; segments communicate over the HAWQ interconnect]

HAWQ vs Hive

[Chart: query execution times; lower is better]


In-Database Analytics

Provides data-parallel implementations of mathematical, statistical, and machine-learning methods for structured and unstructured data.


MADlib Algorithms


MADlib Functions

• Linear Regression

• Logistic Regression

• Multinomial Logistic Regression

• K-Means

• Association Rules

• Latent Dirichlet Allocation

• Naïve Bayes

• Elastic Net Regression

• Decision Trees / Random Forest

• Support Vector Machines

• Cox Proportional Hazards Regression

• Descriptive Statistics

• ARIMA


k-Means Usage

SELECT * FROM madlib.kmeanspp(
    'customers',   -- name of the input table
    'features',    -- name of the feature array column
    2              -- k: number of clusters
);

 centroids                                                               | objective_fn     | frac_reassigned | ...
-------------------------------------------------------------------------+------------------+-----------------+ ...
 {{68.01668579784,48.9667382972952},{28.1452167573446,84.5992507653263}} | 586729.010675982 | 0.001           | ...


Accessing HAWQ Through R


Pivotal R

• Interface is R client

• Execution is in database

• Parallelism handled by PivotalR

• Supports a portion of R

R> x = db.data.frame("t1")
R> l = madlib.lm(interlocks ~ assets + nation, data = x)



A wrapper of MADlib

• Linear regression

• Logistic regression

• Elastic Net

• ARIMA

• Table summary

• Categorical variables: as.factor()

And more (SQL wrapper):

• $ [ [[ $<- [<- [[<-

• is.na

• + - * / %% %/% ^

• & | !

• == != > < >= <=

• merge

• by

• db.data.frame

• as.db.data.frame

• preview

• sort

• c mean sum sd var min max length colMeans colSums

• db.connect db.disconnect db.list db.objects db.existsObject delete

• dim names

• content

• predict

[Diagram: PivotalR class hierarchy — db.obj has two subclasses: db.data.frame (further split into db.table and db.view), a wrapper of objects in the database (x = db.data.frame("table")); and db.Rquery, which resides in R only (x[,1:2], merge(x, y, by="column"), etc.). Most operations produce a db.Rquery; as.db.data.frame(...) materializes it back into the database; MADlib wrapper functions and preview accept both.]

In-Database Execution

• All data stays in DB: R objects merely point to DB objects

• All model estimation and heavy lifting done in DB by MADlib

• R → SQL translation done in the R client

• Only strings of SQL and model output transferred across ODBC/DBI
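The client-side translation step can be illustrated with a toy translator. Everything here (the function name, its arguments) is hypothetical and much simpler than what PivotalR actually does; the point is only that the client builds a SQL string, and just that string crosses the connection while the database does the work:

```python
def translate_select(table, columns, predicate=None):
    # Hypothetical R-to-SQL translation step: compose the SQL text on
    # the client side from the pieces of an R-like expression.
    sql = "SELECT {} FROM {}".format(", ".join(columns), table)
    if predicate:
        sql += " WHERE {}".format(predicate)
    return sql

query = translate_select("t1", ["interlocks", "assets"], "assets > 1000")
# query == "SELECT interlocks, assets FROM t1 WHERE assets > 1000"
```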


Beyond MapReduce

• Apache Giraph - BSP & Graph Processing

• Storm on Yarn - Streaming Computation

• HOYA - HBase on Yarn

• Hamster - MPI on Hadoop

• More to come...


Hamster

• Hadoop and MPI on the same cluster

• OpenMPI Runtime on Hadoop YARN

• Hadoop Provides: Resource Scheduling, Process monitoring, Distributed File System

• Open MPI Provides: Process launching, Communication, I/O forwarding


Hamster Components

• Hamster Application Master

• Gang Scheduler, YARN Application Preemption

• Resource Isolation (lxc Containers)

• ORTE: Hamster Runtime

• Process launching, Wireup, Interconnect


[Diagram: Hamster architecture — a Resource Manager (Scheduler, AM Service) coordinates Node Managers; the MPI ApplicationMaster contains a scheduler and the HNP; MPI processes run in containers alongside framework daemons, connected by the Client-RM, RM-AM, AM-NM, and RM-NodeManager protocols]

Hamster Architecture

Hamster Scalability

• Sufficient for small to medium HPC workloads

• Job launch time gated by YARN resource scheduler

          Launch    WireUp    Collectives  Monitor
OpenMPI   O(logN)   O(logN)   O(logN)      O(logN)
Hamster   O(N)      O(logN)   O(logN)      O(logN)


GraphLab + Hamster on Hadoop

About GraphLab

• Graph-based, High-Performance distributed computation framework

• Started by Prof. Carlos Guestrin at CMU in 2009

• Guestrin recently founded GraphLab Inc. to commercialize GraphLab.org


GraphLab Features

• Topic Modeling (e.g. LDA)

• Graph Analytics (PageRank, triangle counting)

• Clustering (K-Means)

• Collaborative Filtering

• Linear Solvers

• etc...


Graphs Alone Are Not Enough

• A full data processing workflow requires ETL/postprocessing, visualization, data wrangling, and serving

• MapReduce excels at data wrangling

• OLTP/NoSQL row-based stores excel at serving

• GraphLab should co-exist with other Hadoop frameworks


Questions?
