Overview of Apache SystemML by Berthold Reinwald and Nakul Jindal
TRANSCRIPT
Agenda
• What is Apache SystemML
• How to implement SystemML algorithms → data scientist
• How to run SystemML algorithms → user
• How does SystemML work → SystemML developer
What is Apache SystemML
• In a nutshell
  • a language for data scientists to implement scalable ML algorithms
  • 2 language variants: R-like and Python-like syntax
  • Strong foundation of linear algebra operations and statistical functions
  • Comes with approx. 20+ algorithms pre-implemented
• Cost-based optimizer to compile execution plans
  • Depending on data characteristics (tall/skinny, short/wide; dense/sparse) and cluster characteristics
  • ranging from single node to clusters (MapReduce, Spark); hybrid plans
• APIs & Tools
  • Command line: hadoop jar, spark-submit, standalone Java app
  • JMLC: embed as library
  • Spark MLContext: Scala, Python, and Java
  • Tools
    • REPL (Scala Spark and pyspark)
    • Spark ML pipeline
Big Data Analytics - Characteristics
• Large number of models
• Large number of data points
• Large number of features
• Sparse data
• Large number / size of intermediates
• Large number of pairs
• Custom analytics
SystemML - Declarative ML
• Analytics language for data scientists ("The SQL for analytics")
  • Algorithms expressed in a declarative, high-level language DML with R-like syntax
  • Productivity of data scientists
• Enable
  • Solutions development
  • Tools
• Compiler
  • Cost-based optimizer to generate execution plans and to parallelize
    • based on data characteristics
    • based on cluster and machine characteristics
  • Physical operators for in-memory single node and cluster execution
• Performance & Scalability
High-Level SystemML Architecture
[Figure: layered stack. DML scripts, written in DML (Declarative Machine Learning Language), pass through Language, Compiler, and Runtime, and execute on an In-Memory Single Node (scale-up) or a Hadoop or Spark Cluster (scale-out)]
Apache SystemML Incubator Project
• June 2015: SystemML open source announced at Spark Summit
• Sep. 2015: public GitHub
• Oct. 2015: 1st open source binary release (0.8.0)
• Nov. 2015: Enter Apache incubation
  • http://systemml.apache.org/
  • https://github.com/apache/incubator-systemml
• Jan. 2016: SystemML 0.9.0 (1st Apache release)
• June 2016: SystemML 0.10.0 release
Apache SystemML Incubator: http://systemml.apache.org/
• Get SystemML
• Documentation
  • DML Reference Guide
  • Algorithms Guide
  • Running
• Community
  • JIRA server
  • GitHub
Sample Code
A = 1.0                                             # A is a double
X <- matrix("4 3 2 5 7 8", rows=3, cols=2)          # X = matrix of size 3,2; '<-' is assignment
Y = matrix(1, rows=3, cols=2)                       # Y = matrix of size 3,2 with all 1s
b <- t(X) %*% Y                                     # %*% is matrix multiply, t(X) is transpose
S = "hello world"

i = 0
while (i < max_iteration) {
  H = (H * (t(W) %*% (V/(W%*%H)))) / t(colSums(W))  # * is element by element mult
  W = (W * ((V/(W%*%H)) %*% t(H))) / t(rowSums(H))
  i = i + 1;                                        # i is an integer
}
print (toString(H))                                 # toString converts a matrix to a string
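The loop body above can be traced outside SystemML with a small Python sketch. It assumes V is n x m, W is n x k, and H is k x m, all with strictly positive entries; the helper names (matmul, transpose, ratio, nmf_step) are made up for this illustration and are not part of DML or any SystemML API.

```python
def matmul(A, B):
    """Plain-list matrix product, the counterpart of DML's %*%."""
    return [[sum(a * b for a, b in zip(row, col)) for col in zip(*B)] for row in A]

def transpose(A):
    """Counterpart of DML's t()."""
    return [list(col) for col in zip(*A)]

def ratio(V, W, H):
    """Element-wise V / (W %*% H)."""
    WH = matmul(W, H)
    return [[v / wh for v, wh in zip(rv, rwh)] for rv, rwh in zip(V, WH)]

def nmf_step(V, W, H):
    """One pass of the loop body: update H first, then W using the new H."""
    col_sums_w = [sum(col) for col in zip(*W)]   # colSums(W): one value per factor
    WtR = matmul(transpose(W), ratio(V, W, H))   # t(W) %*% (V / (W %*% H))
    H = [[h * g / col_sums_w[k] for h, g in zip(H[k], WtR[k])]
         for k in range(len(H))]
    row_sums_h = [sum(row) for row in H]         # rowSums(H): one value per factor
    RHt = matmul(ratio(V, W, H), transpose(H))   # (V / (W %*% H)) %*% t(H)
    W = [[w * g / row_sums_h[k] for k, (w, g) in enumerate(zip(wr, gr))]
         for wr, gr in zip(W, RHt)]
    return W, H
```

If V already equals W %*% H, the ratio matrix is all ones and a step leaves W and H unchanged, which is a quick sanity check for the update formulas.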
Sample Code
source("nn/layers/affine.dml") as affine  # import a file in the "affine" namespace
[W, b] = affine::init(D, M)               # calls the init function, multiple return

parfor (i in 1:nrow(X)) {                 # i iterates over 1 through num rows in X in parallel
  for (j in 1:ncol(X)) {                  # j iterates over 1 through num cols in X
    # Computation ...
  }
}

write (M, fileM, format="text")           # M=matrix, fileM=file, also writes to HDFS
X = read (fileX)                          # fileX=file, also reads from HDFS

if (ncol (A) > 1) {
  # Matrix A is being sliced by a given range of columns
  A[,1:(ncol (A) - 1)] = A[,1:(ncol (A) - 1)] - A[,2:ncol (A)];
}
Sample Code
interpSpline = function(double x, matrix[double] X, matrix[double] Y, matrix[double] K) return (double q) {
  i = as.integer(nrow(X) - sum(ppred(X, x, ">=")) + 1)
  # misc computation …
  q = as.scalar(qm)
}

eigen = externalFunction(Matrix[Double] A) return(Matrix[Double] eval, Matrix[Double] evec)
  implemented in (classname="org.apache.sysml.udf.lib.EigenWrapper", exectype="mem")
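The index line in interpSpline counts how many entries of X are at least x and turns that into a 1-based position. A short Python sketch of the same arithmetic follows; the function names are invented for this example, and the bisect variant assumes the knots are sorted in ascending order, which the slide does not state.

```python
from bisect import bisect_left

def interval_index(knots, x):
    """Mirror of: nrow(X) - sum(ppred(X, x, ">=")) + 1 (1-based)."""
    count_ge = sum(1 for k in knots if k >= x)   # ppred(X, x, ">=") summed up
    return len(knots) - count_ge + 1

def interval_index_sorted(sorted_knots, x):
    """Same result via binary search, valid only for ascending knots."""
    return bisect_left(sorted_knots, x) + 1
```

Both versions return the number of knots strictly below x, plus one.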
Sample Code (from LinearRegDS.dml*)
A = t(X) %*% X
b = t(X) %*% y

if (intercept_status == 2) {
  A = t(diag (scale_X) %*% A + shift_X %*% A [m_ext, ])
  A = diag (scale_X) %*% A + shift_X %*% A [m_ext, ]
  b = diag (scale_X) %*% b + shift_X %*% b [m_ext, ]
}

A = A + diag (lambda)

print ("Calling the Direct Solver...")

beta_unscaled = solve (A, b)
*https://github.com/apache/incubator-systemml/blob/master/scripts/algorithms/LinearRegDS.dml#L133
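For readers without a SystemML installation, the core of the direct solver can be sketched in plain Python: build A = t(X) %*% X and b = t(X) %*% y, add the regularizer to the diagonal, and solve. This is an illustration only. It skips the intercept_status == 2 rescaling branch, uses a single scalar lam on every diagonal entry (the script adds diag(lambda), so this is a simplification), and the Gaussian elimination stands in for DML's solve(), whose internals are not shown on the slide.

```python
def transpose(M):
    return [list(col) for col in zip(*M)]

def matmul(A, B):
    return [[sum(a * b for a, b in zip(row, col)) for col in zip(*B)] for row in A]

def solve(A, b):
    """Gaussian elimination with partial pivoting for a square system A x = b."""
    n = len(A)
    M = [row[:] + [b[i]] for i, row in enumerate(A)]   # augmented matrix
    for c in range(n):
        p = max(range(c, n), key=lambda r: abs(M[r][c]))
        M[c], M[p] = M[p], M[c]
        for r in range(c + 1, n):
            f = M[r][c] / M[c][c]
            for k in range(c, n + 1):
                M[r][k] -= f * M[c][k]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):                     # back substitution
        s = sum(M[r][k] * x[k] for k in range(r + 1, n))
        x[r] = (M[r][n] - s) / M[r][r]
    return x

def linreg_ds(X, y, lam):
    """Solve (t(X) %*% X + lam * I) beta = t(X) %*% y."""
    Xt = transpose(X)
    A = matmul(Xt, X)
    b = [sum(xi * yi for xi, yi in zip(col, y)) for col in Xt]
    for i in range(len(A)):
        A[i][i] += lam
    return solve(A, b)
```

With a column of ones as the first feature and y = 1 + 2x, linreg_ds recovers an intercept close to 1 and a slope close to 2 when lam is 0.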
DML Editor Support
• Very rudimentary editor support
• Bit of shameless self-promotion:
• Atom: hackable text editor
  • Install package: https://atom.io/packages/language-dml
  • From GUI: http://flight-manual.atom.io/using-atom/sections/atom-packages/
  • Or from command line: apm install language-dml
  • Rudimentary snippet-based completion of builtin functions
• Vim
  • Install package: https://github.com/nakul02/vim-dml
  • Works with Vundle (vim package manager)
• There is an experimental Zeppelin Notebook integration with DML
  • https://issues.apache.org/jira/browse/SYSTEMML-542
  • Available as a docker image to play with: https://hub.docker.com/r/nakul02/incubator-zeppelin/
• Please send feedback when using these, requests for features, bugs
  • I'll work on them when I can
SystemML Algorithms
Category: Description
• Descriptive Statistics: Univariate; Bivariate; Stratified Bivariate
• Classification: Logistic Regression (multinomial); Multi-Class SVM; Naïve Bayes (multinomial); Decision Trees; Random Forest
• Clustering: k-Means
• Regression
  • Linear Regression: system of equations; CG (conjugate gradient descent)
  • Generalized Linear Models (GLM)
    • Distributions: Gaussian, Poisson, Gamma, Inverse Gaussian, Binomial, Bernoulli
    • Links for all distributions: identity, log, sq. root, inverse, 1/μ^2
    • Links for Binomial / Bernoulli: logit, probit, cloglog, cauchit
  • Stepwise: Linear; GLM
• Dimension Reduction: PCA
• Matrix Factorization: ALS (direct solve; CG (conjugate gradient descent))
• Survival Models: Kaplan Meier Estimate; Cox Proportional Hazard Regression
• Predict: Algorithm-specific scoring
• Transformation (native): Recoding, dummy coding, binning, scaling, missing value imputation
Documentation: https://apache.github.io/incubator-systemml/algorithms-reference.html
Scripts: /usr/SystemML/systemml-0.10.0-incubating/scripts/algorithms/
Running / Invoking SystemML
• Command line
  • Standalone (Java application in single JVM, in bin folder)
  • Spark (spark-submit, in scripts folder)
  • hadoop command line
• APIs (MLContext)
  • Scala, e.g. run from Spark shell
  • Python, e.g. run from PySpark
  • Java
• In-Memory
MLContext API - Example Usage
val ml = new MLContext(sc)

val X_train = sc.textFile("amazon0601.txt")
  .filter(!_.startsWith("#"))
  .map(_.split("\t") match { case Array(prod1, prod2) => (prod1.toInt, prod2.toInt, 1.0) })
  .toDF("prod_i", "prod_j", "x_ij")
  .filter("prod_i < 5000 AND prod_j < 5000") // Change to smaller number
  .cache()

val pnmf = """
# data & args
X = read($X)
rank = as.integer($rank)

# Computation ....

write(negloglik, $negloglikout)
write(W, $Wout)
write(H, $Hout)
"""
ml.registerInput("X", X_train)
ml.registerOutput("W")
ml.registerOutput("H")
ml.registerOutput("negloglik")

val outputs = ml.executeScript(pnmf, Map("maxiter" -> "100", "rank" -> "10"))

val negloglik = getScalarDouble(outputs, "negloglik")
End-to-end on Spark … in Code
import org.apache.spark.sql._
val ctx = new org.apache.spark.sql.SQLContext(sc)
val tweets = ctx.jsonFile("hdfs:/twitter/decahose")
tweets.registerAsTable("tweetTable")

ctx.sql("SELECT text FROM tweetTable LIMIT 5").collect.foreach(println)
ctx.sql("SELECT lang, COUNT(*) AS cnt FROM tweetTable " +
        "GROUP BY lang ORDER BY cnt DESC LIMIT 10").collect.foreach(println)

val texts = ctx.sql("SELECT text FROM tweetTable").map(_.head.toString)

def featurize(str: String): Vector = { ... }
val vectors = texts.map(featurize).toDF.cache()
val mcV = new MatrixCharacteristics(vectors.count, vocabSize, 1000, 1000)
val V = RDDConvertUtilsExt(sc, vectors, mcV, false, "_1")

val ml = new com.ibm.bi.dml.api.MLContext(sc)
ml.registerInput("V", V, mcV)
ml.registerOutput("W")
ml.registerOutput("H")
val args = Array(numTopics, numGNMFIter)
val out = ml.execute("GNMF.dml", args)
val W = out.getDF("W")
val H = out.getDF("H")

def getWords(r: Row): Array[(String, Double)] = { ... }
val topics = H.rdd.map(getWords)

[Figure labels annotating the code, top to bottom: Twitter Data, Explore Data in SQL, Data Set, Training Set, Topic Modeling, Get Topics; the steps are grouped under SQL and ML]
SystemML Architecture
Language
• R-like syntax
• Linear algebra, statistical functions, control structures, etc.
• User-defined & external functions
• Parsing
  • Statement blocks & statements
  • Program analysis, type inference, dead code elimination
High-Level Operator (HOP) Component
• Dataflow in DAGs of operations on matrices, frames, and scalars
• Choosing from alternative execution plans based on memory and cost estimates: operator ordering & selection; hybrid plans
Low-Level Operator (LOP) Component
• Low-level physical execution plan (LOP DAGs) over key-value pairs
• "Piggybacking" operations into a minimal number of MapReduce jobs
Runtime
• Hybrid runtime
  • CP: single machine operations & orchestrate jobs
  • MR: generic MapReduce jobs & operations
  • SP: Spark jobs
• Numerically stable operators
• Dense / sparse matrix representation
• Multi-level buffer pool (caching) to evict in-memory objects
• Dynamic recompilation for initial unknowns
[Figure: component stack. Entry points: Command Line, JMLC, Spark MLContext APIs. Compiler: Parser/Language, High-Level Operators, Low-Level Operators, cost-based optimizations. Runtime: Control Program, Runtime Program, Buffer Pool, ParFor Optimizer/Runtime, Recompiler, MR/Spark/CP instructions, DFS IO, Mem/FS IO, Generic MR Jobs, MatrixBlock Library (single/multi-threaded)]
SystemML Compilation Chain
[Figure: compilation chain; only the generated runtime instructions survive in the transcript]
CP + b sb _mVar1
SPARK mapmm X.MATRIX.DOUBLE _mVar1.MATRIX.DOUBLE _mVar2.MATRIX.DOUBLE RIGHT false NONE
CP * y _mVar2 _mVar3
Selected Algebraic Simplification Rewrites
Dynamic patterns (name: pattern)
• Remove Unnecessary Indexing: X[a:b,c:d] = Y → X = Y iff dims(X)=dims(Y); X = Y[, 1] → X = Y iff ncol(Y)=1
• Remove Empty Matrix Multiply: X%*%Y → matrix(0,nrow(X),ncol(Y)) iff nnz(X)=0|nnz(Y)=0
• Remove Unnecessary Outer Product: X*(Y%*%matrix(1,...)) → X*Y iff ncol(Y)=1
• Simplify Diag Aggregates: sum(diag(X)) → trace(X) iff ncol(X)=1
• Simplify Matrix Mult Diag: diag(X)%*%Y → X*Y iff ncol(X)=1&ncol(Y)=1
• Simplify Diag Matrix Mult: diag(X%*%Y) → rowSums(X*t(Y)) iff ncol(Y)>1
• Simplify Dot Product Sum: sum(X^2) → t(X)%*%X iff ncol(X)=1
Static patterns (name: pattern)
• Remove Unnecessary Operations: t(t(X)), X/1, X*1, X-0 → X; matrix(1,)/X → 1/X; rand(,min=-1,max=1)*7 → rand(,min=-7,max=7)
• Binary to Unary: X+X → 2*X; X*X → X^2; X-X*Y → X*(1-Y)
• Simplify Diag Aggregates: trace(X%*%Y) → sum(X*t(Y))
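Three of the rewrites above can be checked numerically on small integer matrices. The Python sketch below is only a sanity check of the algebra, not SystemML's rewrite code; all helper names are made up for the example.

```python
def matmul(A, B):
    return [[sum(a * b for a, b in zip(row, col)) for col in zip(*B)] for row in A]

def transpose(A):
    return [list(col) for col in zip(*A)]

def hadamard(A, B):
    """Element-wise product, DML's * on matrices."""
    return [[a * b for a, b in zip(ra, rb)] for ra, rb in zip(A, B)]

def trace(M):
    return sum(M[i][i] for i in range(len(M)))

def diag(M):
    return [M[i][i] for i in range(len(M))]

def row_sums(M):
    return [sum(row) for row in M]

def total(M):
    return sum(sum(row) for row in M)

X = [[1, 2, 3], [4, 5, 6]]          # 2 x 3
Y = [[7, 8], [9, 10], [11, 12]]     # 3 x 2
x = [[1], [2], [3]]                 # column vector

# trace(X %*% Y) -> sum(X * t(Y)): avoids forming the full product
lhs_trace, rhs_trace = trace(matmul(X, Y)), total(hadamard(X, transpose(Y)))

# diag(X %*% Y) -> rowSums(X * t(Y))
lhs_diag, rhs_diag = diag(matmul(X, Y)), row_sums(hadamard(X, transpose(Y)))

# sum(X^2) -> t(X) %*% X for a column vector
lhs_dot, rhs_dot = total(hadamard(x, x)), matmul(transpose(x), x)[0][0]
```

In each pair the right-hand side touches only the entries that contribute to the result, which is why the rewritten form is cheaper than materializing X %*% Y first.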
A Data Scientist - Linear Regression
[Figure: $Xw \approx y$, where X holds the explanatory/independent variables, y is the predicted/dependent variable, and w is the model]
Optimization problem: $w = \arg\min_w \|Xw - y\|^2 + \lambda \|w\|^2$
Conjugate Gradient Method:
• Start off with the (negative) gradient
• For each step
  1. Move to the optimal point along the chosen direction;
  2. Recompute the gradient;
  3. Project it onto the subspace conjugate* to all prior directions;
  4. Use this as the next direction
(* conjugate = orthogonal given A as the metric, with $A = X^T X + \lambda$)
[Figure: DML script annotated with the stages: initialize, initial direction, iterate until convergence, step size, update w, next direction, accuracy measures]
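The conjugate gradient steps listed above can be sketched in plain Python. The sketch reads A as X^T X + λI (an interpretation of the slide's shorthand) and applies A to a direction as X^T (X p) + λ p so that A is never formed explicitly. Function and variable names are illustrative and not taken from the SystemML script.

```python
def matvec(M, v):
    return [sum(m * x for m, x in zip(row, v)) for row in M]

def dot(u, v):
    return sum(a * b for a, b in zip(u, v))

def linreg_cg(X, y, lam, max_iter=100, tol=1e-12):
    """Minimize ||Xw - y||^2 + lam * ||w||^2 by conjugate gradient."""
    Xt = [list(col) for col in zip(*X)]
    w = [0.0] * len(Xt)                       # initialize
    r = [-g for g in matvec(Xt, y)]           # gradient at w = 0 is -t(X) y
    p = [-ri for ri in r]                     # initial direction: negative gradient
    norm_r2 = dot(r, r)
    for _ in range(max_iter):                 # iterate until convergence
        if norm_r2 <= tol:
            break
        # q = A p, computed as t(X) (X p) + lam * p
        q = [a + lam * pi for a, pi in zip(matvec(Xt, matvec(X, p)), p)]
        alpha = norm_r2 / dot(p, q)           # step size along p
        w = [wi + alpha * pi for wi, pi in zip(w, p)]    # update w
        r = [ri + alpha * qi for ri, qi in zip(r, q)]    # recompute the gradient
        new_norm_r2 = dot(r, r)
        beta = new_norm_r2 / norm_r2
        p = [-ri + beta * pi for ri, pi in zip(r, p)]    # next direction
        norm_r2 = new_norm_r2
    return w
```

For an n-column X, exact arithmetic would converge in at most n iterations, which fits the small iteration counts reported later in the deck.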
SystemML - Run LinReg CG on Spark
[Figure: execution plans chosen for LinReg CG as X grows; y is always 100M x 1]
Data sizes:
• X = 100M x 10 (8 GB)
• X = 100M x 100 (80 GB)
• X = 100M x 1,000 (800 GB)
• X = 100M x 10,000 (8 TB)
Cluster: 20 GB driver on 16 cores; 6 x 55 GB executors
Plans shown:
• Multithreaded single node (tMMp)
• Hybrid plan with RDD caching and fused operator: x.persist(); X.mapValues(tMMp).reduce(); executors keep X in the RDD cache and run the fused tMMv
• Hybrid plan with RDD out-of-core and fused operator: x.persist(); X.mapValues(tMMp).reduce(); the RDD cache spills
• Hybrid plan with RDD out-of-core and different operators: x.persist(); 2 MxV mult (Mv, tvM) with broadcast, mapToPair, and reduceByKey
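The data sizes on this slide follow from storing X as dense doubles (8 bytes per cell). The Python sketch below reproduces that arithmetic and adds a toy plan chooser. The two budgets are hypothetical values picked for illustration (70% of the 20 GB driver, 60% of the 6 x 55 GB executors); SystemML's optimizer uses a richer cost model than this three-way threshold.

```python
GB = 10**9

def dense_size_bytes(rows, cols):
    """Dense double-precision matrix: 8 bytes per cell."""
    return rows * cols * 8

# Hypothetical budgets, not SystemML's actual configuration values.
DRIVER_BUDGET = int(0.7 * 20 * GB)
CLUSTER_CACHE = int(0.6 * 6 * 55 * GB)

def choose_plan(size_bytes, driver_budget=DRIVER_BUDGET, cluster_cache=CLUSTER_CACHE):
    """Toy decision rule mirroring the plan categories on the slide."""
    if size_bytes <= driver_budget:
        return "single node"
    if size_bytes <= cluster_cache:
        return "hybrid, RDD cached"
    return "hybrid, out-of-core"

plans = {cols: choose_plan(dense_size_bytes(100_000_000, cols))
         for cols in (10, 100, 1_000, 10_000)}
```

Under these assumed budgets, the 8 GB input stays on a single node, the 80 GB input fits the aggregate cache, and the two larger inputs go out-of-core, matching the progression of plans in the figure.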
LinReg CG for varying Data
Execution time in seconds (plotted on a log scale) by data size:

Plan        8 GB         80 GB        800 GB       8 TB
            100M x 10    100M x 100   100M x 1K    100M x 10K
CP+Spark    21           92           2,065        40,395
Spark       76           124          2,159        40,130
CP+MR       24           277          2,613        41,006

Note: driver with 20 GB, 16 cores; 6 executors, each 55 GB, 24 cores; convergence in 3-4 iterations; SystemML as of 10/2015.

Chart annotations, in order of increasing data size:
• Single node MT avoids Spark context & distributed ops: 3.6x
• Hybrid plan & RDD caching: 3x
• Out of core: 1.2x
• Fully utilized

• Cost-based optimization is important
• Hybrid execution plans benefit especially medium-sized data sets
• Aggregated in-memory data sets are the sweet spot for Spark, esp. for iterative algorithms
• Graceful degradation for out-of-core
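The first two speedup annotations can be recomputed from the timings above. In the Python sketch below, the choice of baseline for each ratio (Spark-only at 8 GB, CP+MR at 80 GB) is an inference from the numbers, since the slide does not say which bars are being compared; the 1.2x annotation is left unverified for the same reason.

```python
# Execution times in seconds, copied from the table above.
# Column order: 8 GB, 80 GB, 800 GB, 8 TB.
times = {
    "CP+Spark": [21, 92, 2065, 40395],
    "Spark":    [76, 124, 2159, 40130],
    "CP+MR":    [24, 277, 2613, 41006],
}

def speedup(baseline, plan, column):
    """How many times faster `plan` is than `baseline` at one data size."""
    return times[baseline][column] / times[plan][column]

small = speedup("Spark", "CP+Spark", 0)    # hybrid vs. Spark-only at 8 GB
medium = speedup("CP+MR", "CP+Spark", 1)   # hybrid vs. CP+MR at 80 GB
large = speedup("CP+MR", "CP+Spark", 3)    # all plans converge at 8 TB
```

At 8 TB the three plans are within a few percent of each other, which is consistent with the "fully utilized" annotation.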
Apache SystemML - Summary
• Cost-based compilation of machine learning algorithms generates execution plans
  • for single-node in-memory, cluster, and hybrid execution
  • for varying data characteristics:
    • varying number of observations (1,000s to 10s of billions)
    • varying number of variables (10s to 10s of millions)
    • dense and sparse data
  • for varying cluster characteristics (memory configurations, degree of parallelism)
• Out-of-the-box, scalable machine learning algorithms
  • e.g. descriptive statistics, regression, clustering, and classification
• "Roll-your-own" algorithms
  • Enable programmer productivity (no worry about scalability, numeric stability, and optimizations)
  • Fast turn-around for new algorithms
• Higher-level language shields algorithm development investment from platform progression
  • Yarn for resource negotiation and elasticity
  • Spark for in-memory, iterative processing
Roadmap
• Algorithms
  • kNN, word2vec, non-linear SVM, etc.
  • Deep learning
• Engine
  • Compressed Linear Algebra
  • CodeGen
  • Extensions for Deep Learning
  • GPU backend
• Usability
  • DML notebook
  • Language integration
  • API cleanup
Research Papers
• [Compression] Ahmed Elgohary, Matthias Boehm, Peter J. Haas, Frederick R. Reiss, Berthold Reinwald: Compressed Linear Algebra for Large Scale Machine Learning. Conditional Accept at VLDB 2016
• [on Spark] Matthias Boehm, Michael W. Dusenberry, Deron Eriksson, Alexandre V. Evfimievski, Faraz Makari Manshadi, Niketan Pansare, Berthold Reinwald, Frederick R. Reiss, Prithviraj Sen, Arvind C. Surve, Shirish Tatikonda: SystemML: Declarative Machine Learning on Spark. VLDB 2016
• [Resource Elasticity] Botong Huang, Matthias Boehm, Yuanyuan Tian, Berthold Reinwald, Shirish Tatikonda, Frederick R. Reiss: Resource Elasticity for Large-Scale Machine Learning. SIGMOD Conference 2015: 137-152
• [GPU] Arash Ashari, Shirish Tatikonda, Matthias Boehm, Berthold Reinwald, Keith Campbell, John Keenleyside, P. Sadayappan: On optimizing machine learning workloads via kernel fusion. PPOPP 2015: 173-182
• [Sampling] Sebastian Schelter, Juan Soto, Volker Markl, Douglas Burdick, Berthold Reinwald, Alexandre V. Evfimievski: Efficient sample generation for scalable meta learning. ICDE 2015: 1191-1202
• [Optimizer] Matthias Boehm, Douglas R. Burdick, Alexandre V. Evfimievski, Berthold Reinwald, Frederick R. Reiss, Prithviraj Sen, Shirish Tatikonda, Yuanyuan Tian: SystemML's Optimizer: Plan Generation for Large-Scale Machine Learning Programs. IEEE Data Eng. Bull. 37(3): 52-62 (2014)
• [Task Parallelism] Matthias Boehm, Shirish Tatikonda, Berthold Reinwald, Prithviraj Sen, Yuanyuan Tian, Douglas Burdick, Shivakumar Vaithyanathan: Hybrid Parallelization Strategies for Large-Scale Machine Learning in SystemML. PVLDB 7(7): 553-564 (2014)
• [Custom Algorithm] Peter D. Kirchner, Matthias Boehm, Berthold Reinwald, Daby M. Sow, Michael Schmidt, Deepak S. Turaga, Alain Biem: Large Scale Discriminative Metric Learning. IPDPS Workshops 2014: 1656-1663
• [Numeric Stability] Yuanyuan Tian, Shirish Tatikonda, Berthold Reinwald: Scalable and Numerically Stable Descriptive Statistics in SystemML. ICDE 2012: 1351-1359
• [1st paper] Amol Ghoting, Rajasekar Krishnamurthy, Edwin P. D. Pednault, Berthold Reinwald, Vikas Sindhwani, Shirish Tatikonda, Yuanyuan Tian, Shivakumar Vaithyanathan: SystemML: Declarative machine learning on MapReduce. ICDE 2011: 231-242