Overview of Apache SystemML by Berthold Reinwald and Nakul Jindal

Scalable Machine Learning with Apache SystemML
Berthold Reinwald, Nakul Jindal, IBM
June 21st, 2016

Upload: arvind-surve

Post on 19-Jan-2017



Agenda

• What is Apache SystemML
• How to implement SystemML algorithms → data scientist
• How to run SystemML algorithms → user
• How does SystemML work → SystemML developer

What is Apache SystemML

• In a nutshell
  • a language for data scientists to implement scalable ML algorithms
  • 2 language variants: R-like and Python-like syntax
  • Strong foundation of linear algebra operations and statistical functions
  • Comes with approx. 20+ algorithms pre-implemented
• Cost-based optimizer to compile execution plans
  • Depending on data characteristics (tall/skinny, short/wide; dense/sparse) and cluster characteristics
  • ranging from single node to clusters (MapReduce, Spark); hybrid plans
• APIs & Tools
  • Command line: hadoop jar, spark-submit, standalone Java app
  • JMLC: embed as library
  • Spark MLContext: Scala, Python, and Java
  • Tools
    • REPL (Scala Spark and pyspark)
    • Spark ML pipeline

Big Data Analytics - Characteristics

• Large number of models
• Large number of data points
• Large number of features
• Sparse data
• Large number/size of intermediates
• Large number of pairs
• Custom analytics

SystemML – Declarative ML

• Analytics language for data scientists ("The SQL for analytics")
  • Algorithms expressed in a declarative, high-level language DML with R-like syntax
  • Productivity of data scientists
  • Enable
    • Solutions development
    • Tools
• Compiler
  • Cost-based optimizer to generate execution plans and to parallelize
    • based on data characteristics
    • based on cluster and machine characteristics
  • Physical operators for in-memory single node and cluster execution
• Performance & Scalability

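High-Level SystemML Architecture

[Figure: architecture stack. DML scripts, written in DML (Declarative Machine Learning Language), pass through the Language, Compiler, and Runtime layers and run either on an in-memory single node (scale-up) or on a Hadoop or Spark cluster (scale-out).]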

Apache SystemML Incubator Project

• June, 2015: SystemML open source announced at Spark Summit
• Sep., 2015: public github
• Oct., 2015: 1st open source binary release (0.8.0)
• Nov., 2015: Enter Apache incubation
  • http://systemml.apache.org/
  • https://github.com/apache/incubator-systemml
• Jan., 2016: SystemML 0.9.0 (1st Apache release)
• June, 2016: SystemML 0.10.0 release

Apache SystemML Incubator http://systemml.apache.org/

• Get SystemML
• Documentation
  • DML Reference Guide
  • Algorithms Guide
  • Running
• Community
  • JIRA server
  • GitHub

DML Language Reference Guide

https://apache.github.io/incubator-systemml/dml-language-reference.html

Sample Code

    A = 1.0                                      # A is a double
    X <- matrix("4 3 2 5 7 8", rows=3, cols=2)   # X = matrix of size 3,2; '<-' is assignment
    Y = matrix(1, rows=3, cols=2)                # Y = matrix of size 3,2 with all 1s
    b <- t(X) %*% Y                              # %*% is matrix multiply, t(X) is transpose
    S = "hello world"

    i = 0
    while (i < max_iteration) {
      H = (H * (t(W) %*% (V/(W%*%H)))) / t(colSums(W))   # * is element by element mult
      W = (W * ((V/(W%*%H)) %*% t(H))) / t(rowSums(H))
      i = i + 1;                                         # i is an integer
    }

    print(toString(H))   # toString converts a matrix to a string
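For readers who think in Python, the sample above can be mirrored with nested lists. This is only a sketch under stated assumptions: it assumes `matrix("4 3 2 5 7 8", rows=3, cols=2)` fills row by row, it reads the division by `t(colSums(W))` as scaling each row of H by the matching column sum of W, and the helper names (`transpose`, `matmul`, `nmf_update_h`) are hypothetical, not SystemML APIs.

```python
# Plain-Python mirror of the DML sample; all helper names are illustrative only.

def transpose(m):
    """t(X): swap rows and columns."""
    return [list(col) for col in zip(*m)]

def matmul(a, b):
    """%*%: matrix multiply of nested lists."""
    bt = transpose(b)
    return [[sum(x * y for x, y in zip(row, col)) for col in bt] for row in a]

# X <- matrix("4 3 2 5 7 8", rows=3, cols=2), assuming row-wise fill
values = [4.0, 3.0, 2.0, 5.0, 7.0, 8.0]
X = [values[r * 2:(r + 1) * 2] for r in range(3)]

# Y = matrix(1, rows=3, cols=2)
Y = [[1.0] * 2 for _ in range(3)]

# b <- t(X) %*% Y
b = matmul(transpose(X), Y)

def nmf_update_h(V, W, H):
    """One pass of H = (H * (t(W) %*% (V / (W %*% H)))) / t(colSums(W))."""
    WH = matmul(W, H)
    ratio = [[v / wh for v, wh in zip(v_row, wh_row)] for v_row, wh_row in zip(V, WH)]
    numer = matmul(transpose(W), ratio)
    col_sums = [sum(col) for col in zip(*W)]  # colSums(W), one value per row of H
    return [[h * n / s for h, n in zip(h_row, n_row)]
            for h_row, n_row, s in zip(H, numer, col_sums)]

# If V equals W %*% H exactly, the update is a fixed point and H is unchanged.
W = [[1.0, 2.0], [3.0, 4.0]]
H = [[1.0, 0.5], [2.0, 1.0]]
V = matmul(W, H)
H_next = nmf_update_h(V, W, H)
print(b, H_next)
```

The fixed-point check at the end is a quick sanity test of the update rule: when the factorization already reproduces V, the elementwise ratio is all ones and the scaling cancels.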

Sample Code

    source("nn/layers/affine.dml") as affine   # import a file in the "affine" namespace
    [W, b] = affine::init(D, M)                # calls the init function, multiple return

    parfor (i in 1:nrow(X)) {                  # i iterates over 1 through num rows in X in parallel
      for (j in 1:ncol(X)) {                   # j iterates over 1 through num cols in X
        # Computation ...
      }
    }

    write(M, fileM, format="text")             # M=matrix, fileM=file, also writes to HDFS

    X = read(fileX)                            # fileX=file, also reads from HDFS

    if (ncol(A) > 1) {
      # Matrix A is being sliced by a given range of columns
      A[,1:(ncol(A) - 1)] = A[,1:(ncol(A) - 1)] - A[,2:ncol(A)];
    }

Sample Code

    interpSpline = function(double x, matrix[double] X, matrix[double] Y, matrix[double] K) return (double q) {
      i = as.integer(nrow(X) - sum(ppred(X, x, ">=")) + 1)
      # misc computation …
      q = as.scalar(qm)
    }

    eigen = externalFunction(Matrix[Double] A) return (Matrix[Double] eval, Matrix[Double] evec)
      implemented in (classname="org.apache.sysml.udf.lib.EigenWrapper", exectype="mem")

Sample Code (From LinearRegDS.dml*)

    A = t(X) %*% X
    b = t(X) %*% y

    if (intercept_status == 2) {
      A = t(diag(scale_X) %*% A + shift_X %*% A[m_ext, ])
      A = diag(scale_X) %*% A + shift_X %*% A[m_ext, ]
      b = diag(scale_X) %*% b + shift_X %*% b[m_ext, ]
    }

    A = A + diag(lambda)

    print("Calling the Direct Solver...")

    beta_unscaled = solve(A, b)

* https://github.com/apache/incubator-systemml/blob/master/scripts/algorithms/LinearRegDS.dml#L133
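The direct-solve excerpt builds the normal equations, adds the regularizer on the diagonal, and calls a linear solver. A minimal Python sketch of that core path follows. It skips the `intercept_status == 2` rescaling branch, treats `lambda` as a single scalar added to every diagonal entry (an assumption), and uses a hand-written Gaussian elimination where the script calls `solve`. The function names are hypothetical.

```python
# Sketch of the direct-solve path: A = t(X) %*% X + lambda on the diagonal, b = t(X) %*% y.

def solve(A, b):
    """Solve A x = b by Gaussian elimination with partial pivoting."""
    n = len(A)
    M = [list(A[i]) + [b[i]] for i in range(n)]  # augmented matrix
    for c in range(n):
        pivot = max(range(c, n), key=lambda r: abs(M[r][c]))
        M[c], M[pivot] = M[pivot], M[c]
        for r in range(c + 1, n):
            f = M[r][c] / M[c][c]
            for k in range(c, n + 1):
                M[r][k] -= f * M[c][k]
    x = [0.0] * n
    for i in range(n - 1, -1, -1):  # back substitution
        s = sum(M[i][j] * x[j] for j in range(i + 1, n))
        x[i] = (M[i][n] - s) / M[i][i]
    return x

def linreg_ds(X, y, lam):
    """Regularized least squares via the normal equations."""
    Xt = [list(col) for col in zip(*X)]
    A = [[sum(a * c for a, c in zip(ri, rj)) for rj in Xt] for ri in Xt]  # t(X) %*% X
    b = [sum(xi * yi for xi, yi in zip(col, y)) for col in Xt]            # t(X) %*% y
    for i in range(len(A)):
        A[i][i] += lam                                                    # A + diag(lambda)
    return solve(A, b)

# Toy data generated from y = 1 + 2*x; the first column of X is the intercept.
X = [[1.0, 1.0], [1.0, 2.0], [1.0, 3.0], [1.0, 4.0]]
y = [3.0, 5.0, 7.0, 9.0]
beta = linreg_ds(X, y, 0.0)
print(beta)
```

With no regularization the toy data is fit exactly, so the recovered coefficients should be close to intercept 1 and slope 2.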

DML Editor Support

• Very rudimentary editor support
• Bit of shameless self-promotion:
• Atom – Hackable Text editor
  • Install package - https://atom.io/packages/language-dml
    • From GUI - http://flight-manual.atom.io/using-atom/sections/atom-packages/
    • Or from command line – apm install language-dml
  • Rudimentary snippet based completion of builtin function
• Vim
  • Install package - https://github.com/nakul02/vim-dml
  • Works with Vundle (vim package manager)
• There is an experimental Zeppelin Notebook integration with DML
  • https://issues.apache.org/jira/browse/SYSTEMML-542
  • Available as a docker image to play with - https://hub.docker.com/r/nakul02/incubator-zeppelin/
• Please send feedback when using these, requests for features, bugs
  • I'll work on them when I can

SystemML Algorithms

Category                  Description
Descriptive Statistics    Univariate; Bivariate; Stratified Bivariate
Classification            Logistic Regression (multinomial); Multi-Class SVM; Naïve Bayes (multinomial); Decision Trees; Random Forest
Clustering                k-Means
Regression                Linear Regression: system of equations; CG (conjugate gradient descent)
                          Generalized Linear Models (GLM):
                            Distributions: Gaussian, Poisson, Gamma, Inverse Gaussian, Binomial, Bernoulli
                            Links for all distributions: identity, log, sq. root, inverse, 1/μ^2
                            Links for Binomial / Bernoulli: logit, probit, cloglog, cauchit
                          Stepwise: Linear; GLM
Dimension Reduction       PCA
Matrix Factorization      ALS: direct solve; CG (conjugate gradient descent)
Survival Models           Kaplan Meier Estimate; Cox Proportional Hazard Regression
Predict                   Algorithm-specific scoring
Transformation (native)   Recoding, dummy coding, binning, scaling, missing value imputation

Documentation: https://apache.github.io/incubator-systemml/algorithms-reference.html
Scripts: /usr/SystemML/systemml-0.10.0-incubating/scripts/algorithms/

Running / Invoking SystemML

• Command line
  • Standalone (Java application in single JVM, in bin folder)
  • Spark (spark-submit, in scripts folder)
  • hadoop command line
• APIs (MLContext)
  • Scala, e.g. run from Spark shell
  • Python, e.g. run from PySpark
  • Java
• In-Memory

MLContext API – Example Usage

    val ml = new MLContext(sc)

    val X_train = sc.textFile("amazon0601.txt")
      .filter(!_.startsWith("#"))
      .map(_.split("\t") match { case Array(prod1, prod2) => (prod1.toInt, prod2.toInt, 1.0) })
      .toDF("prod_i", "prod_j", "x_ij")
      .filter("prod_i < 5000 AND prod_j < 5000") // Change to smaller number
      .cache()

MLContext API – Example Usage

    val pnmf = """
    # data & args
    X = read($X)
    rank = as.integer($rank)

    # Computation ....

    write(negloglik, $negloglikout)
    write(W, $Wout)
    write(H, $Hout)
    """


    ml.registerInput("X", X_train)
    ml.registerOutput("W")
    ml.registerOutput("H")
    ml.registerOutput("negloglik")

    val outputs = ml.executeScript(pnmf, Map("maxiter" -> "100", "rank" -> "10"))

    val negloglik = getScalarDouble(outputs, "negloglik")

Run LinReg CG from Spark Shell (MLContext)

Run SystemML in ML Pipeline

End-to-end on Spark … in Code

    import org.apache.spark.sql._
    val ctx = new org.apache.spark.sql.SQLContext(sc)
    val tweets = ctx.jsonFile("hdfs:/twitter/decahose")
    tweets.registerAsTable("tweetTable")

    ctx.sql("SELECT text FROM tweetTable LIMIT 5").collect.foreach(println)
    ctx.sql("SELECT lang, COUNT(*) AS cnt FROM tweetTable " +
            "GROUP BY lang ORDER BY cnt DESC LIMIT 10").collect.foreach(println)

    val texts = ctx.sql("SELECT text FROM tweetTable").map(_.head.toString)

    def featurize(str: String): Vector = { ... }
    val vectors = texts.map(featurize).toDF.cache()
    val mcV = new MatrixCharacteristics(vectors.count, vocabSize, 1000, 1000)
    val V = RDDConvertUtilsExt(sc, vectors, mcV, false, "_1")

    val ml = new com.ibm.bi.dml.api.MLContext(sc)
    ml.registerInput("V", V, mcV)
    ml.registerOutput("W")
    ml.registerOutput("H")
    val args = Array(numTopics, numGNMFIter)
    val out = ml.execute("GNMF.dml", args)
    val W = out.getDF("W")
    val H = out.getDF("H")

    def getWords(r: Row): Array[(String, Double)] = { ... }
    val topics = H.rdd.map(getWords)

[Figure annotations alongside the code: Twitter Data, Explore Data in SQL, Data Set, Training Set, Topic Modeling, Get Topics; side labels: SQL, ML]

SystemML Architecture

Language
• R-like syntax
• Linear algebra, statistical functions, control structures, etc.
• User-defined & external function
• Parsing
  • Statement blocks & statements
  • Program Analysis, type inference, dead code elimination

High-Level Operator (HOP) Component
• Dataflow in DAGs of operations on matrices, frames, and scalars
• Choosing from alternative execution plans based on memory and cost estimates: operator ordering & selection; hybrid plans

Low-Level Operator (LOP) Component
• Low-level physical execution plan (LOPDags) over key-value pairs
• "Piggybacking" operations into a minimal number of Map-Reduce jobs

Runtime
• Hybrid Runtime
  • CP: single machine operations & orchestrate jobs
  • MR: generic Map-Reduce jobs & operations
  • SP: Spark Jobs
• Numerically stable operators
• Dense / sparse matrix representation
• Multi-Level buffer pool (caching) to evict in-memory objects
• Dynamic Recompilation for initial unknowns

[Figure: architecture diagram. Entry points: Command Line, JMLC, Spark MLContext, APIs. Compiler: Parser/Language, High-Level Operators, Low-Level Operators, cost-based optimizations. Runtime: Control Program, Runtime Program, Buffer Pool, ParFor Optimizer/Runtime, Recompiler, CP Inst, MR Inst, Spark Inst, Generic MR Jobs, MatrixBlock Library (single/multi-threaded), Mem/FS IO, DFS IO.]

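SystemML Compilation Chain

[Figure: compilation chain. The only text that survives is a fragment of the generated runtime instructions:]

    CP + b sb _mVar1
    SPARK mapmm X.MATRIX.DOUBLE _mvar1.MATRIX.DOUBLE _mVar2.MATRIX.DOUBLE RIGHT false NONE
    CP * y _mVar2 _mVar3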

Selected Algebraic Simplification Rewrites

Name                               Dynamic Pattern
Remove Unnecessary Indexing        X[a:b,c:d] = Y → X = Y iff dims(X)=dims(Y)
                                   X = Y[, 1] → X = Y iff ncol(Y)=1
Remove Empty Matrix Multiply       X%*%Y → matrix(0,nrow(X),ncol(Y)) iff nnz(X)=0|nnz(Y)=0
Remove Unnecessary Outer Product   X*(Y%*%matrix(1,...)) → X*Y iff ncol(Y)=1
Simplify Diag Aggregates           sum(diag(X)) → trace(X) iff ncol(X)=1
Simplify Matrix Mult Diag          diag(X)%*%Y → X*Y iff ncol(X)=1&ncol(Y)=1
Simplify Diag Matrix Mult          diag(X%*%Y) → rowSums(X*t(Y)) iff ncol(Y)>1
Simplify Dot Product Sum           sum(X^2) → t(X)%*%X iff ncol(X)=1

Name                               Static Pattern
Remove Unnecessary Operations      t(t(X)), X/1, X*1, X-0 → X
                                   matrix(1,)/X → 1/X
                                   rand(,min=-1,max=1)*7 → rand(,min=-7,max=7)
Binary to Unary                    X+X → 2*X
                                   X*X → X^2
                                   X-X*Y → X*(1-Y)
Simplify Diag Aggregates           trace(X%*%Y) → sum(X*t(Y))
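A rewrite is only safe if both sides compute the same value. The sketch below checks three of the listed patterns numerically on small matrices. It is a plain-Python illustration with hypothetical helper names and does not reflect how the SystemML compiler applies the rewrites.

```python
# Numerical spot check of three rewrites; helpers are illustrative, not SystemML code.

def transpose(m):
    return [list(col) for col in zip(*m)]

def matmul(a, b):
    bt = transpose(b)
    return [[sum(x * y for x, y in zip(row, col)) for col in bt] for row in a]

def elementwise(a, b):
    return [[x * y for x, y in zip(ra, rb)] for ra, rb in zip(a, b)]

def total(m):
    return sum(sum(row) for row in m)

X = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]        # 2 x 3
Y = [[7.0, 8.0], [9.0, 10.0], [11.0, 12.0]]   # 3 x 2
XY = matmul(X, Y)                             # the product the rewrites avoid
X_tY = elementwise(X, transpose(Y))           # X * t(Y), same shape as X

# trace(X %*% Y) -> sum(X * t(Y))
trace_lhs = sum(XY[i][i] for i in range(len(XY)))
trace_rhs = total(X_tY)

# diag(X %*% Y) -> rowSums(X * t(Y))
diag_lhs = [XY[i][i] for i in range(len(XY))]
diag_rhs = [sum(row) for row in X_tY]

# sum(X^2) -> t(X) %*% X iff ncol(X) = 1
v = [[1.0], [2.0], [3.0]]
dot_lhs = total(elementwise(v, v))
dot_rhs = matmul(transpose(v), v)[0][0]

print(trace_lhs, trace_rhs, diag_lhs, diag_rhs, dot_lhs, dot_rhs)
```

In the first two cases the right-hand side never forms the full product X %*% Y, which is the point of the rewrite when only the diagonal is needed.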

A Data Scientist – Linear Regression

[Figure: X w ≈ y, where X holds the explanatory/independent variables, w is the model, and y is the predicted/dependent variable.]

Optimization Problem: $w = \operatorname{argmin}_w \|Xw - y\|^2 + \lambda \|w\|^2$

[Figure: the LinReg CG script annotated with its steps: initialize, initial direction, iterate until convergence (step size, update w, next direction), accuracy measures.]

Conjugate Gradient Method:
• Start off with the (negative) gradient
• For each step
  1. Move to the optimal point along the chosen direction;
  2. Recompute the gradient;
  3. Project it onto the subspace conjugate* to all prior directions;
  4. Use this as the next direction
(* conjugate = orthogonal given A as the metric)

$A = X^T X + \lambda I$
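The four steps above can be sketched in a few lines of Python. This is a textbook conjugate gradient loop for the system $(X^T X + \lambda I)\,w = X^T y$, written under the assumption that it matches the slide's outline. It is not the LinRegCG script itself, and the function names are hypothetical. Note that A is never materialized: each iteration applies X and then t(X) to a vector.

```python
# Conjugate gradient for (t(X) %*% X + lam * I) w = t(X) %*% y, in plain Python.

def matvec(M, v):
    return [sum(m * x for m, x in zip(row, v)) for row in M]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def linreg_cg(X, y, lam, max_iter=100, tol=1e-12):
    Xt = [list(col) for col in zip(*X)]

    def apply_A(p):
        # A p = t(X) (X p) + lam * p, without forming t(X) %*% X
        return [a + lam * pi for a, pi in zip(matvec(Xt, matvec(X, p)), p)]

    w = [0.0] * len(Xt)                    # initialize
    r = [-v for v in matvec(Xt, y)]        # gradient at w = 0 is A w - t(X) y
    p = [-ri for ri in r]                  # initial direction: negative gradient
    norm_r2 = dot(r, r)
    for _ in range(max_iter):
        if norm_r2 <= tol:                 # converged
            break
        q = apply_A(p)
        alpha = norm_r2 / dot(p, q)        # step size: optimum along direction p
        w = [wi + alpha * pi for wi, pi in zip(w, p)]   # 1. move along p
        r = [ri + alpha * qi for ri, qi in zip(r, q)]   # 2. recompute the gradient
        new_norm_r2 = dot(r, r)
        beta = new_norm_r2 / norm_r2
        # 3. and 4. next direction: new negative gradient made conjugate to p
        p = [-ri + beta * pi for ri, pi in zip(r, p)]
        norm_r2 = new_norm_r2
    return w

# Toy data generated from y = 1 + 2*x; the first column of X is the intercept.
X = [[1.0, 1.0], [1.0, 2.0], [1.0, 3.0], [1.0, 4.0]]
y = [3.0, 5.0, 7.0, 9.0]
w = linreg_cg(X, y, 0.0)
print(w)
```

For a problem with two unknowns, exact arithmetic would converge in two iterations; the loop simply stops once the squared gradient norm falls below `tol`.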

SystemML – Run LinReg CG on Spark

[Figure: execution plans chosen for LinReg CG as X grows (y is 100M x 1 throughout), on a cluster with a 20 GB driver on 16c and 6 x 55 GB executors.]

X size          Size     Plan
100M x 10       8 GB     Multithreaded single node (tMMp)
100M x 100      80 GB    Hybrid plan with RDD caching and fused operator
100M x 1,000    800 GB   Hybrid plan with RDD out-of-core and fused operator
100M x 10,000   8 TB     Hybrid plan with RDD out-of-core and different operators

Plan fragments shown in the figure:
• Fused operator, X held in the executors' RDD cache (tMMv per executor):
    … x.persist(); ... X.mapValues(tMMp).reduce() …
• Fused operator with spilling, X only partly in the RDD cache (tMMv per executor):
    … x.persist(); ... X.mapValues(tMMp).reduce() ...
• Different operators (Mv and tvM per executor):
    … x.persist(); ... // 2 MxV mult with broadcast, mapToPair, and reduceByKey ...
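The data sizes on this slide follow from simple arithmetic: a dense matrix of 8-byte doubles with 100M rows and 10 columns occupies 8 GB. The sketch below reproduces the four sizes and compares them against the driver and aggregate executor memory quoted on the slide. The three-way rule is a deliberately crude illustration and an assumption of this sketch; the actual optimizer works from per-operator memory and cost estimates.

```python
# Back-of-the-envelope sizing for the LinReg CG experiment; the regime rule is a toy.

GB = 10 ** 9

def dense_size_bytes(rows, cols, bytes_per_value=8):
    """Size of a dense matrix of doubles, ignoring any block or header overhead."""
    return rows * cols * bytes_per_value

def memory_regime(size_bytes, driver_bytes, executor_bytes, num_executors):
    """Classify a matrix by where it could fit (hypothetical rule, for illustration)."""
    if size_bytes <= driver_bytes:
        return "fits in driver memory"
    if size_bytes <= executor_bytes * num_executors:
        return "fits in aggregate executor memory"
    return "exceeds aggregate executor memory (out-of-core)"

rows = 100_000_000
regimes = {}
for cols in (10, 100, 1_000, 10_000):
    size = dense_size_bytes(rows, cols)
    # Cluster from the slide: 20 GB driver, 6 executors with 55 GB each
    regimes[cols] = memory_regime(size, 20 * GB, 55 * GB, 6)
    print(f"100M x {cols}: {size // GB} GB, {regimes[cols]}")
```

Under these assumptions the 8 GB case fits in the driver, the 80 GB case fits in the 330 GB of aggregate executor memory, and the two larger cases do not, which lines up with the single-node, cached, and out-of-core plans listed for this experiment.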

LinReg CG for varying Data

Execution time in secs (the chart plots it on a log scale against data size):

Data size   8 GB (100M x 10)   80 GB (100M x 100)   800 GB (100M x 1K)   8 TB (100M x 10K)
CP+Spark    21                 92                   2,065                40,395
Spark       76                 124                  2,159                40,130
CP+MR       24                 277                  2,613                41,006

Note: Driver with 20 GB, 16c; 6 Executors each 55 GB, 24c; convergence in 3-4 iterations; SystemML as of 10/2015.

Chart annotations, left to right:
• Single node MT avoids Spark Ctx & distributed ops: 3.6x
• Hybrid plan & RDD caching: 3x
• Out of Core: 1.2x
• Fully Utilized

• Cost-based optimization is important
• Hybrid execution plans benefit especially medium-sized data sets
• Aggregated in-memory data sets are sweet spot for Spark esp. for iterative algorithms
• Graceful degradation for out-of-core
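The speedup annotations can be cross-checked against the timing table with a few divisions. The sketch below computes how much slower the Spark-only and CP+MR runs are than the hybrid CP+Spark run at each data size. Pairing the ratios with particular annotations is an interpretation on my part; the slide does not say which series each factor compares.

```python
# Ratios computed from the execution-time table (seconds); variable names are illustrative.

sizes = ["8 GB", "80 GB", "800 GB", "8 TB"]
times = {
    "CP+Spark": [21, 92, 2065, 40395],
    "Spark":    [76, 124, 2159, 40130],
    "CP+MR":    [24, 277, 2613, 41006],
}

def slowdown_vs(baseline, other):
    """Per-size ratio other / baseline, rounded to one decimal place."""
    return [round(o / b, 1) for b, o in zip(times[baseline], times[other])]

spark_vs_hybrid = slowdown_vs("CP+Spark", "Spark")
mr_vs_hybrid = slowdown_vs("CP+Spark", "CP+MR")

for size, s, m in zip(sizes, spark_vs_hybrid, mr_vs_hybrid):
    print(f"{size}: Spark {s}x, CP+MR {m}x relative to CP+Spark")
```

The 3.6 at 8 GB (Spark versus CP+Spark) and the 3.0 at 80 GB (CP+MR versus CP+Spark) agree with the 3.6x and 3x annotations. At 8 TB all three configurations are within about 2% of each other, consistent with the "Fully Utilized" label.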

Apache SystemML - Summary

• Cost-based compilation of machine learning algorithms generates execution plans
  • for single-node in-memory, cluster, and hybrid execution
  • for varying data characteristics:
    • varying number of observations (1,000s to 10s of billions)
    • varying number of variables (10s to 10s of millions)
    • dense and sparse data
  • for varying cluster characteristics (memory configurations, degree of parallelism)
• Out-of-the-box, scalable machine learning algorithms
  • e.g. descriptive statistics, regression, clustering, and classification
• "Roll-your-own" algorithms
  • Enable programmer productivity (no worry about scalability, numeric stability, and optimizations)
  • Fast turn-around for new algorithms
• Higher-level language shields algorithm development investment from platform progression
  • Yarn for resource negotiation and elasticity
  • Spark for in-memory, iterative processing

Roadmap

• Algorithms
  • kNN, word2vec, non-linear SVM, etc.
  • Deep learning
• Engine
  • Compressed Linear Algebra
  • CodeGen
  • Extensions for Deep Learning
  • GPU backend
• Usability
  • DML notebook
  • Language integration
  • API cleanup

Research Papers

• Ahmed Elgohary, Matthias Boehm, Peter J. Haas, Frederick R. Reiss, Berthold Reinwald: Compressed Linear Algebra for Large Scale Machine Learning. Conditional Accept at VLDB 2016 [Compression]
• Matthias Boehm, Michael W. Dusenberry, Deron Eriksson, Alexandre V. Evfimievski, Faraz Makari Manshadi, Niketan Pansare, Berthold Reinwald, Frederick R. Reiss, Prithviraj Sen, Arvind C. Surve, Shirish Tatikonda: SystemML: Declarative Machine Learning on Spark. VLDB 2016 [on Spark]
• Botong Huang, Matthias Boehm, Yuanyuan Tian, Berthold Reinwald, Shirish Tatikonda, Frederick R. Reiss: Resource Elasticity for Large-Scale Machine Learning. SIGMOD Conference 2015: 137-152 [Resource Elasticity]
• Arash Ashari, Shirish Tatikonda, Matthias Boehm, Berthold Reinwald, Keith Campbell, John Keenleyside, P. Sadayappan: On optimizing machine learning workloads via kernel fusion. PPOPP 2015: 173-182 [GPU]
• Sebastian Schelter, Juan Soto, Volker Markl, Douglas Burdick, Berthold Reinwald, Alexandre V. Evfimievski: Efficient sample generation for scalable meta learning. ICDE 2015: 1191-1202 [Sampling]
• Matthias Boehm, Douglas R. Burdick, Alexandre V. Evfimievski, Berthold Reinwald, Frederick R. Reiss, Prithviraj Sen, Shirish Tatikonda, Yuanyuan Tian: SystemML's Optimizer: Plan Generation for Large-Scale Machine Learning Programs. IEEE Data Eng. Bull. 37(3): 52-62 (2014) [Optimizer]
• Matthias Boehm, Shirish Tatikonda, Berthold Reinwald, Prithviraj Sen, Yuanyuan Tian, Douglas Burdick, Shivakumar Vaithyanathan: Hybrid Parallelization Strategies for Large-Scale Machine Learning in SystemML. PVLDB 7(7): 553-564 (2014) [Task Parallelism]
• Peter D. Kirchner, Matthias Boehm, Berthold Reinwald, Daby M. Sow, Michael Schmidt, Deepak S. Turaga, Alain Biem: Large Scale Discriminative Metric Learning. IPDPS Workshops 2014: 1656-1663 [Custom Algorithm]
• Yuanyuan Tian, Shirish Tatikonda, Berthold Reinwald: Scalable and Numerically Stable Descriptive Statistics in SystemML. ICDE 2012: 1351-1359 [Numeric Stability]
• Amol Ghoting, Rajasekar Krishnamurthy, Edwin P. D. Pednault, Berthold Reinwald, Vikas Sindhwani, Shirish Tatikonda, Yuanyuan Tian, Shivakumar Vaithyanathan: SystemML: Declarative machine learning on MapReduce. ICDE 2011: 231-242 [1st paper]

Thank You