
Page 1: 719 S19-L09 ProgModels2-ML


Programming Models and Frameworks:

Iterative Computation

Advanced Cloud Computing

15-719/18-709

George Amvrosiadis, Greg Ganger, Majd Sakr

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 1

Advanced Cloud Computing Programming Models

• Ref 1: Yucheng Low, Joseph Gonzalez, Aapo Kyrola, Danny Bickson, Carlos Guestrin, and Joseph M. Hellerstein (2010). "GraphLab: A New Parallel Framework for Machine Learning." Conf on Uncertainty in Artificial Intelligence (UAI). http://www.select.cs.cmu.edu/publications/scripts/papers.cgi

• Ref 2: Spark: cluster computing with working sets. Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica.

USENIX Hot Topics in Cloud Computing (HotCloud’10). http://www.cs.berkeley.edu/~matei/papers/2010/hotcloud_spark.pdf

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 2

Page 2: 719 S19-L09 ProgModels2-ML


Advanced Cloud Computing Programming Models

• Optional
• Ref 3: DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language. Yuan Yu, Michael Isard, Dennis Fetterly, Mihai Budiu, Ulfar Erlingsson, Pradeep Kumar Gunda, Jon Currey. OSDI’08. http://research.microsoft.com/en-us/projects/dryadlinq/dryadlinq.pdf

• Ref 4: TensorFlow: A system for large-scale machine learning. Martin Abadi, Paul Barham, Jianmin Chen, Zhifeng Chen, Andy Davis, Jeff Dean, Matthieu Devin, Sanjay Ghemawat, Geoffrey Irving, Michael Isard. OSDI’16. https://www.usenix.org/system/files/conference/osdi16/osdi16-abadi.pdf

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 3

“Killer App” for Big Data:

Machine Learning (ML)

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 4

Page 3: 719 S19-L09 ProgModels2-ML


Feb 18, 2019 15719/18-709 Adv. Cloud Computing 5

2005 NIST Arabic-English Competition

• Translate 100 articles
• 2005: Google wins!
  o Qualitatively better 1st entry
  o Not the most sophisticated approach: no one knew Arabic, brute-force statistics
  o But more data & compute!! 200M words from UN translations, 1 billion words of Arabic docs, a 1000-processor cluster
• ⇒ Can’t compete w/o big data

[Figure: bar chart of BLEU scores (0.0 to 0.7) for the competition entries (Google, ISI, IBM+CMU, UMD, JHU+CU, Edinburgh, Systran, Mitre, FSC), annotated with quality bands ranging from “useless” through “topic identification”, “usable translation”, and “human-editable translation” up to “expert human translator”.]

Kosinski M et al. PNAS 2013;110:5802-5805. ©2013 by National Academy of Sciences

“The study is based on a sample of 58,466 volunteers from the United States, obtained through the myPersonality Facebook application (www.mypersonality.org/wiki), which included their Facebook profile information, a list of their Likes (n = 170 Likes per person ...”

“… easily accessible digital records of behavior, Facebook Likes, can be used to automatically and accurately predict a range of highly sensitive personal attributes … model correctly discriminates between homosexual and heterosexual men in 88% of cases, African Americans and Caucasian Americans in 95% of cases, and between Democrat and Republican in 85% of cases.”

Lots of examples of ML effectiveness “out there”

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 6

Page 4: 719 S19-L09 ProgModels2-ML


Stages of Machine Learning (high level desc.)

• Data collection
  o Logistics, cleaning, …
• Model selection
  o Domain knowledge
• Data engineering
  o Extract, transform, …
• Model training
  o Fit parameters to data
• Model inferencing
  o Predict/label outcome from model

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 7

[Figure: scale examples: 645 million users and 500 million tweets/day; 100+ hours of video uploaded every minute; collaborative filtering for video recommendation: 1 billion model parameters; Google Brain deep learning for images: 1 billion model parameters.]

Stages of Machine Learning (process)

• Data collection
  o Done mostly away from the machine learning data center, then aggregated
• Model selection
  o Done offline from collection/engineering/training/inference
• Data engineering (project 2 part 1)
  o Multiple data passes (Map/Reduces), large data reduction, more cleaning
• Model training (project 2 part 3)
  o Start with a guess of parameters, test against recorded input and output data, adjust parameters, iterate many times (many data passes)
• Model inferencing
  o For one input, apply model and return one predicted output (no data passes)

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 8

Page 5: 719 S19-L09 ProgModels2-ML


Netflix recommendations as an example

• Data collection
  o Movie ratings provided by users
• Model selection
  o View it as a giant matrix of users vs. movies, with a rating in each entry
  o Known movie ratings form a sparse matrix… factor it and then generate the missing elements
• Data engineering
  o Convert the rating log to an easy-to-process form, filter “bad” ratings, … ?
• Model training
  o Use gradient descent to generate “close fit” factor matrices (sketch below)
• Model inferencing
  o Use the factor matrices to predict missing entries (e.g., a given user’s unknown rating of a given movie)
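As a hedged illustration of the training step above (not Netflix’s actual method or code), a minimal NumPy sketch of factoring a sparse ratings matrix with stochastic gradient descent; the rank, learning rate, regularization, and iteration count are arbitrary assumptions:

    import numpy as np

    def train_factors(ratings, n_users, n_movies, rank=10, lr=0.01, reg=0.1, iters=20):
        """ratings: list of (user, movie, rating) triples, i.e., the known sparse entries."""
        rng = np.random.default_rng(0)
        U = rng.normal(scale=0.1, size=(n_users, rank))   # user factor matrix
        M = rng.normal(scale=0.1, size=(n_movies, rank))  # movie factor matrix
        for _ in range(iters):                            # many passes over the known ratings
            for u, m, r in ratings:
                err = r - U[u] @ M[m]                     # error on one known rating
                u_old = U[u].copy()
                U[u] += lr * (err * M[m] - reg * U[u])    # gradient step on user factors
                M[m] += lr * (err * u_old - reg * M[m])   # gradient step on movie factors
        return U, M

    def predict(U, M, user, movie):
        """Inference: fill in a missing entry from the factor matrices."""
        return U[user] @ M[movie]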

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 9

ML model training

• Givens: a bunch of input data samples and a selected ML model
• Goal: determine parameter values for the model that fit the samples
• Model training is solving for the “best” parameter values
  o Generally, no perfect solution exists
  o So, add a figure of merit (objective function) to score a solution and search the solution space for the best score
• ML model training is usually performed by iterative search
  o E.g., gradient descent
  o Search until a “stop condition” is reached (might be search time, incremental score benefit of recent search steps, or a specific score)

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 10

Page 6: 719 S19-L09 ProgModels2-ML


ML model training via MapReduce (MR)

0) Store engineered input data and initial model parameters in files
1) Split input data among map tasks; replicate parameters (this is known as “data parallel” decomposition) by having all map tasks read the entire parameter values file
2) Each map task tests the model against data inputs & outputs and computes changes in model parameters; send the changes to reducers (sketch below)
3) Reducers combine changes from the different map splits of the data and write a new model parameters file (and decide if training is over)
4) If training is not over, go to (1)
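A minimal sketch of steps 1–3 for a simple linear model, written as plain Python map and reduce functions (illustrative only; a real Hadoop job would package these as Mapper/Reducer classes reading HDFS files):

    from collections import defaultdict

    def map_task(data_split, params):
        """data_split: list of (features, label) pairs, features as {param_id: value};
        params: the full parameter dict that every map task reads."""
        updates = defaultdict(float)
        for features, label in data_split:
            pred = sum(params.get(j, 0.0) * v for j, v in features.items())
            err = pred - label
            for j, v in features.items():
                updates[j] += err * v          # pre-combine updates per parameter in memory
        return list(updates.items())           # shuffled to reducers, keyed by parameter id

    def reduce_task(param_id, deltas, old_value, lr=0.01):
        """Reducer sums the changes from all map splits and emits the new parameter value."""
        return param_id, old_value - lr * sum(deltas)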

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 11

Many inefficiencies w/ ML model training via MR

• If Hadoop, each map task and each reduce task is a Java VM launch
• Iteration is in external scripts repeating Hadoop invocations
• Amount of compute per data item is not much
• No need to issue a parameter update per data item; could pre-combine updates for the same parameter in the memory of each map
  o So the shuffle is not a flow, but a single set of parameter updates per map
• Reducer function is simple: add the updates for each parameter
  o Most work is communication through the file system
• ⇒ Overall: it may scale great, but overhead is high
  o ML training is just not well suited to stateless, deterministic functions

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 12

Page 7: 719 S19-L09 ProgModels2-ML


Spark better (not “good”) for ML model training

• Don’t write reducer output to the file system; cache it in memory
• Don’t re-read engineered data from the file system; cache it in memory
• For small numbers of parameters, the driver can collect & broadcast (see the sketch below)
• Combine map transformations to try for one shuffle per iteration
• Don’t launch separate Java VMs for each map task; retain one VM for all tasks across all iterations
• Potential speedup is large – 10X in the Spark paper
  o ⇒ but still much less efficient than it could be
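A minimal PySpark sketch of these optimizations, assuming a toy least-squares model; the input path, feature count, and constants are placeholders, not anything from the Spark paper:

    import numpy as np
    from pyspark import SparkContext

    NUM_FEATURES, NUM_ITERATIONS, LEARNING_RATE = 100, 10, 0.1

    def parse_point(line):
        vals = np.array(line.split(), dtype=float)   # assumed format: label then features
        return vals[0], vals[1:]

    def gradient(point, w):
        label, x = point
        return (x.dot(w) - label) * x                # toy least-squares gradient per sample

    sc = SparkContext(appName="iterative-training-sketch")
    data = sc.textFile("hdfs:///engineered-input").map(parse_point).cache()  # read & cache once

    w = np.zeros(NUM_FEATURES)
    for _ in range(NUM_ITERATIONS):
        wb = sc.broadcast(w)                          # driver broadcasts current parameters
        grad = data.map(lambda p: gradient(p, wb.value)) \
                   .reduce(lambda a, b: a + b)        # one combine per iteration, no file I/O
        w -= LEARNING_RATE * grad                     # driver applies the summed update
        wb.unpersist()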

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 13

Alternative: Parameter Servers

• ML model training via MR communicates parameter updates through the MR shuffle to reducers, then combines all parameters into an RDD or file (possibly collected/broadcast by the driver)
• Parameter servers use a shared memory model for parameters
  o All “process input data subset” tasks can cache any/all parameters; changes are pushed to them by parameter servers
  o Reducers are replaced with atomic “add to parameter in shared mem” RPCs to the parameter server, e.g. INC and GET (toy sketch below)
  o ⇒ Less data transmitted and less task overhead
  o Also, input data easily avoids repartitioning in the next iteration
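A toy, in-process stand-in for that GET/INC interface (illustrative only, not the API of IterStore, Petuum, or any specific parameter server system):

    import threading
    from collections import defaultdict

    class ParameterServerShard:
        """Toy single-shard parameter server: workers GET current values and INC deltas."""
        def __init__(self):
            self._params = defaultdict(float)
            self._lock = threading.Lock()

        def get(self, key):
            with self._lock:
                return self._params[key]

        def inc(self, key, delta):
            # Atomic add-to-parameter; this replaces the reducer in the MapReduce formulation.
            with self._lock:
                self._params[key] += delta

    # A worker would loop over its cached data partition, GET the parameters it needs,
    # compute updates, and INC the deltas back to the appropriate shard (via RPC in practice).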

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 14

Page 8: 719 S19-L09 ProgModels2-ML


ML model training (high level picture)

[Figure: huge input data feeds an iterative program that fits the model; each complete pass over the data is one iteration, refining the model parameters (the solution).]

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 15

Data parallel with Parameter Server

[Figure: the input data (training data) is partitioned across parallel iterative workers; each worker issues READ, INC, CLOCK operations to a parameter server holding the model parameters (the solution).]

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 16

Page 9: 719 S19-L09 ProgModels2-ML


Parameter Servers are Great for Iterative ML

• Parameter Servers shard solution state (current param values) across machines
• Traditional architecture has servers and workers on all machines
• Used by IterStore, MXNet, TensorFlow, Petuum, …

Carnegie Mellon Parallel Data Laboratory, http://www.pdl.cmu.edu/, Aaron Harlap © April 17 (slide 17)

How much synch is needed during training?

• Basic MR is functional; inputs are read-only, outputs write-only
  o All communication occurs through RDDs/file systems after one complete MR, when a later MR reads the output file (RDD) of a prior MR
  o This separation of write-only output becoming read-only input is a barrier synchronization
• Parameter servers can be used synchronously or allowed to run asynchronously (toy simulation below)
  o Async works because ML is iterative approximation, converging to the optimum provided the async error is bounded
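To make the “bounded async error” idea concrete, here is a toy single-process simulation in the stale-synchronous style suggested by the READ/INC/CLOCK diagram: each worker advances its clock after every pass and may only run a fixed number of clocks ahead of the slowest worker. The staleness bound, fake updates, and structure are illustrative assumptions, not any particular system’s implementation:

    import random
    import threading
    import time

    STALENESS, ITERS, WORKERS = 2, 10, 4
    clock = [0] * WORKERS          # per-worker iteration counters (CLOCK)
    param = [0.0]                  # one shared parameter, protected by a lock
    lock = threading.Lock()

    def worker(wid):
        for t in range(ITERS):
            while t - min(clock) > STALENESS:   # too far ahead of the slowest worker: wait
                time.sleep(0.001)
            with lock:
                stale_value = param[0]                          # READ (possibly stale)
                param[0] = stale_value + random.uniform(-1, 1)  # INC with a fake update
            clock[wid] = t + 1                                  # CLOCK: finished iteration t

    threads = [threading.Thread(target=worker, args=(w,)) for w in range(WORKERS)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    print("final parameter value:", param[0])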

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 18

Page 10: 719 S19-L09 ProgModels2-ML


GraphLab: early tools for parameter servers

• GraphLab started not from Hadoop MR but from shared memory transaction processing – lots of parallel updates ordered by locks
• GraphLab provides a higher-level programming model
  o Data is associated with vertices and with edges between vertices, inherently sparse (or we’d use a matrix representation instead)
    • Non-zeroes in a matrix representation are edges or vertices
    • Lots of machine learning data sets, like social media, are very sparse
  o Update: code updates a vertex and its neighbor vertices in isolation (sketch below)
  o Iteration: one complete pass over the input data, calculating updates (“Fold” in the GraphLab paper), then combining changes (“Apply” in GraphLab)
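A minimal sketch of the vertex-update style on a tiny graph (a PageRank-like update written in plain Python for illustration; this is not the GraphLab API, and the example graph and damping constant are arbitrary):

    # Each vertex's value is recomputed from its neighbors; one iteration = one pass of updates.
    out_edges = {"a": ["b"], "b": ["c"], "c": ["a", "b"]}
    in_edges = {v: [u for u in out_edges if v in out_edges[u]] for v in out_edges}
    rank = {v: 1.0 for v in out_edges}

    def update(v):
        # Read the vertex's in-neighbors, recompute its value, write it back (in isolation).
        rank[v] = 0.15 + 0.85 * sum(rank[u] / len(out_edges[u]) for u in in_edges[v])

    for _ in range(20):
        for v in out_edges:
            update(v)
    print(rank)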

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 19

Next day plan

• Wed: First of two class mtgs on resource allocation/scheduling

• Second one comes Mon

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 20

Page 11: 719 S19-L09 ProgModels2-ML


E.g.: Medical Research

• Collect human genome and disease outcome for lots of people
• Model disease probability as a linear model of the presence of gene pairs
• Millions to 10^11 (pair-wise gene) parameters; thousands of patients
• Model training is solving for the “best” parameter weights
  o Under-determined set of equations for learning a model of gene influence on disease; an infinite number of parameter sets match the observed outputs
  o Add a figure of merit (objective function) to value a solution and search the solution space for the best merit

[Figure: genome sequences of the samples (patients), e.g. AT…….CG G AAA vs. AT…….CG T AAA, differing at a single position.]

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 21

Model Training

Solved by an iterative convergent algorithm on vectors & matrices:

for (t = 1 to T) {
    doThings()
    doOtherThings()
}

Objective over the data $D = \{x_i, y_i\}_{i=1}^N$ and model parameters $\vec\theta$:

$$\arg\max_{\vec\theta} \; \mathcal{L}(\{x_i, y_i\}_{i=1}^N ; \vec\theta) + \Omega(\vec\theta)$$

Iterative convergent update:

$$\vec\theta^{(t+1)} = \vec\theta^{(t)} + \Delta f_{\vec\theta}(D), \qquad \text{or more generally} \qquad \vec\theta^{(t+1)} = g\big(\vec\theta^{(t)}, \Delta f_{\vec\theta}(D)\big)$$

This computation needs to be parallelized!

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 22
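A minimal runnable sketch of this iterative-convergent pattern, using a toy quadratic objective as a stand-in for L + Ω (the target vector, step size, and iteration count are arbitrary):

    import numpy as np

    target = np.array([1.0, -2.0, 0.5])      # optimum of the toy objective -(theta - target)^2

    def delta(theta, step=0.1):
        """One update Delta f_theta(D): a gradient-ascent step on the toy objective."""
        return step * (-2.0 * (theta - target))

    theta = np.zeros(3)
    for t in range(100):                     # for (t = 1 to T) { ... }
        theta = theta + delta(theta)         # theta^(t+1) = theta^(t) + Delta f_theta(D)
    print(theta)                             # converges toward the optimum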

Page 12: 719 S19-L09 ProgModels2-ML


Consistency in GraphLab

• Many machine learning algorithms are tolerant of some data races
  o “Converging” to good enough may depend on the data and the schedule
• GraphLab allows some updates to “overlap” (not fully lock)
  o Much more parallel than matrix multiply because of the sparseness
  o Totally safe if the transactions don’t update what is being overlapped
    • I.e., database-serializable concurrent transactions are guaranteed for edge or vertex consistency, given restrictions on what the update code can do
• Natural for shared memory multithreaded update
  o Like HPC (distributed) simulation

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 23

Scheduling in GraphLab

• GraphLab allows some updates to do scheduling
  o Baseline is sequential execution of each vertex’s update once per iteration
  o Sparseness allows non-overlapping updates to execute in parallel
  o Opportunity for smart schedulers to exploit more app properties
    • Prioritize specific updates over other updates because these communicate more information more quickly
    • Possible to execute some updates more often than others

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 24

Page 13: 719 S19-L09 ProgModels2-ML


Map/Reduce as Elastic Big Data Processing

• Big data has lots of input: divide it into many splits to be ‘map’ed
• Queue map tasks on virtual cores
• Partition map task output to load-balance work in the reduce tasks
• Effective elastic exploitation of more data on the map task side
  o Critical to scalability: the partition function & the reduce function (partitioning sketch below)
    • Unfortunate partition -> imbalanced load, degrades to little parallelism
    • Unfortunate reduce -> may need pre-sort (out of core), highly sensitive to real memory availability (too little -> more out of core; too much -> thrashes)
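For reference, the default-style partition function is just a hash of the intermediate key modulo the number of reduce tasks, so a skewed key distribution piles work onto one reducer (a minimal illustration, not any specific framework’s code):

    def partition(key, num_reducers):
        # Same key always goes to the same reduce task.
        return hash(key) % num_reducers

    # Skew example: if most records share one key, one reducer gets almost all the work.
    keys = ["popular"] * 98 + ["rare1", "rare2"]
    load = [0] * 4
    for k in keys:
        load[partition(k, 4)] += 1
    print(load)   # one bucket holds ~98 of 100 records -> little parallelism in reduce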

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 25

[Figure note: in Map/Reduce, data flows between tasks through files (disk).]

Spark as Map/Reduce 2.0

• Abstract as a sequential program on one machine (the driver)
  o The driver sends work to a separate cluster (workers) to do map/reduce
• Combine map functions (Spark calls these transformations); see the example below
  o E.g. rdd_x.map(foo).map(bar) is 2 passes of MR with null reduces
  o Spark creates a function foo_bar() that combines foo() & bar() in the map task
  o Spark transformations combine this way until a shuffle is unavoidable (a stage)
• Is big data big? (100X prior examples is big, but might only be GBs)
  o Cache reduce outputs in memory (or discard & recompute as needed)
    • ‘cat <in | wc >out2’ versus ‘cat <in >out1; wc <out1 >out2’
    • For thin map() and reduce() functions, capturing out1 can be costly
• Automate splitting/partitioning (unless overridden)
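A small PySpark illustration of that pipelining plus caching; foo and bar are arbitrary example functions:

    from pyspark import SparkContext

    sc = SparkContext(appName="pipelining-sketch")
    rdd_x = sc.parallelize(range(1_000_000))

    def foo(v):
        return v * 2      # arbitrary example transformations
    def bar(v):
        return v + 1

    # Both maps run fused in one stage (a single pass over each partition), and the result
    # is cached in memory so later actions do not recompute the pipeline.
    rdd_y = rdd_x.map(foo).map(bar).cache()

    print(rdd_y.sum())    # first action materializes (and caches) rdd_y
    print(rdd_y.count())  # second action reuses the in-memory cache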

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 26

Page 14: 719 S19-L09 ProgModels2-ML


DryadLINQ

• Simplify efficient data-parallel code
  o Compiler support for imperative and declarative (e.g., database) operations
  o Extends MapReduce to workflows that can be collectively optimized
• Data flows on edges between processes at vertices (workflows)
• Coding is processes at vertices and expressions representing the workflow
• The interesting part of the compiler operates on the expressions
  o Inspired by traditional database query optimizations – rewrite the execution plan with an equivalent plan that is expected to execute faster

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 27

DryadLINQ

• Data flowing through a graph abstraction
  o Vertices are programs (possibly different at each vertex)
  o Edges are data channels (pipe-like)
  o Requires programs to have no side-effects (no changes to shared state)
  o Apply function, similar to MapReduce reduce – open-ended user code
• Compiler operates on expressions, rewriting execution sequences
  o Exploits prior work on compiling workflows on sets (LINQ)
  o Extends traditional database query planning with less type-restrictive code
• Unlike traditional plans, virtualizes resources (so it might spill to storage)
  o Knows how to partition sets (hash, range and round robin) over nodes
  o Doesn’t always know what processes do, so it is a less powerful optimizer than a database – where it can’t infer what is happening, it takes hints from the users
  o Can auto-pipeline, remove redundant partitionings, reorder partitionings, etc.

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 28

Page 15: 719 S19-L09 ProgModels2-ML


Example: MapReduce (reduce-reorderable)

• The DryadLINQ compiler can pre-reduce, partition, sort-merge, and partially aggregate
• In MapReduce you “configure” this yourself

Feb 18, 2019 15719/18-709 Adv. Cloud Computing 29

Page 16: 719 S19-L09 ProgModels2-ML

Project 2.3: Iterative machine learning training using Apache Spark

15-719/18-709 Advanced Cloud Computing

Learning Goals

● Develop a scalable distributed iterative machine learning program using Apache Spark.

● Explore mechanisms of communication in Spark and learn how to optimize join performance.

2

Page 17: 719 S19-L09 ProgModels2-ML

A machine learning task: ads click prediction

● Input: logs of user activity; each row contains user information
  ○ Label: “Did the user click on the ad?”
  ○ Features: user characteristics, e.g., age, profession of the user, etc.

● Output: Model that determines whether a user U will click on an ad A (based on attributes of the user and the ad)

[Figure: the training data is a table with one row per sample (sample1, sample2, sample3, …) and columns Label, feature1, feature2, feature3, …, featureN, where N is the number of feature dimensions.]

3

ML Technique: Logistic regression

● ML technique used: logistic regression
  ○ Fitting a sigmoid function to the data (sketch below)
  ○ The function separates the data based on the value of L (the label)
● Your job: train the parameters of the model
  ○ Parameters: one weight per feature dimension

4

[Figure: the model is a weight vector: weight1, weight2, weight3, weight4, …, weightN.]
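A hedged sketch of what this model computes (not the project’s starter code): the prediction for one sample is the sigmoid of the dot product between the weight vector and the sample’s sparse features:

    import math

    def predict(weights, features):
        """weights: {feature_index: weight}; features: {feature_index: value} (sparse)."""
        z = sum(weights.get(j, 0.0) * v for j, v in features.items())
        return 1.0 / (1.0 + math.exp(-z))   # probability that the label is 1 (a click)

    # Example: a sample with two active features.
    print(predict({0: 0.5, 3: -1.2}, {0: 1.0, 3: 1.0}))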

Page 18: 719 S19-L09 ProgModels2-ML

Training using Gradient Descent

5

● Gradient descent: iterative optimization algorithm (see the sketch below)
  ○ Starts with a random guess of the weights
  ○ Iteratively updates them using the gradient of the model
● Takeaway: you have to run multiple iterations for your model to converge (i.e., be accurate)

[Figure: black dots = weight value at successive iterations.]
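A minimal single-machine sketch of that loop for logistic regression (NumPy; the tiny dataset, learning rate, and iteration count are arbitrary, and the real project works on distributed sparse data instead):

    import numpy as np

    # Tiny dense example: 4 samples, 3 feature dimensions, binary labels.
    X = np.array([[1.0, 0.0, 1.0],
                  [0.0, 1.0, 1.0],
                  [1.0, 1.0, 0.0],
                  [0.0, 0.0, 1.0]])
    y = np.array([1.0, 0.0, 1.0, 0.0])

    w = np.random.default_rng(0).normal(size=3)   # random initial guess of the weights
    lr = 0.5
    for t in range(200):                          # multiple iterations until convergence
        p = 1.0 / (1.0 + np.exp(-(X @ w)))        # sigmoid predictions for all samples
        w -= lr * (X.T @ (p - y)) / len(y)        # step along the negative gradient
    print(w)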

Project Goal

● You will be given a working Spark program for training logistic regression that uses a broadcast variable to communicate model parameter values to executors.
● You need to rewrite the program to NOT use broadcast variables, so it can scale to models with a large number of model parameters.
● Your program will be tested against 3 highly sparse datasets with an increasing number of feature dimensions and number of samples.
● Like part 1: you will achieve partial grades for correct output.
● Like part 2: your program needs to finish within a certain target runtime.

6

Page 19: 719 S19-L09 ProgModels2-ML

The starter program

● The data samples are stored as an RDD and partitioned among executors.

● Model parameters (i.e., weights) are stored in the driver.
● When an iteration begins, the driver broadcasts the weights to all executors (see the rough sketch below).
● Then the executors compute updates.
● The driver collects updates from the executors and updates the model parameters locally.
● Repeat until convergence.

7
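A hedged PySpark sketch of roughly this broadcast-based structure (this is not the actual starter program; the input format, helper functions, and constants are assumptions for illustration):

    import numpy as np
    from pyspark import SparkContext

    NUM_FEATURES, NUM_ITERS, LR = 1000, 20, 0.1
    sc = SparkContext(appName="lr-broadcast-sketch")

    def parse(line):
        # Assumed sparse format: label followed by index:value pairs.
        parts = line.split()
        label = float(parts[0])
        feats = {int(i): float(v) for i, v in (p.split(":") for p in parts[1:])}
        return label, feats

    def grad(sample, w):
        label, feats = sample
        z = sum(w[j] * v for j, v in feats.items())
        p = 1.0 / (1.0 + np.exp(-z))
        g = np.zeros(NUM_FEATURES)
        for j, v in feats.items():
            g[j] = (p - label) * v
        return g

    data = sc.textFile("hdfs:///training-data").map(parse).cache()  # partitioned samples
    n = data.count()
    w = np.zeros(NUM_FEATURES)
    for _ in range(NUM_ITERS):
        wb = sc.broadcast(w)                           # driver broadcasts all the weights
        total = data.map(lambda s: grad(s, wb.value)).reduce(lambda a, b: a + b)
        w -= LR * total / n                            # driver updates the weights locally
        wb.unpersist()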

The weights calculation and iteration

8

[Figure: the data RDD is split into data partitions 1–4 across the workers; the driver holds the weights, sends all weights to each executor, and collects all updates from the executors.]

Page 20: 719 S19-L09 ProgModels2-ML

Which communication mechanism to use?

9

Potential communication mechanisms:
● Broadcast variable (cannot use this for P2.3!)
● RDD shuffle
● Join RDD A and B (example below)

[Figure: two key/value RDDs. RDD A has rows (key 1, values A, B) and (key 1, values C, D); RDD B has the row (key 1, values E, F). Joining them on the key yields RDD C with rows (1, A, B, E, F) and (1, C, D, E, F).]
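A small PySpark example of the join mechanism on key/value RDDs shaped like the ones in the figure:

    from pyspark import SparkContext

    sc = SparkContext(appName="join-sketch")

    rdd_a = sc.parallelize([(1, "A,B"), (1, "C,D")])   # key/value rows as in the figure
    rdd_b = sc.parallelize([(1, "E,F")])

    # join() shuffles both RDDs by key and pairs up every matching combination of values.
    rdd_c = rdd_a.join(rdd_b)
    print(rdd_c.collect())   # [(1, ('A,B', 'E,F')), (1, ('C,D', 'E,F'))] (order may vary)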

Where to Start

● Ask Amazon to increase the SPOT instance limit of c4.xlarge to 20 now!
● Go through the project write-up on TPZ
● Download the starter package
  ○ It’s different from the P2.1 and P2.2 versions!
● Use the spark-ec2-setup tool to launch a cluster

10

Page 21: 719 S19-L09 ProgModels2-ML

Submission and Grading

● For each test configuration
  1. Launch a Spark cluster
  2. Download the data to HDFS
  3. Run the submitter with the proper arguments on the master node
● Test configurations
  ○ Test case A - dataset: KDD2010 (target: 40 mins)
  ○ Test case B - dataset: KDD2012 (target: 90 mins)
  ○ Test case C - dataset: Criteo (target: 70 mins)
● Budget: $50

11

Q & A on Piazza

12