Alpine Tech Talk: SystemML by Berthold Reinwald


Page 1: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML: Scalable Machine Learning
SystemML Team, IBM Almaden Research
Presented by: Berthold Reinwald, Technical Lead
[email protected], September 2015

Page 2: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML in IBM BigInsights Data Scientist

• The fruition of a 5+ year research project
• Overview
  - SystemML engine
  - Broad class of scalable algorithms
  - Standalone, MapReduce, Spark
  - Open source
• Some stats
  - Proofs of concept with customers
  - 8 technical research publications

BigInsights Data Scientist module: accelerate data science teams with advanced analytics to extract valuable insights from Hadoop
• Big R: statistical analysis & distributed frames using the entire Hadoop cluster
• Machine Learning: scalable algorithms
• Text Analytics: text extraction via business web tooling

Page 3: Alpine Tech Talk: System ML by Berthold Reinwald

More Information

• Sebastian Schelter, Juan Soto, Volker Markl, Douglas Burdick, Berthold Reinwald, Alexandre V. Evfimievski: Efficient sample generation for scalable meta learning. ICDE 2015: 1191-1202

• Arash Ashari, Shirish Tatikonda, Matthias Boehm, Berthold Reinwald, Keith Campbell, John Keenleyside, P. Sadayappan: On optimizing machine learning workloads via kernel fusion. PPOPP 2015: 173-182

• Botong Huang, Matthias Boehm, Yuanyuan Tian, Berthold Reinwald, Shirish Tatikonda, Frederick R. Reiss: Resource Elasticity for Large-Scale Machine Learning. SIGMOD Conference 2015: 137-152

• Matthias Boehm, Douglas R. Burdick, Alexandre V. Evfimievski, Berthold Reinwald, Frederick R. Reiss, Prithviraj Sen, Shirish Tatikonda, Yuanyuan Tian: SystemML's Optimizer: Plan Generation for Large-Scale Machine Learning Programs. IEEE Data Eng. Bull. 37(3): 52-62 (2014)

• Matthias Boehm, Shirish Tatikonda, Berthold Reinwald, Prithviraj Sen, Yuanyuan Tian, Douglas Burdick, Shivakumar Vaithyanathan: Hybrid Parallelization Strategies for Large-Scale Machine Learning in SystemML. PVLDB 7(7): 553-564 (2014)

• Peter D. Kirchner, Matthias Boehm, Berthold Reinwald, Daby M. Sow, Michael Schmidt, Deepak S. Turaga, Alain Biem: Large Scale Discriminative Metric Learning. IPDPS Workshops 2014: 1656-1663

• Yuanyuan Tian, Shirish Tatikonda, Berthold Reinwald: Scalable and Numerically Stable Descriptive Statistics in SystemML. ICDE 2012: 1351-1359

• Amol Ghoting, Rajasekar Krishnamurthy, Edwin P. D. Pednault, Berthold Reinwald, Vikas Sindhwani, Shirish Tatikonda, Yuanyuan Tian, Shivakumar Vaithyanathan: SystemML: Declarative machine learning on MapReduce. ICDE 2011: 231-242

Topic tags shown alongside the papers on the slide: algorithm optimizer, resource elasticity, GPU, sampling, numeric stability, task parallelism; the ICDE 2011 paper is the first SystemML paper.

Page 4: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML Open Source announced in June 2015

Page 5: Alpine Tech Talk: System ML by Berthold Reinwald

Big Data Analytics Use Cases

• Insurance
  - Problem: find the optimal subset of features that leads to the best regression model
  - Size: 1.1M observations, 95 features, subsets of 15 variables
  - Algorithm: parallelization of independent model building
• Automotive
  - Problem: customer satisfaction
  - Size: 2M cars with 8,000 reacquired cars, 10M repair cases, 25M parts exchanges
  - Algorithms: logistic regression using ~22k feature variables (increasing the number of features from ~250 to ~21,800 improved precision/recall by an order of magnitude); sequence mining using a very low support value, producing a very large number of intermediate result sequences
• Air Transportation
  - Problem: predict passenger volumes at locations in an airport
  - Size: WiFi data with ~66M rows for ~1.3M MAC addresses
  - Algorithms: multiple models per location and passenger type; time-series analysis using seasonal and non-seasonal auto-regressive and moving-average components along with differencing operations (ARIMA and Holt-Winters triple exponential smoothing)
• Financial Services
  - Problem: compute correlations between financial analysts' performance metrics and sentiments extracted from surveys submitted by them
  - Algorithms: descriptive (bivariate) statistics: chi-squared test, Spearman's rho, gamma, Kendall's tau-b, odds-ratio test, F-test (stratified and unstratified)
• Retail Banking
  - Problem: use statistical analysis on social media data linked to the bank's data to identify customer segments of interest, find predictors of purchase intent, and gauge sentiment towards the bank's products
  - Algorithms: bivariate odds ratios and binomial proportions with confidence intervals
• Services Company
  - Problem: compute a benchmark index by mapping producers' financial reports into a normalized schema, using analytics to extrapolate missing reports and/or impute missing values
  - Algorithms: regularized least-squares loss minimization and Gibbs sampling (MCMC) jointly over the parameter space and the missing (estimated) values

More:
• PCA on 7k attributes at a railroad company
• GLM on 10B rows at an insurance company

Page 6: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML

• Algorithms expressed in a declarative, high-level language with R-like syntax
• Cost-based compilation of algorithms to generate execution plans
  - Compilation and parallelization based on data characteristics and on cluster and machine characteristics
  - In-memory single-node and cluster execution
• Enables algorithm developer productivity for building additional algorithms (scalability, numeric stability, and optimizations handled by the system)

Linear Regression
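
To give a flavor of the language, here is a minimal DML sketch of linear regression via the normal equations (a simplified, hypothetical script; the shipped linear regression algorithm adds regularization and argument handling):

  X = read($X);               # n x m feature matrix
  y = read($Y);               # n x 1 label vector
  A = t(X) %*% X;             # m x m Gram matrix
  b = t(X) %*% y;             # m x 1 right-hand side
  beta = solve(A, b);         # solve the small normal-equations system
  write(beta, $B);

The script states only the linear algebra; the compiler decides whether each operation runs in memory or as a distributed job.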

Page 7: Alpine Tech Talk: System ML by Berthold Reinwald

Example: Gaussian Non-negative Matrix Factorization

  package gnmf;

  import java.io.IOException;
  import java.net.URISyntaxException;

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.JobConf;

  public class MatrixGNMF
  {
    public static void main(String[] args) throws IOException, URISyntaxException
    {
      if(args.length < 10)
      {
        System.out.println("missing parameters");
        System.out.println("expected parameters: [directory of v] [directory of w] [directory of h] " +
            "[k] [num mappers] [num reducers] [replication] [working directory] " +
            "[final directory of w] [final directory of h]");
        System.exit(1);
      }
      String vDir = args[0];
      String wDir = args[1];
      String hDir = args[2];
      int k = Integer.parseInt(args[3]);
      int numMappers = Integer.parseInt(args[4]);
      int numReducers = Integer.parseInt(args[5]);
      int replication = Integer.parseInt(args[6]);
      String outputDir = args[7];
      String wFinalDir = args[8];
      String hFinalDir = args[9];
      JobConf mainJob = new JobConf(MatrixGNMF.class);
      FileSystem.get(mainJob).delete(new Path(outputDir));
      String vDirectory = vDir;
      String hDirectory = hDir;
      String wDirectory = wDir;
      String workingDirectory;
      String resultDirectoryX;
      String resultDirectoryY;
      long start = System.currentTimeMillis();
      System.gc();
      System.out.println("starting calculation");
      // H update, numerator: X = W' * V
      System.out.print("calculating X = WT * V... ");
      workingDirectory = UpdateWHStep1.runJob(numMappers, numReducers, replication,
          UpdateWHStep1.UPDATE_TYPE_H, vDirectory, wDirectory, outputDir, k);
      resultDirectoryX = UpdateWHStep2.runJob(numMappers, numReducers, replication,
          workingDirectory, outputDir);
      FileSystem.get(mainJob).delete(new Path(workingDirectory));
      System.out.println("done");
      // H update, denominator: Y = W' * W * H
      System.out.print("calculating Y = WT * W * H... ");
      workingDirectory = UpdateWHStep3.runJob(numMappers, numReducers, replication,
          wDirectory, outputDir);
      resultDirectoryY = UpdateWHStep4.runJob(numMappers, replication, workingDirectory,
          UpdateWHStep4.UPDATE_TYPE_H, hDirectory, outputDir);
      FileSystem.get(mainJob).delete(new Path(workingDirectory));
      System.out.println("done");
      // H update: H = H .* X ./ Y
      System.out.print("calculating H = H .* X ./ Y... ");
      workingDirectory = UpdateWHStep5.runJob(numMappers, numReducers, replication,
          hDirectory, resultDirectoryX, resultDirectoryY, hFinalDir, k);
      System.out.println("done");
      FileSystem.get(mainJob).delete(new Path(resultDirectoryX));
      FileSystem.get(mainJob).delete(new Path(resultDirectoryY));
      System.out.print("storing back H... ");
      FileSystem.get(mainJob).delete(new Path(hDirectory));
      hDirectory = workingDirectory;
      System.out.println("done");
      // W update, numerator: X = V * H'
      System.out.print("calculating X = V * HT... ");
      workingDirectory = UpdateWHStep1.runJob(numMappers, numReducers, replication,
          UpdateWHStep1.UPDATE_TYPE_W, vDirectory, hDirectory, outputDir, k);
      resultDirectoryX = UpdateWHStep2.runJob(numMappers, numReducers, replication,
          workingDirectory, outputDir);
      FileSystem.get(mainJob).delete(new Path(workingDirectory));
      System.out.println("done");
      // W update, denominator: Y = W * H * H'
      System.out.print("calculating Y = W * H * HT... ");
      workingDirectory = UpdateWHStep3.runJob(numMappers, numReducers, replication,
          hDirectory, outputDir);
      resultDirectoryY = UpdateWHStep4.runJob(numMappers, replication, workingDirectory,
          UpdateWHStep4.UPDATE_TYPE_W, wDirectory, outputDir);
      FileSystem.get(mainJob).delete(new Path(workingDirectory));
      System.out.println("done");
      // W update: W = W .* X ./ Y
      System.out.print("calculating W = W .* X ./ Y... ");
      workingDirectory = UpdateWHStep5.runJob(numMappers, numReducers, replication,
          wDirectory, resultDirectoryX, resultDirectoryY, wFinalDir, k);
      System.out.println("done");
      FileSystem.get(mainJob).delete(new Path(resultDirectoryX));
      FileSystem.get(mainJob).delete(new Path(resultDirectoryY));
      System.out.print("storing back W... ");
      FileSystem.get(mainJob).delete(new Path(wDirectory));
      wDirectory = workingDirectory;
      System.out.println("done");
      // break the elapsed time into h/m/s/ms (computed but never printed)
      long requiredTime = System.currentTimeMillis() - start;
      long requiredTimeMilliseconds = requiredTime % 1000;
      requiredTime = (requiredTime - requiredTimeMilliseconds) / 1000;
      long requiredTimeSeconds = requiredTime % 60;
      requiredTime = (requiredTime - requiredTimeSeconds) / 60;
      long requiredTimeMinutes = requiredTime % 60;
      requiredTime = (requiredTime - requiredTimeMinutes) / 60;
      long requiredTimeHours = requiredTime;
    }
  }

  package gnmf;

  import gnmf.io.MatrixObject;
  import gnmf.io.MatrixVector;
  import gnmf.io.TaggedIndex;

  import java.io.IOException;
  import java.util.Iterator;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.Mapper;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reducer;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.SequenceFileInputFormat;
  import org.apache.hadoop.mapred.SequenceFileOutputFormat;

  public class UpdateWHStep2
  {
    static class UpdateWHStep2Mapper extends MapReduceBase
        implements Mapper<TaggedIndex, MatrixVector, TaggedIndex, MatrixVector>
    {
      @Override
      public void map(TaggedIndex key, MatrixVector value,
          OutputCollector<TaggedIndex, MatrixVector> out,
          Reporter reporter) throws IOException
      {
        // identity map: pass partial vectors through to the reducers
        out.collect(key, value);
      }
    }

    static class UpdateWHStep2Reducer extends MapReduceBase
        implements Reducer<TaggedIndex, MatrixVector, TaggedIndex, MatrixObject>
    {
      @Override
      public void reduce(TaggedIndex key, Iterator<MatrixVector> values,
          OutputCollector<TaggedIndex, MatrixObject> out, Reporter reporter)
          throws IOException
      {
        // sum all partial vectors that share the same index
        MatrixVector result = null;
        while(values.hasNext())
        {
          MatrixVector current = values.next();
          if(result == null)
            result = current.getCopy();
          else
            result.addVector(current);
        }
        if(result != null)
        {
          out.collect(new TaggedIndex(key.getIndex(), TaggedIndex.TYPE_VECTOR_X),
              new MatrixObject(result));
        }
      }
    }

    public static String runJob(int numMappers, int numReducers, int replication,
        String inputDir, String outputDir) throws IOException
    {
      String workingDirectory = outputDir + System.currentTimeMillis() + "-UpdateWHStep2/";
      JobConf job = new JobConf(UpdateWHStep2.class);
      job.setJobName("MatrixGNMFUpdateWHStep2");
      job.setInputFormat(SequenceFileInputFormat.class);
      FileInputFormat.setInputPaths(job, new Path(inputDir));
      job.setOutputFormat(SequenceFileOutputFormat.class);
      FileOutputFormat.setOutputPath(job, new Path(workingDirectory));
      job.setNumMapTasks(numMappers);
      job.setMapperClass(UpdateWHStep2Mapper.class);
      job.setMapOutputKeyClass(TaggedIndex.class);
      job.setMapOutputValueClass(MatrixVector.class);
      job.setNumReduceTasks(numReducers);
      job.setReducerClass(UpdateWHStep2Reducer.class);
      job.setOutputKeyClass(TaggedIndex.class);
      job.setOutputValueClass(MatrixObject.class);
      JobClient.runJob(job);
      return workingDirectory;
    }
  }

  package gnmf;

  import gnmf.io.MatrixCell;
  import gnmf.io.MatrixFormats;
  import gnmf.io.MatrixObject;
  import gnmf.io.MatrixVector;
  import gnmf.io.TaggedIndex;

  import java.io.IOException;
  import java.util.Iterator;

  import org.apache.hadoop.filecache.DistributedCache;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.Mapper;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reducer;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.SequenceFileInputFormat;
  import org.apache.hadoop.mapred.SequenceFileOutputFormat;

  public class UpdateWHStep1
  {
    public static final int UPDATE_TYPE_H = 0;
    public static final int UPDATE_TYPE_W = 1;

    static class UpdateWHStep1Mapper extends MapReduceBase
        implements Mapper<TaggedIndex, MatrixObject, TaggedIndex, MatrixObject>
    {
      private int updateType;

      @Override
      public void map(TaggedIndex key, MatrixObject value,
          OutputCollector<TaggedIndex, MatrixObject> out,
          Reporter reporter) throws IOException
      {
        // for the W update, re-key each cell by its column index
        if(updateType == UPDATE_TYPE_W && key.getType() == TaggedIndex.TYPE_CELL)
        {
          MatrixCell current = (MatrixCell) value.getObject();
          out.collect(new TaggedIndex(current.getColumn(), TaggedIndex.TYPE_CELL),
              new MatrixObject(new MatrixCell(key.getIndex(), current.getValue())));
        } else
        {
          out.collect(key, value);
        }
      }

      @Override
      public void configure(JobConf job)
      {
        updateType = job.getInt("gnmf.updateType", 0);
      }
    }

    static class UpdateWHStep1Reducer extends MapReduceBase
        implements Reducer<TaggedIndex, MatrixObject, TaggedIndex, MatrixVector>
    {
      private double[] baseVector = null;
      private int vectorSizeK;

      @Override
      public void reduce(TaggedIndex key, Iterator<MatrixObject> values,
          OutputCollector<TaggedIndex, MatrixVector> out, Reporter reporter)
          throws IOException
      {
        if(key.getType() == TaggedIndex.TYPE_VECTOR)
        {
          // remember the factor vector for the matrix cells that follow
          if(!values.hasNext())
            throw new RuntimeException("expected vector");
          MatrixFormats current = values.next().getObject();
          if(!(current instanceof MatrixVector))
            throw new RuntimeException("expected vector");
          baseVector = ((MatrixVector) current).getValues();
        } else
        {
          // scale the remembered vector by each cell value and emit per column
          while(values.hasNext())
          {
            MatrixCell current = (MatrixCell) values.next().getObject();
            if(baseVector == null)
            {
              out.collect(new TaggedIndex(current.getColumn(), TaggedIndex.TYPE_VECTOR),
                  new MatrixVector(vectorSizeK));
            } else
            {
              if(baseVector.length == 0)
                throw new RuntimeException("base vector is corrupted");
              MatrixVector resultingVector = new MatrixVector(baseVector);
              resultingVector.multiplyWithScalar(current.getValue());
              if(resultingVector.getValues().length == 0)
                throw new RuntimeException("multiplying with scalar failed");
              out.collect(new TaggedIndex(current.getColumn(), TaggedIndex.TYPE_VECTOR),
                  resultingVector);
            }
          }
          baseVector = null;
        }
      }

      @Override
      public void configure(JobConf job)
      {
        vectorSizeK = job.getInt("dml.matrix.gnmf.k", 0);
        if(vectorSizeK == 0)
          throw new RuntimeException("invalid k specified");
      }
    }

    public static String runJob(int numMappers, int numReducers, int replication,
        int updateType, String matrixInputDir, String whInputDir, String outputDir,
        int k) throws IOException
    {
      // (remainder of this method truncated on the original slide)
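
For contrast, the complete GNMF update loop fits in a few lines of DML, in the spirit of the example in the ICDE 2011 SystemML paper (a sketch; the file arguments and iteration count are placeholders):

  V = read($V);                 # input matrix
  W = read($W);                 # initial factor, rows x k
  H = read($H);                 # initial factor, k x columns
  max_iteration = 20;
  i = 0;
  while(i < max_iteration) {
    H = H * (t(W) %*% V) / (t(W) %*% W %*% H);   # elementwise * and /
    W = W * (V %*% t(H)) / (W %*% H %*% t(H));
    i = i + 1;
  }
  write(W, $Wout);
  write(H, $Hout);

The system, not the programmer, is responsible for compiling these expressions into the MapReduce (or Spark) jobs hand-coded above.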

Page 8: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML lm() - Scalability and Performance

[Figure: SystemML lm() vs. R 3.1.1 lm(). Performance for data that fits in memory, with a 28x annotation; scalability for data larger than aggregate memory, where R lm() fails with out-of-memory.]

Cluster:
• 6 nodes w/ 12 cores each
• M/R capacity: 144/72
• M/R JVM: 2 GB

Page 9: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML - Scalable Algorithms in BigInsights

Category: Description
• Descriptive Statistics: univariate; bivariate; stratified bivariate
• Classification: logistic regression (multinomial); multi-class SVM; naïve Bayes (multinomial); decision trees; random forest
• Clustering: k-means
• Regression: linear regression (via a system of equations or CG (conjugate gradient descent)); generalized linear models (GLM) with distributions Gaussian, Poisson, Gamma, inverse Gaussian, binomial, and Bernoulli; links for all distributions: identity, log, square root, inverse, 1/μ^2; links for binomial/Bernoulli: logit, probit, cloglog, cauchit; stepwise linear regression; stepwise GLM
• Dimension Reduction: PCA
• Matrix Factorization: ALS
• Survival Models: Kaplan-Meier; Cox
• Predict: scoring
• Transformation: recoding, dummy coding, binning, scaling, missing value imputation

Page 10: Alpine Tech Talk: System ML by Berthold Reinwald

High-Level SystemML Architecture

Scripts written in DML (Declarative Machine Learning Language) flow through the Language, Compiler, and Runtime layers, which execute either in-memory on a single node (scale-up) or on a Hadoop or Spark cluster (scale-out).

Page 11: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML Architecture

Language
• R-like syntax w/ constructs for meta learning and task parallelism (see the parfor sketch below)
• Rich set of statistical functions
• User-defined & external functions
• Parsing into statement blocks & statements
• Program analysis, type inference, dead code elimination
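
As a sketch of the task-parallelism construct mentioned above, a hypothetical parfor loop that fits independent models over column groups (all names and sizes are illustrative; X is assumed to have at least 100 columns):

  X = read($X);
  y = read($Y);
  parfor(i in 1:10) {              # iterations run as independent parallel tasks
    lo = (i - 1) * 10 + 1;
    hi = i * 10;
    Xi = X[, lo:hi];               # slice one column group
    bi = solve(t(Xi) %*% Xi, t(Xi) %*% y);
    print("group " + i + ": first coefficient = " + as.scalar(bi[1, 1]));
  }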

High-Level Operator (HOP) Component
• Represents dataflow in DAGs of operations on matrices and scalars
• Chooses among alternative execution plans based on time and cost estimates: operator ordering & selection; hybrid plans

Low-Level Operator (LOP) Component
• Low-level physical execution plans (LOP DAGs) over key-value pairs
• "Piggybacking" of operations into a minimal number of MapReduce jobs

Runtime
• Hybrid runtime
  - CP: single-machine operations & orchestration of MR jobs
  - MR: generic MapReduce jobs & operations
  - SP: Spark jobs
• Numerically stable operators
• Dense/sparse matrix representation
• Multi-level buffer pool (caching) to evict in-memory objects
• Dynamic recompilation for initial unknowns


Page 12: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML Compilation Chain

[Figure: compilation chain from a DML script through parsing, HOP and LOP DAGs, down to runtime instructions; the example plan mixes CP and Spark instructions:
  CP + b sb _mVar1
  SPARK mapmm X.MATRIX.DOUBLE _mVar1.MATRIX.DOUBLE _mVar2.MATRIX.DOUBLE RIGHT false NONE
  CP * y _mVar2 _mVar3]

Page 13: Alpine Tech Talk: System ML by Berthold Reinwald

Selected Algebraic Simplification Rewrites

Size-dependent patterns:
• Remove Unnecessary Indexing: X[a:b,c:d] = Y → X = Y iff dims(X)=dims(Y); X = Y[,1] → X = Y iff ncol(Y)=1
• Remove Empty Matrix Multiply: X%*%Y → matrix(0,nrow(X),ncol(Y)) iff nnz(X)=0 | nnz(Y)=0
• Remove Unnecessary Outer Product: X*(Y%*%matrix(1,...)) → X*Y iff ncol(Y)=1
• Simplify Diag Aggregates: sum(diag(X)) → trace(X) iff ncol(X)=1
• Simplify Matrix Mult Diag: diag(X)%*%Y → X*Y iff ncol(X)=1 & ncol(Y)=1
• Simplify Diag Matrix Mult: diag(X%*%Y) → rowSums(X*t(Y)) iff ncol(Y)>1
• Simplify Dot Product Sum: sum(X^2) → t(X)%*%X iff ncol(X)=1

Static patterns:
• Remove Unnecessary Operations: t(t(X)) → X; X/1 → X; X*1 → X; X-0 → X; matrix(1,...)/X → 1/X; rand(,min=-1,max=1)*7 → rand(,min=-7,max=7)
• Binary to Unary: X+X → 2*X; X*X → X^2; X-X*Y → X*(1-Y)
• Simplify Diag Aggregates: trace(X%*%Y) → sum(X*t(Y))
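
To make the patterns concrete, a toy DML script in which several of these rewrites can fire (the optimizer applies them automatically; the script itself stays unchanged):

  X = rand(rows=1000, cols=10);
  w = rand(rows=1000, cols=1);
  s = sum(w^2);    # ncol(w)=1, so this can become t(w) %*% w (Simplify Dot Product Sum)
  Z = t(t(X));     # double transpose removed: Z = X (Remove Unnecessary Operations)
  D = X + X;       # binary to unary: D = 2 * X
  print(s + " " + sum(Z) + " " + sum(D));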

Page 14: Alpine Tech Talk: System ML by Berthold Reinwald

Example Operator Selection: Matrix Multiplication

• Physical operators
  - CP: MM, MMChain, TSMM, PMM
  - MR / Spark: MapMM, MapMMChain, TSMM (transpose-self mm), PMM (permutation mm), CPMM (cross-product mm), RMM (replication mm), ZipMM (partition-aware mm)
• HOP-LOP rewrites
  - Partitioning (w/o, CP/MR, colblock/rowblock)
  - Aggregation (w/o, singleblock/multiblock)
  - Transpose-MM rewrite: t(X)%*%y → t(t(y)%*%X)
  - Empty block materialization in output
  - CP degree of parallelism (multi-threaded mm)

[Figure: example t(X)%*%y, a HOP DAG of r(t) over X feeding ba(+*) with y, compiled into an MR plan: Partition (CP,col) and Transform (CP,') over the inputs, MapMM (MR,left), Group (MR), Aggregate (MR,ak+).]
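
A sketch of where these choices matter (hypothetical DML): a chain such as t(X) %*% (X %*% v) with a small vector v is a candidate for the fused MMChain/MapMMChain operators rather than two separate multiplies, avoiding a large intermediate:

  X = read($X);                     # tall-skinny matrix
  v = rand(rows=ncol(X), cols=1);   # small dense vector
  q = t(X) %*% (X %*% v);           # candidate for MMChain / MapMMChain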

Page 15: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML compiles hybrid runtime plans ranging from in-memory single-machine (CP) to large-scale cluster compute

• Example
  [Figure: as data size grows, the runtime plan shifts from CP to hybrid CP/cluster to cluster-only execution: high-performance computing for small data sizes, scalable computing for large data sizes, with hybrid plans gradually exploiting cluster parallelism. The running example is a topic-modeling factorization V ≈ W %*% H, where V (documents x tokens, stored as (i, j, value) triples such as "1 1 0.10") factors into W (documents x K topics) and H (K topics x words).]
• Challenge
  - Guaranteed hard memory constraints (budget of the JVM size) for arbitrarily complex ML programs
• Key technical innovations
  - CP & distributed runtime: single-machine & distributed operations in an integrated runtime
  - Caching: reuse and eviction of in-memory objects (buffer pool)
  - Cost model: accurate time and worst-case memory estimates
  - Optimizer: rewrites and cost-based runtime plan generation
  - Dynamic recompiler: re-optimization for initial unknowns

Page 16: Alpine Tech Talk: System ML by Berthold Reinwald

Compilation of Execution Plan for bigr.lm()

Data characteristics:
• X: 300M x 500, 4 TB text file
• y: 300M x 1, 9 GB text file
• (automatic, internal 1k x 1k matrix block representation)

Cluster configuration:
• 3.5 GB map task JVM
• 7 GB in-memory master JVM
• 128 MB HDFS block size

Plan: 1 MR job plus the in-memory master
• Mappers compute X'X for each block in X, and y'X for each block in X (X'y is rewritten to (y'X)'), with y shipped via the Hadoop distributed cache
• Combiners partially aggregate intermediate blocks
• A single reducer performs the final aggregation, since there is only 1 result block
• The master computes b and executes solve(A, b) on the small A and b (< 2 MB), yielding beta

How will the execution plan change with changes in
• data characteristics: more columns and rows; fewer columns and rows
• cluster characteristics: smaller task JVM size

Page 17: Alpine Tech Talk: System ML by Berthold Reinwald

Different Execution Plans for bigr.lm(), if ...

Baseline data and cluster characteristics (X: 300M x 500; y: 300M x 1; 3.5 GB map task JVM; 7 GB in-memory master JVM): X'X and X'y are computed in a single job, followed by an in-memory solve.

Variations shown on the slide:
• Data: X has 3 times more columns (300M x 1500; 3.5 GB map task JVM, 7 GB master)
• Data: X has 2 times more rows (600M x 500; 3.5 GB map task JVM, 7 GB master)
• Data: X is small and fits in memory (1M x 100; 3.5 GB map task JVM, 7 GB master)
• Cluster: change in cluster configuration (300M x 500; 1.5 GB map task JVM, 7 GB master)

[Figure: the resulting plans range from everything in the master (solve with in-memory X'X and X'y for the small X) to plans that split X'X and/or X'y across multiple jobs (e.g., X'X job1 + X'X job2 + X'y job + solve) as columns grow or task memory shrinks.]

Page 18: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML Engine Key Components

• Compiler
  - Language parser (~25 KLoC Java): parsing, live variable analysis, semantic analysis
  - Optimizer (~35 KLoC Java): HOPs and LOPs; rewrites, intra-/inter-procedural analysis, memory estimates, cost model, operator selection; parfor optimizer, resource optimizer, global data flow optimizer; execution plan generation
• Runtime (~70 KLoC Java)
  - Runtime instructions
  - Core runtime operations
  - Buffer pool and I/O
  - Dynamic recompilation
  - UDF framework
  - YARN integration (SystemML AM)

Page 19: Alpine Tech Talk: System ML by Berthold Reinwald

Some Observations on SystemML with Spark (1/2)

• The richer Spark Core API significantly simplified the implementation
• The symbol table tracks matrix runtime data either as
  - a single (large) MatrixBlock kept in the driver JVM: used for single-node instructions, backed by the multi-level cache
  - or a distributed collection of MatrixBlocks in the cluster (JavaPairRDD<MatrixIndexes, MatrixBlock>): used for distributed Spark instructions, subject to lazy evaluation
• If beneficial, cache RDDs
  - before loops in iterative algorithms, if read-only (see the sketch below)
  - storage level: MEMORY_AND_DISK
• Spark's narrow dependencies provide LOP piggybacking, but with problems for multiple consumers with individual actions (multiple scans)
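
A sketch of the loop pattern that motivates RDD caching (hypothetical DML; X is read-only across iterations, so its RDD would be cached before the loop):

  X = read($X);                      # read-only inside the loop
  v = rand(rows=ncol(X), cols=1);
  for(i in 1:20) {
    v = t(X) %*% (X %*% v);          # X is rescanned every iteration
    v = v / sqrt(sum(v^2));          # renormalize
  }
  write(v, $V);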

Page 20: Alpine Tech Talk: System ML by Berthold Reinwald

Some Observations on SystemML with Spark (2/2)

• Robust handling of broadcast variables from the driver
  - observe memory constraints
  - broadcast partitioned matrix blocks for efficiency
• Preserve the input RDD's partitioning whenever possible to avoid shuffles
  - e.g., matrix-vector binary operations using mapPartitions in combination with broadcasts
• Optimize the degree of parallelism by shuffling if necessary
  - e.g., coalesce RDDs before loops, taking into account the metadata of the data involved
• Reduce the overhead of the Spark framework whenever possible for small-to-medium datasets
  - example: lazy SparkContext creation

Page 21: Alpine Tech Talk: System ML by Berthold Reinwald

Performance of the SystemML Spark Backend

[Figure: speedup annotations for the Spark backend: 5.1x, 1.4x, 6.4x, 9.7x on an in-memory data set (160 GB), and 0.8x, 1.3x, 1.9x, 1.9x on a large-scale data set (1.6 TB).]

Page 22: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML Spark MLContext

• Fits into Spark APIs; consumes and produces DataFrames
• Exploits SystemML's compiler to produce execution plans with the Spark backend
• Usable from Scala, Java, Python, and R/SparkR

Page 23: Alpine Tech Talk: System ML by Berthold Reinwald

Run SystemML in ML Pipeline

Page 24: Alpine Tech Talk: System ML by Berthold Reinwald

BigR Interface for SystemML

• Connect to the BigInsights cluster
• Data frame proxy to a large data file
• Data transformation step
• Run scalable linear regression on the cluster

Page 25: Alpine Tech Talk: System ML by Berthold Reinwald

SystemML Scalable Machine Learning - Summary

• Cost-based compilation of machine learning algorithms generates execution plans
  - for single-node in-memory, cluster, and hybrid execution
  - for varying data characteristics: numbers of observations (1,000s to 10s of billions), numbers of variables (10s to 10s of millions), dense and sparse data
  - for varying cluster characteristics (memory configurations, degree of parallelism)
• Out-of-the-box scalable machine learning algorithms
  - e.g., descriptive statistics, regression, clustering, and classification
• "Roll-your-own" algorithms
  - enable programmer productivity (no worry about scalability, numeric stability, or optimizations)
  - fast turn-around for new algorithms
• Machine-learning-specific language constructs such as ensemble learning and cross-validation
• The higher-level language shields the algorithm development investment from platform progression
  - YARN for resource negotiation and elasticity; Spark for in-memory, iterative processing
• Open source commitment: R, Spark, Hadoop, etc.

SystemML: a platform to build, customize, and run pre-processing, feature engineering, and machine learning algorithms in R-like syntax.