Alpine Tech Talk: SystemML by Berthold Reinwald
TRANSCRIPT
© 2015 IBM Corporation
SystemML: Scalable Machine Learning
SystemML Team, IBM Almaden – Research
Presented by: Berthold Reinwald, Technical Lead
September 2015
SystemML in IBM BigInsights Data Scientist
• Result of a 5+ year research project
• Overview
SystemML engine
Broad class of scalable algorithms
Stand alone, MapReduce, Spark
Open Source
• Some Stats
Proofs of concept with customers
8 technical research publications
BigInsights Data Scientist module: Accelerate data science teams with advanced
analytics to extract valuable insights from Hadoop
• Big R: Statistical analysis & distributed frames using entire Hadoop cluster
• Machine Learning: Scalable algorithms
• Text Analytics: Text extraction via business web tooling
More Information
• Sebastian Schelter, Juan Soto, Volker Markl, Douglas Burdick, Berthold Reinwald, Alexandre V. Evfimievski: Efficient sample generation for scalable meta learning. ICDE 2015: 1191-1202
• Arash Ashari, Shirish Tatikonda, Matthias Boehm, Berthold Reinwald, Keith Campbell, John Keenleyside, P. Sadayappan: On optimizing machine learning workloads via kernel fusion. PPOPP 2015: 173-182
• Botong Huang, Matthias Boehm, Yuanyuan Tian, Berthold Reinwald, Shirish Tatikonda, Frederick R. Reiss: Resource Elasticity for Large-Scale Machine Learning. SIGMOD Conference 2015: 137-152
• Matthias Boehm, Douglas R. Burdick, Alexandre V. Evfimievski, Berthold Reinwald, Frederick R. Reiss, Prithviraj Sen, Shirish Tatikonda, Yuanyuan Tian: SystemML's Optimizer: Plan Generation for Large-Scale Machine Learning Programs. IEEE Data Eng. Bull. 37(3): 52-62 (2014)
• Matthias Boehm, Shirish Tatikonda, Berthold Reinwald, Prithviraj Sen, Yuanyuan Tian, Douglas Burdick, Shivakumar Vaithyanathan: Hybrid Parallelization Strategies for Large-Scale Machine Learning in SystemML. PVLDB 7(7): 553-564 (2014)
• Peter D. Kirchner, Matthias Boehm, Berthold Reinwald, Daby M. Sow, Michael Schmidt, Deepak S. Turaga, Alain Biem: Large Scale Discriminative Metric Learning. IPDPS Workshops 2014: 1656-1663
• Yuanyuan Tian, Shirish Tatikonda, Berthold Reinwald: Scalable and Numerically Stable Descriptive Statistics in SystemML. ICDE 2012: 1351-1359
• Amol Ghoting, Rajasekar Krishnamurthy, Edwin P. D. Pednault, Berthold Reinwald, Vikas Sindhwani, Shirish Tatikonda, Yuanyuan Tian, Shivakumar Vaithyanathan: SystemML: Declarative machine learning on MapReduce. ICDE 2011: 231-242
Slide annotations on the papers above: Algorithm, Optimizer, Resource Elasticity, GPU, Sampling, Numeric Stability, Task Parallelism, 1st paper
SystemML Open Source announced in June 2015
Big Data Analytics Use Cases
• Insurance
Problem Description
• Find the optimal subset of features that leads to the best
regression model
Problem Size
• 1.1M observations, 95 features, Subsets of 15 variables
Algorithm
• Parallelization of independent model building
• Automotive
Problem Description
• Customer Satisfaction
Problem Size
• 2 million cars with 8,000 reacquired cars, 10 million repair
cases, 25 million parts exchanges
Algorithms
• Logistic regression using ~22k feature variables
• Increasing the number of features from ~250 to ~21,800
improved precision/recall by an order of magnitude
• Sequence mining using very low support value
• Very large number of intermediate result sequences.
• Air Transportation
Problem Description
• Predict passenger volumes at locations in an airport
Problem Size
• WiFi data with ~66 M rows for ~1.3 M MAC addresses
Algorithms
• Multiple models per location, per passenger type
• Time-series analysis using seasonal and non-seasonal
auto-regressive and moving-average components along with
differencing operations (ARIMA and Holt-Winters triple
exponential smoothing)
• Financial Services
Problem Description
• Compute correlations between Financial Analysts’
performance metrics and sentiments extracted from
surveys submitted by them
Algorithms
• Descriptive (Bivariate) Statistics: Chi-squared test,
Spearman’s Rho, Gamma, Kendall’s Tau-B, Odds-Ratio
test, F-test (stratified and unstratified)
• Retail Banking
Problem Description
• Use statistical analysis on social media data linked to the
bank’s data to identify customer segments of interest, find
predictors of purchase intent, and gauge sentiment
towards bank’s products.
Algorithms
• Bivariate odds ratios and binomial proportions with
confidence intervals
• Services Company
Problem
• Compute a benchmark index by mapping producers’
financial reports into a normalized schema, using analytics
to extrapolate missing reports and/or impute missing
values.
Algorithms
• Regularized least-squares loss minimization and Gibbs
sampling (MCMC) jointly over the parameter space and
over the missing (estimated) values
More
• PCA on 7k attributes at a railroad company
• GLM on 10B rows at an insurance company
SystemML
• Algorithms expressed in a declarative, high-level language with R-like syntax
• Cost-based compilation of algorithms to generate execution plans
Compilation and parallelization
• Based on data characteristics
• Based on cluster and machine characteristics
In-Memory single node and cluster execution
• Improves algorithm developer productivity when building additional algorithms: the system takes care of scalability, numeric stability, and optimizations
Linear Regression
Example: Gaussian Non-negative Matrix Factorization
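For orientation, the hand-written MapReduce driver on this slide computes the two multiplicative GNMF updates that its progress messages name: H = H .* (WT * V) ./ (WT * W * H), followed by W = W .* (V * HT) ./ (W * H * HT). The following is a minimal single-node sketch of one such iteration on dense arrays. The class GnmfSketch and its method names are hypothetical and are not part of the slide's gnmf package or of SystemML.

```java
// In-memory sketch of one GNMF iteration. All names here are hypothetical.
final class GnmfSketch {
    // Dense matrix product: a (m x n) times b (n x p).
    static double[][] mm(double[][] a, double[][] b) {
        int m = a.length, n = b.length, p = b[0].length;
        double[][] c = new double[m][p];
        for (int i = 0; i < m; i++)
            for (int l = 0; l < n; l++)
                for (int j = 0; j < p; j++)
                    c[i][j] += a[i][l] * b[l][j];
        return c;
    }

    // Transpose of a dense matrix.
    static double[][] t(double[][] a) {
        double[][] r = new double[a[0].length][a.length];
        for (int i = 0; i < a.length; i++)
            for (int j = 0; j < a[0].length; j++)
                r[j][i] = a[i][j];
        return r;
    }

    // Cell-wise update target = target .* x ./ y, written back into target.
    static void scale(double[][] target, double[][] x, double[][] y) {
        for (int i = 0; i < target.length; i++)
            for (int j = 0; j < target[0].length; j++)
                target[i][j] = target[i][j] * x[i][j] / y[i][j];
    }

    // One iteration: update H first, then W using the new H
    // (the same order as the driver's "storing back H" step).
    static void iterate(double[][] v, double[][] w, double[][] h) {
        double[][] wt = t(w);
        scale(h, mm(wt, v), mm(mm(wt, w), h));   // H = H .* (WT*V) ./ (WT*W*H)
        double[][] ht = t(h);
        scale(w, mm(v, ht), mm(mm(w, h), ht));   // W = W .* (V*HT) ./ (W*H*HT)
    }

    // Squared Frobenius norm of V - W*H, useful to watch convergence.
    static double loss(double[][] v, double[][] w, double[][] h) {
        double[][] wh = mm(w, h);
        double s = 0;
        for (int i = 0; i < v.length; i++)
            for (int j = 0; j < v[0].length; j++) {
                double d = v[i][j] - wh[i][j];
                s += d * d;
            }
        return s;
    }
}
```

The sketch assumes strictly positive denominators and holds all three matrices in memory, which is exactly what the MapReduce version cannot assume for large V.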
package gnmf;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
public class MatrixGNMF
{
public static void main(String[] args) throws IOException, URISyntaxException
{
if(args.length < 10)
{
System.out.println("missing parameters");
System.out.println("expected parameters: [directory of v] [directory of w] [directory of h] " +
"[k] [num mappers] [num reducers] [replication] [working directory] " +
"[final directory of w] [final directory of h]");
System.exit(1);
}
String vDir = args[0];
String wDir = args[1];
String hDir = args[2];
int k = Integer.parseInt(args[3]);
int numMappers = Integer.parseInt(args[4]);
int numReducers = Integer.parseInt(args[5]);
int replication = Integer.parseInt(args[6]);
String outputDir = args[7];
String wFinalDir = args[8];
String hFinalDir = args[9];
JobConf mainJob = new JobConf(MatrixGNMF.class);
String vDirectory;
String wDirectory;
String hDirectory;
FileSystem.get(mainJob).delete(new Path(outputDir));
vDirectory = vDir;
hDirectory = hDir;
wDirectory = wDir;
String workingDirectory;
String resultDirectoryX;
String resultDirectoryY;
long start = System.currentTimeMillis();
System.gc();
System.out.println("starting calculation");
System.out.print("calculating X = WT * V... ");
workingDirectory = UpdateWHStep1.runJob(numMappers, numReducers, replication,
UpdateWHStep1.UPDATE_TYPE_H, vDirectory, wDirectory, outputDir, k);
resultDirectoryX = UpdateWHStep2.runJob(numMappers, numReducers, replication,
workingDirectory, outputDir);
FileSystem.get(mainJob).delete(new Path(workingDirectory));
System.out.println("done");
System.out.print("calculating Y = WT * W * H... ");
workingDirectory = UpdateWHStep3.runJob(numMappers, numReducers, replication,
wDirectory, outputDir);
resultDirectoryY = UpdateWHStep4.runJob(numMappers, replication, workingDirectory,
UpdateWHStep4.UPDATE_TYPE_H, hDirectory, outputDir);
FileSystem.get(mainJob).delete(new Path(workingDirectory));
System.out.println("done");
System.out.print("calculating H = H .* X ./ Y... ");
workingDirectory = UpdateWHStep5.runJob(numMappers, numReducers, replication,
hDirectory, resultDirectoryX, resultDirectoryY, hFinalDir, k);
System.out.println("done");
FileSystem.get(mainJob).delete(new Path(resultDirectoryX));
FileSystem.get(mainJob).delete(new Path(resultDirectoryY));
System.out.print("storing back H... ");
FileSystem.get(mainJob).delete(new Path(hDirectory));
hDirectory = workingDirectory;
System.out.println("done");
System.out.print("calculating X = V * HT... ");
workingDirectory = UpdateWHStep1.runJob(numMappers, numReducers, replication,
UpdateWHStep1.UPDATE_TYPE_W, vDirectory, hDirectory, outputDir, k);
resultDirectoryX = UpdateWHStep2.runJob(numMappers, numReducers, replication,
workingDirectory, outputDir);
FileSystem.get(mainJob).delete(new Path(workingDirectory));
System.out.println("done");
System.out.print("calculating Y = W * H * HT... ");
workingDirectory = UpdateWHStep3.runJob(numMappers, numReducers, replication,
hDirectory, outputDir);
resultDirectoryY = UpdateWHStep4.runJob(numMappers, replication, workingDirectory,
UpdateWHStep4.UPDATE_TYPE_W, wDirectory, outputDir);
FileSystem.get(mainJob).delete(new Path(workingDirectory));
System.out.println("done");
System.out.print("calculating W = W .* X ./ Y... ");
workingDirectory = UpdateWHStep5.runJob(numMappers, numReducers, replication,
wDirectory, resultDirectoryX, resultDirectoryY, wFinalDir, k);
System.out.println("done");
FileSystem.get(mainJob).delete(new Path(resultDirectoryX));
FileSystem.get(mainJob).delete(new Path(resultDirectoryY));
System.out.print("storing back W... ");
FileSystem.get(mainJob).delete(new Path(wDirectory));
wDirectory = workingDirectory;
System.out.println("done");
long requiredTime = System.currentTimeMillis() - start;
long requiredTimeMilliseconds = requiredTime % 1000;
requiredTime -= requiredTimeMilliseconds;
requiredTime /= 1000;
long requiredTimeSeconds = requiredTime % 60;
requiredTime -= requiredTimeSeconds;
requiredTime /= 60;
long requiredTimeMinutes = requiredTime % 60;
requiredTime -= requiredTimeMinutes;
requiredTime /= 60;
long requiredTimeHours = requiredTime;
}
}
package gnmf;
import gnmf.io.MatrixObject;
import gnmf.io.MatrixVector;
import gnmf.io.TaggedIndex;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
public class UpdateWHStep2
{
static class UpdateWHStep2Mapper extends MapReduceBase
implements Mapper<TaggedIndex, MatrixVector, TaggedIndex, MatrixVector>
{
@Override
public void map(TaggedIndex key, MatrixVector value,
OutputCollector<TaggedIndex, MatrixVector> out,
Reporter reporter) throws IOException
{
out.collect(key, value);
}
}
static class UpdateWHStep2Reducer extends MapReduceBase
implements Reducer<TaggedIndex, MatrixVector, TaggedIndex, MatrixObject>
{
@Override
public void reduce(TaggedIndex key, Iterator<MatrixVector> values,
OutputCollector<TaggedIndex, MatrixObject> out, Reporter reporter)
throws IOException
{
MatrixVector result = null;
while(values.hasNext())
{
MatrixVector current = values.next();
if(result == null)
{
result = current.getCopy();
} else
{
result.addVector(current);
}
}
if(result != null)
{
out.collect(new TaggedIndex(key.getIndex(), TaggedIndex.TYPE_VECTOR_X),
new MatrixObject(result));
}
}
}
public static String runJob(int numMappers, int numReducers, int replication,
String inputDir, String outputDir) throws IOException
{
String workingDirectory = outputDir + System.currentTimeMillis() + "-UpdateWHStep2/";
JobConf job = new JobConf(UpdateWHStep2.class);
job.setJobName("MatrixGNMFUpdateWHStep2");
job.setInputFormat(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(inputDir));
job.setOutputFormat(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(workingDirectory));
job.setNumMapTasks(numMappers);
job.setMapperClass(UpdateWHStep2Mapper.class);
job.setMapOutputKeyClass(TaggedIndex.class);
job.setMapOutputValueClass(MatrixVector.class);
job.setNumReduceTasks(numReducers);
job.setReducerClass(UpdateWHStep2Reducer.class);
job.setOutputKeyClass(TaggedIndex.class);
job.setOutputValueClass(MatrixObject.class);
JobClient.runJob(job);
return workingDirectory;
}
}
package gnmf;
import gnmf.io.MatrixCell;
import gnmf.io.MatrixFormats;
import gnmf.io.MatrixObject;
import gnmf.io.MatrixVector;
import gnmf.io.TaggedIndex;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
public class UpdateWHStep1
{
public static final int UPDATE_TYPE_H = 0;
public static final int UPDATE_TYPE_W = 1;
static class UpdateWHStep1Mapper extends MapReduceBase
implements Mapper<TaggedIndex, MatrixObject, TaggedIndex, MatrixObject>
{
private int updateType;
@Override
public void map(TaggedIndex key, MatrixObject value,
OutputCollector<TaggedIndex, MatrixObject> out,
Reporter reporter) throws IOException
{
if(updateType == UPDATE_TYPE_W && key.getType() == TaggedIndex.TYPE_CELL)
{
MatrixCell current = (MatrixCell) value.getObject();
out.collect(new TaggedIndex(current.getColumn(), TaggedIndex.TYPE_CELL),
new MatrixObject(new MatrixCell(key.getIndex(), current.getValue())));
} else
{
out.collect(key, value);
}
}
@Override
public void configure(JobConf job)
{
updateType = job.getInt("gnmf.updateType", 0);
}
}
static class UpdateWHStep1Reducer extends MapReduceBase
implements Reducer<TaggedIndex, MatrixObject, TaggedIndex, MatrixVector>
{
private double[] baseVector = null;
private int vectorSizeK;
@Override
public void reduce(TaggedIndex key, Iterator<MatrixObject> values,
OutputCollector<TaggedIndex, MatrixVector> out, Reporter reporter)
throws IOException
{
if(key.getType() == TaggedIndex.TYPE_VECTOR)
{
if(!values.hasNext())
throw new RuntimeException("expected vector");
MatrixFormats current = values.next().getObject();
if(!(current instanceof MatrixVector))
throw new RuntimeException("expected vector");
baseVector = ((MatrixVector) current).getValues();
} else
{
while(values.hasNext())
{
MatrixCell current = (MatrixCell) values.next().getObject();
if(baseVector == null)
{
out.collect(new TaggedIndex(current.getColumn(), TaggedIndex.TYPE_VECTOR),
new MatrixVector(vectorSizeK));
} else
{
if(baseVector.length == 0)
throw new RuntimeException("base vector is corrupted");
MatrixVector resultingVector = new MatrixVector(baseVector);
resultingVector.multiplyWithScalar(current.getValue());
if(resultingVector.getValues().length == 0)
throw new RuntimeException("multiplying with scalar failed");
out.collect(new TaggedIndex(current.getColumn(), TaggedIndex.TYPE_VECTOR),
resultingVector);
}
}
baseVector = null;
}
}
@Override
public void configure(JobConf job)
{
vectorSizeK = job.getInt("dml.matrix.gnmf.k", 0);
if(vectorSizeK == 0)
throw new RuntimeException("invalid k specified");
}
}
public static String runJob(int numMappers, int numReducers, int replication,
int updateType, String matrixInputDir, String whInputDir, String outputDir,
int k) throws IOException
{
SystemML lm() - Scalability and Performance
[Figure: SystemML lm() compared with R 3.1.1 lm(). Panels: Performance (data fit in memory); Scalability (data larger than aggregate memory), where R runs out of memory. Chart annotation: 28x.]
Cluster
• 6 nodes w/ 12 cores each
• M/R capacity: 144/72
• M/R JVM: 2 GB
SystemML - Scalable Algorithms in BigInsights
Category: Description
Descriptive Statistics: Univariate; Bivariate; Stratified Bivariate
Classification: Logistic Regression (multinomial); Multi-Class SVM; Naïve Bayes (multinomial); Decision Trees; Random Forest
Clustering: k-Means
Regression:
• Linear Regression: system of equations; CG (conjugate gradient descent)
• Generalized Linear Models (GLM)
Distributions: Gaussian, Poisson, Gamma, Inverse Gaussian, Binomial and Bernoulli
Links for all distributions: identity, log, sq. root, inverse, 1/μ^2
Links for Binomial / Bernoulli: logit, probit, cloglog, cauchit
• Stepwise: Linear; GLM
Dimension Reduction: PCA
Matrix Factorization: ALS
Survival Models: Kaplan Meier; Cox
Predict: Scoring
Transformation: Recoding, dummy coding, binning, scaling, missing value imputation
High-Level SystemML Architecture
[Figure: layered architecture. DML scripts, written in DML (Declarative Machine Learning Language), pass through the Language, Compiler, and Runtime layers and execute either on an in-memory single node (scale-up) or on a Hadoop or Spark cluster (scale-out).]
SystemML Architecture
Language
• R-like syntax w/ constructs for meta learning and task parallelism
• Rich set of statistical functions
• User-defined & external functions
• Parsing
• Statement blocks & statements
• Program analysis, type inference, dead code elimination
High-Level Operator (HOP) Component
• Represents dataflow in DAGs of operations on matrices and scalars
• Chooses from alternative execution plans based on time and cost
estimates: operator ordering & selection; hybrid plans
Low-Level Operator (LOP) Component
• Low-level physical execution plan (LOP DAGs) over key-value pairs
• “Piggybacking” of operations into a minimal number of MapReduce jobs
Runtime
• Hybrid runtime
• CP: single-machine operations & orchestration of MR jobs
• MR: generic MapReduce jobs & operations
• SP: Spark jobs
• Numerically stable operators
• Dense / sparse matrix representation
• Multi-level buffer pool (caching) to evict in-memory objects
• Dynamic recompilation for initial unknowns
[Figure: a DML script (R-like syntax) flows through the Language, HOP Component, LOP Component, and Runtime layers to Hadoop.]
SystemML Compilation Chain
[Figure: compilation chain. Runtime instructions shown for the Spark backend:]
CP + b sb _mVar1
SPARK mapmm X.MATRIX.DOUBLE _mVar1.MATRIX.DOUBLE _mVar2.MATRIX.DOUBLE RIGHT false NONE
CP * y _mVar2 _mVar3
Selected Algebraic Simplification Rewrites
Name: Pattern
Remove Unnecessary Indexing:
X[a:b,c:d] = Y -> X = Y iff dims(X)=dims(Y)
X = Y[, 1] -> X = Y iff ncol(Y)=1
Remove Empty Matrix Multiply:
X%*%Y -> matrix(0,nrow(X),ncol(Y)) iff nnz(X)=0|nnz(Y)=0
Remove Unnecessary Outer Product:
X*(Y%*%matrix(1,...)) -> X*Y iff ncol(Y)=1
Simplify Diag Aggregates:
sum(diag(X)) -> trace(X) iff ncol(X)=1
Simplify Matrix Mult Diag:
diag(X)%*%Y -> X*Y iff ncol(X)=1&ncol(Y)=1
Simplify Diag Matrix Mult:
diag(X%*%Y) -> rowSums(X*t(Y)) iff ncol(Y)>1
Simplify Dot Product Sum:
sum(X^2) -> t(X)%*%X iff ncol(X)=1
Name: Static Pattern
Remove Unnecessary Operations:
t(t(X)), X/1, X*1, X-0 -> X
matrix(1,)/X -> 1/X
rand(,min=-1,max=1)*7 -> rand(,min=-7,max=7)
Binary to Unary:
X+X -> 2*X
X*X -> X^2
X-X*Y -> X*(1-Y)
Simplify Diag Aggregates:
trace(X%*%Y) -> sum(X*t(Y))
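Two of the rewrites above can be checked numerically on small inputs. The sketch below compares trace(X%*%Y) with sum(X*t(Y)), and diag(X%*%Y) with rowSums(X*t(Y)), on plain dense arrays. The class RewriteCheck and its method names are hypothetical and only illustrate why the rewritten forms are attractive: for an m x n matrix X and an n x m matrix Y they need on the order of m*n multiplications instead of m*m*n, and they never materialize the m x m product.

```java
// Numeric check of two algebraic rewrites. All names are hypothetical.
final class RewriteCheck {
    // Original form: materialize X %*% Y, then read its diagonal.
    static double[] diagOfProduct(double[][] x, double[][] y) {
        int m = x.length, n = y.length;
        double[][] p = new double[m][m];
        for (int i = 0; i < m; i++)
            for (int l = 0; l < n; l++)
                for (int j = 0; j < m; j++)
                    p[i][j] += x[i][l] * y[l][j];
        double[] d = new double[m];
        for (int i = 0; i < m; i++) d[i] = p[i][i];
        return d;
    }

    // Rewritten form rowSums(X * t(Y)): cell-wise products only.
    // t(Y)[i][j] is read as y[j][i], so no transpose is materialized either.
    static double[] rowSumsOfCellwise(double[][] x, double[][] y) {
        double[] r = new double[x.length];
        for (int i = 0; i < x.length; i++)
            for (int j = 0; j < x[0].length; j++)
                r[i] += x[i][j] * y[j][i];
        return r;
    }

    // trace(X %*% Y) via the original form.
    static double traceOfProduct(double[][] x, double[][] y) {
        double s = 0;
        for (double d : diagOfProduct(x, y)) s += d;
        return s;
    }

    // sum(X * t(Y)) via the rewritten form.
    static double sumOfCellwise(double[][] x, double[][] y) {
        double s = 0;
        for (double d : rowSumsOfCellwise(x, y)) s += d;
        return s;
    }
}
```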
Example Operator Selection: Matrix Multiplication
• Physical Operators
Exec Type: Physical MM Operators
CP: MM, MMChain, TSMM, PMM
MR / Spark: MapMM, MapMMChain, TSMM (transpose-self mm), PMM (permutation mm), CPMM (cross-product mm), RMM (replication mm), Zipmm (partition aware mm)
• Hop-Lop Rewrites
Partitioning (w/o, CP/MR, colblock/rowblock)
Aggregation (w/o, singleblock/multiblock)
Transpose-MM rewrite t(X)%*%y -> t(t(y)%*%X)
Empty block materialization in output
CP degree of parallelism (multi-threaded mm)
Example: t(X)%*%y
[Figure: HOP DAG in which X feeds r(t) and, together with y, ba(+*), both marked MR; LOP DAG with Transform (CP,’), Partition (CP,col), MapMM (MR,left), Group (MR), Aggregate (MR,ak+), and Transform (CP,’).]
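The transpose-MM rewrite t(X)%*%y -> t(t(y)%*%X) can be illustrated on a single node. In the sketch below, the direct form materializes t(X) before multiplying, while the rewritten form makes one pass over the rows of X and only ever "transposes" the vector, which for a one-dimensional array is a no-op. The class TransposeMmSketch and its method names are hypothetical; the assumption that avoiding the transpose of the large operand is the point of the rewrite is mine, not stated on the slide.

```java
// Sketch of the transpose-MM rewrite on dense arrays. Names are hypothetical.
final class TransposeMmSketch {
    // Direct form t(X) %*% y: build the n x m transpose, then multiply.
    static double[] viaTranspose(double[][] x, double[] y) {
        int m = x.length, n = x[0].length;
        double[][] xt = new double[n][m];
        for (int i = 0; i < m; i++)
            for (int j = 0; j < n; j++)
                xt[j][i] = x[i][j];
        double[] r = new double[n];
        for (int j = 0; j < n; j++)
            for (int i = 0; i < m; i++)
                r[j] += xt[j][i] * y[i];
        return r;
    }

    // Rewritten form t(t(y) %*% X): stream over the rows of X once and
    // accumulate y[i] * X[i, ] into the result; X itself is never transposed.
    static double[] viaRewrite(double[][] x, double[] y) {
        int m = x.length, n = x[0].length;
        double[] r = new double[n];
        for (int i = 0; i < m; i++)
            for (int j = 0; j < n; j++)
                r[j] += y[i] * x[i][j];
        return r;
    }
}
```

Because the rewritten form consumes X row by row, each row block of X can contribute a partial result independently, which is what makes a map-side operator with a final aggregation possible.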
SystemML compiles hybrid runtime plans ranging from in-memory, single machine (CP) to large-scale, cluster compute
• Example
[Figure: matrix factorization example with matrices V, W, and H; axis labels: tokens, documents, words, K topics; sample cells: 1 1 0.10, 1 2 0.30, 1 3 0.22, 1 4 1.24.]
• Challenge
Guaranteed hard memory constraints (budget of JVM size) for arbitrarily complex ML programs
• Key Technical Innovations
CP & distributed runtime: Single machine & distributed operations, integrated runtime
Caching: Reuse and eviction of in-memory objects (buffer pool)
Cost Model: Accurate time and worst-case memory estimates
Optimizer: Rewrites and cost-based runtime plan generation
Dynamic Recompiler: Re-optimization for initial unknowns
[Figure: runtime versus data size across three regions, CP, CP/Cluster, and Cluster. CP: high performance computing for small data sizes. CP/Cluster (hybrid plans): gradually exploit cluster parallelism. Cluster: scalable computing for large data sizes.]
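The idea of choosing between single-node (CP) and cluster execution from worst-case memory estimates can be sketched as follows. This is a deliberately simplified illustration, not SystemML's cost model: the class ExecTypeSketch, the enum ExecType, and both methods are hypothetical, the estimate assumes dense matrices of 8-byte doubles with no overhead, and the whole JVM size is used as the budget.

```java
// Sketch only: class, enum, and method names are hypothetical, not SystemML's API.
final class ExecTypeSketch {
    enum ExecType { CP, CLUSTER }

    // Worst-case size of a dense double matrix: 8 bytes per cell
    // (assumption: no sparsity, no object overhead).
    static long denseBytes(long rows, long cols) {
        return 8L * rows * cols;
    }

    // Pick single-node execution only if all operands of an operation
    // (inputs and output) fit into the memory budget together.
    static ExecType choose(long budgetBytes, long... operandBytes) {
        long total = 0;
        for (long b : operandBytes) total += b;
        return total <= budgetBytes ? ExecType.CP : ExecType.CLUSTER;
    }
}
```

Under these assumptions and a 7 GB budget, t(X)%*%X on a dense 1M x 100 matrix (800 MB input, 80 KB output) stays in CP, while the same operation on a 300M x 500 matrix (1.2 TB input) has to run on the cluster.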
Compilation of Execution plan for bigr.lm()
Data Characteristics
• X: 300M x 500, 4 TB text file
• y: 300M x 1, 9 GB text file
• Automatic, internal matrix block representation (1k x 1k blocks)
Cluster Configuration
• 3.5 GB Map Task JVM
• 7 GB In-Mem Master JVM
• 128 MB HDFS block size
Execution plan: 1 MR Job plus In-Mem Master
• y’ is provided to the mappers through the Hadoop distributed cache
• Mappers compute X’X for each block in X, and y’X for each block in X, because X’y is rewritten to (y’X)’
• Combiners partially aggregate intermediate blocks
• Single reducer for final aggregation, as there is only 1 result block
• In-Mem Master: compute b, and execute solve(A, b) on small A, b (<2 MB)
[Figure: mappers (M) over the blocks of X, partial aggregation (+), a single reducer (R), and the In-Mem Master; labels A, b’, X, y’, beta.]
How will the execution plan change if there are changes in
• Data characteristics
• More columns and rows
• Fewer columns and rows
• Cluster characteristics
• Smaller task JVM size
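The plan above can be mimicked on one machine: each row block of X contributes a partial X’X and a partial X’y, the partial results are summed, and the small system is then solved in memory. The sketch below is an assumption-laden illustration rather than SystemML's implementation: LmSketch and its methods are hypothetical names, there is no intercept or regularization, and solve uses plain Gaussian elimination with partial pivoting.

```java
// Block-wise normal equations for linear regression. Names are hypothetical.
final class LmSketch {
    // "Mapper + combiner": add one row block's contribution
    // X_b' X_b to a and X_b' y_b to b.
    static void accumulate(double[][] xBlock, double[] yBlock, double[][] a, double[] b) {
        for (int r = 0; r < xBlock.length; r++) {
            double[] row = xBlock[r];
            for (int i = 0; i < row.length; i++) {
                b[i] += row[i] * yBlock[r];
                for (int j = 0; j < row.length; j++)
                    a[i][j] += row[i] * row[j];
            }
        }
    }

    // "In-memory master": solve a * beta = b by Gaussian elimination
    // with partial pivoting on an augmented copy of the inputs.
    static double[] solve(double[][] a, double[] b) {
        int n = b.length;
        double[][] m = new double[n][n + 1];
        for (int i = 0; i < n; i++) {
            System.arraycopy(a[i], 0, m[i], 0, n);
            m[i][n] = b[i];
        }
        for (int c = 0; c < n; c++) {
            int p = c;
            for (int r = c + 1; r < n; r++)
                if (Math.abs(m[r][c]) > Math.abs(m[p][c])) p = r;
            double[] tmp = m[c]; m[c] = m[p]; m[p] = tmp;
            for (int r = c + 1; r < n; r++) {
                double f = m[r][c] / m[c][c];
                for (int k = c; k <= n; k++) m[r][k] -= f * m[c][k];
            }
        }
        double[] beta = new double[n];
        for (int i = n - 1; i >= 0; i--) {
            double s = m[i][n];
            for (int j = i + 1; j < n; j++) s -= m[i][j] * beta[j];
            beta[i] = s / m[i][i];
        }
        return beta;
    }
}
```

The accumulated matrix has only as many rows and columns as X has columns, so for 500 columns it holds 500 x 500 doubles, which is why the final solve can run on the master regardless of the number of rows.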
Different Execution Plans for bigr.lm, if …
Baseline
• Data Characteristics: X 300M x 500, y 300M x 1
• Cluster Configuration: 3.5 GB Map Task JVM, 7 GB In-Mem Master JVM
• Plan: X’X and X’y job; solve
Data: X has 3 times more columns
• X 300M x 1500, y 300M x 1; 3.5 GB Map Task JVM, 7 GB In-Mem Master JVM
• Plan: X’X job1; X’X job2; X’y job; solve
Cluster: Change in cluster configuration
• X 300M x 500, y 300M x 1; 1.5 GB Map Task JVM, 7 GB In-Mem Master JVM
• Plan: X’y job1; X’y job2; X’X job; solve
Data: X is small and fits in memory
• X 1M x 100, y 1M x 1; 3.5 GB Map Task JVM, 7 GB In-Mem Master JVM
• Plan: solve, X’X, and X’y without a cluster job
Data: X has 2 times more rows
• X 600M x 500, y 600M x 1; 3.5 GB Map Task JVM, 7 GB In-Mem Master JVM
• Plan: X’y job1; X’y job2; X’X job; solve
SystemML Engine Key Components
• Compiler
Language parser (~25 KLoC Java)
• Parsing, live variable analysis, semantic analysis
Optimizer (~35 KLoC Java)
• Hops, Lops
• Rewrites, intra/inter-procedural analysis, memory estimates, cost model, operator selection
• Parfor Optimizer, resource optimizer, global data flow opt.
• Execution plan generation
• Runtime (~70 KLoC Java)
Runtime instructions
Core runtime operations
Buffer pool and IO
Dynamic recompilation
UDF framework
YARN integration (SystemML AM)
Some Observations on SystemML with Spark (1/2)
• Richer Spark Core API significantly simplified implementation
• Symbol table tracks matrix runtime data either as
single (large) MatrixBlock that is kept in driver JVM,
• Used for single node instructions
• Backed by multi-level cache
or as distributed collection of MatrixBlocks in cluster
• JavaPairRDD<MatrixIndexes, MatrixBlock>
• Used for distributed Spark instructions
• Subject to lazy evaluation
• If beneficial, cache RDD
– Before loops in iterative algorithms, if read only
– Storage level: MEMORY_AND_DISK
• Spark’s narrow dependencies provide the equivalent of LOP piggybacking,
but multiple consumers with individual actions are a problem (multiple scans)
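The distributed representation described above, a collection of (index, block) pairs, rests on cutting a matrix into fixed-size blocks. The sketch below shows that blocking step on a dense array. It is an illustration under stated assumptions: BlockingSketch and its methods are hypothetical names, blocks are square with edge length blen, indices are 0-based, and nested array positions stand in for the MatrixIndexes key of each MatrixBlock.

```java
// Splitting a dense matrix into fixed-size blocks. Names are hypothetical.
final class BlockingSketch {
    // Number of blocks needed to cover len cells with blocks of blen cells.
    static int numBlocks(int len, int blen) {
        return (len + blen - 1) / blen;
    }

    // blocks[bi][bj] holds the block with row-block index bi and
    // column-block index bj; boundary blocks may be smaller than blen.
    static double[][][][] split(double[][] x, int blen) {
        int rows = x.length, cols = x[0].length;
        int rb = numBlocks(rows, blen), cb = numBlocks(cols, blen);
        double[][][][] blocks = new double[rb][cb][][];
        for (int bi = 0; bi < rb; bi++)
            for (int bj = 0; bj < cb; bj++) {
                int h = Math.min(blen, rows - bi * blen);
                int w = Math.min(blen, cols - bj * blen);
                double[][] blk = new double[h][w];
                for (int i = 0; i < h; i++)
                    System.arraycopy(x[bi * blen + i], bj * blen, blk[i], 0, w);
                blocks[bi][bj] = blk;
            }
        return blocks;
    }
}
```

In a distributed setting each (bi, bj) pair and its block would become one record of the pair collection, so a 300M-row matrix with 1,000-row blocks has 300,000 row blocks per block column.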
Some Observations on SystemML with Spark (2/2)
• Robust handling of broadcast variables from driver
Observe memory constraints
Broadcast partitioned matrix blocks for efficiency
• Preserve input RDD’s partitioning whenever possible to avoid shuffle
e.g., matrix-vector binary operations using mapPartitions in combination with broadcast
• Optimize degree of parallelism by shuffling if necessary
e.g. coalesce RDDs before loops, taking into account the metadata information of data involved
• Reduce overhead of Spark framework whenever possible for small-medium datasets
Example: Lazy SparkContext
Performance SystemML Spark Backend
[Figure: performance of the SystemML Spark backend. Panels: In-Memory Data Set (160 GB); Large-Scale Data Set (1.6 TB). Annotated factors: 5.1x, 1.4x, 6.4x, 9.7x; 0.8x, 1.3x, 1.9x, 1.9x.]
SystemML Spark MLContext
• Fit into Spark APIs, consume and produce DataFrames
• Exploit SystemML’s compiler to produce execution plans with Spark backend.
• Usable from Scala, Java, Python, R/SparkR
Run SystemML in ML Pipeline
BigR Interface for SystemML
Connect to BI cluster
Data frame proxy to large data file
Data transformation step
Run scalable linear regression on cluster
SystemML Scalable Machine Learning - Summary
• Cost-based compilation of machine learning algorithms generates execution plans
for single-node in-memory, cluster, and hybrid execution
for varying data characteristics:
• varying number of observations (1,000s to 10s of billions)
• varying number of variables (10s to 10s of millions)
• dense and sparse data
for varying cluster characteristics (memory configurations, degree of parallelism)
• Out-of-the-box, scalable machine learning algorithms
e.g. descriptive statistics, regression, clustering, and classification
• "Roll-your-own" algorithms
Enable programmer productivity (no worry about scalability, numeric stability, and optimizations)
Fast turn-around for new algorithms
• Machine-learning specific language constructs such as ensemble learning and cross-validation
• Higher-level language shields algorithm development investment from platform progression
YARN for resource negotiation and elasticity
Spark for in-memory, iterative processing
• Open Source Commitment: R, Spark, Hadoop, etc.
Platform to build, customize and run pre-processing, feature engineering, and machine learning algorithms in R-like syntax.