![Page 1: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/1.jpg)
UC Berkeley
SparkA framework for iterative and interactive cluster computingMatei Zaharia, Mosharaf Chowdhury,Michael Franklin, Scott Shenker, Ion Stoica
![Page 2: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/2.jpg)
OutlineBackground: Nexus project
Spark goals
Programming model
Example jobs
Implementation
Interactive Spark
![Page 3: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/3.jpg)
Nexus BackgroundRapid innovation in cluster computing frameworks
Dryad
ApacheHama
Pregel
Pig
![Page 4: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/4.jpg)
ProblemRapid innovation in cluster computing frameworks
No single framework optimal for all applications
Want to run multiple frameworks in a single cluster» …to maximize utilization» …to share data between frameworks» …to isolate workloads
![Page 5: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/5.jpg)
SolutionNexus is an “operating system” for the cluster over which diverse frameworks can run
»Nexus multiplexes resources between frameworks
»Frameworks control job execution
![Page 6: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/6.jpg)
Nexus slave
Nexus master
Hadoop v20
scheduler
Nexus slave
Hadoop job
Hadoop v20
executortask
Nexus slaveHadoop
v19 executor
task
MPIscheduler
MPI job
MPIexecu
tortask
Nexus Architecture
Hadoop v19
scheduler
Hadoop job
Hadoop v19
executor
task
MPIexecu
tortask
![Page 7: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/7.jpg)
Nexus StatusPrototype in 7000 lines of C++
Ported frameworks:» Hadoop (900 line patch)» MPI (160 line wrapper scripts)
New frameworks:» Spark, Scala framework for iterative jobs & more» Apache+haproxy, elastic web server farm (200 lines)
![Page 8: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/8.jpg)
OutlineBackground: Nexus project
Spark goals
Programming model
Example job
Implementation
Interactive Spark
![Page 9: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/9.jpg)
Spark GoalsSupport iterative jobs
»Machine learning researchers in our lab identified this as a workload that Hadoop doesn’t perform well on
Experiment with programmability»Leverage Scala to integrate cleanly into
programs»Support interactive use from Scala
interpreter
Retain MapReduce’s fine-grained fault-tolerance
![Page 10: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/10.jpg)
Programming ModelDistributed datasets
»HDFS files, “parallelized” Scala collections»Can be transformed with map and filter»Can be cached across parallel operations
Parallel operations»Foreach, reduce, collect
Shared variables»Accumulators (add-only)»Broadcast variables (read-only)
![Page 11: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/11.jpg)
Example 1:Logistic Regression
![Page 12: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/12.jpg)
Logistic RegressionGoal: find best line separating two sets
of points
+
–
++
+
+
+
++ +
– ––
–
–
–
––
+
target
–
random initial line
![Page 13: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/13.jpg)
Serial Versionval data = readData(...)
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) { var gradient = Vector.zeros(D) for (p <- data) { val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y gradient += scale * p.x } w -= gradient}
println("Final w: " + w)
![Page 14: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/14.jpg)
Spark Versionval data = spark.hdfsTextFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) { var gradient = spark.accumulator(Vector.zeros(D)) for (p <- data) { val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y gradient += scale * p.x } w -= gradient.value}
println("Final w: " + w)
![Page 15: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/15.jpg)
Spark Versionval data = spark.hdfsTextFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) { var gradient = spark.accumulator(Vector.zeros(D)) for (p <- data) { val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y gradient += scale * p.x } w -= gradient.value}
println("Final w: " + w)
![Page 16: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/16.jpg)
Spark Versionval data = spark.hdfsTextFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) { var gradient = spark.accumulator(Vector.zeros(D)) data.foreach(p => { val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y gradient += scale * p.x }) w -= gradient.value}
println("Final w: " + w)
![Page 17: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/17.jpg)
Functional Programming Versionval data = spark.hdfsTextFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) { w -= data.map(p => { val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y scale * p.x }).reduce(_+_)}
println("Final w: " + w)
![Page 18: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/18.jpg)
Job Execution
Big Dataset
Slave 4
Slave 3
Slave 2
Slave 1
Master
R1 R2 R3 R4
aggregate
update param
param
Spark
![Page 19: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/19.jpg)
Job Execution
Slave 4
Slave 3
Slave 2
Slave 1
Master
R1 R2 R3 R4
aggregate
update param
param
Master
aggregate
param
Map 4Map 3Map 2Map 1
Reduce
aggregate
Map 8Map 7Map 6Map 5
Reduce
param
Spark Hadoop / Dryad
![Page 20: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/20.jpg)
Performance
1 5 10 20 300
50010001500200025003000350040004500
Hadoop
Number of Iterations
Ru
nn
ing
Tim
e (
s) 127 s / iteration
first iteration 174 s
further iterations 6 s
![Page 21: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/21.jpg)
Example 2:Alternating Least Squares
![Page 22: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/22.jpg)
Collaborative FilteringPredict movie ratings for a set of users based on their past ratings
R =
1 ? ? 45 ? 3
? ? 3 5? ? 3
5 ? 5 ?? ? 1
4 ? ? ?? 2 ?
Movies
Users
![Page 23: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/23.jpg)
Matrix FactorizationModel R as product of user and movie matrices A and B of dimensions U×K and M×K
R A=
Problem: given subset of R, optimize A and B
BT
![Page 24: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/24.jpg)
Alternating Least Squares Algorithm
Start with random A and B
Repeat:
1.Fixing B, optimize A to minimize error on scores in R
2.Fixing A, optimize B to minimize error on scores in R
![Page 25: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/25.jpg)
Serial ALSval R = readRatingsMatrix(...)
var A = (0 until U).map(i => Vector.random(K))var B = (0 until M).map(i => Vector.random(K))
for (i <- 1 to ITERATIONS) { A = (0 until U).map(i => updateUser(i, B, R)) B = (0 until M).map(i => updateMovie(i, A, R))}
![Page 26: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/26.jpg)
Naïve Spark ALSval R = readRatingsMatrix(...)
var A = (0 until U).map(i => Vector.random(K))var B = (0 until M).map(i => Vector.random(K))
for (i <- 1 to ITERATIONS) { A = spark.parallelize(0 until U, numSlices) .map(i => updateUser(i, B, R)) .collect() B = spark.parallelize(0 until M, numSlices) .map(i => updateMovie(i, A, R)) .collect()}
Problem:
R re-sent to all
nodes in each
parallel operatio
n
![Page 27: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/27.jpg)
Efficient Spark ALSval R = spark.broadcast(readRatingsMatrix(...))
var A = (0 until U).map(i => Vector.random(K))var B = (0 until M).map(i => Vector.random(K))
for (i <- 1 to ITERATIONS) { A = spark.parallelize(0 until U, numSlices) .map(i => updateUser(i, B, R.value)) .collect() B = spark.parallelize(0 until M, numSlices) .map(i => updateMovie(i, A, R.value)) .collect()}
Solution: mark R
as broadcas
t variable
![Page 28: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/28.jpg)
ALS Performance
4 cores (1 node)
12 cores
(2 nodes)
20 cores
(3 nodes)
28 cores
(4 nodes)
36 cores
(5 nodes)
60 cores
(8 nodes)
0
50
100
150
200
250
300
First Itera-tionSubsequent Iterations
Cluster Configuration
Itera
tion
Du
rati
on
(s)
![Page 29: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/29.jpg)
Subseq. Iteration Breakdown
4 cores
(1 node)
12 cores
(2 nodes)
20 cores
(3 nodes)
28 cores
(4 nodes)
36 cores
(5 nodes)
60 cores
(8 nodes)
0
50
100
150
200
250
300
Computa-tionBroadcast
Tim
e w
ith
in I
tera
tion
(s)
36% of iteration spent on broadcas
t
![Page 30: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/30.jpg)
OutlineBackground: Nexus project
Spark goals
Programming model
Example job
Implementation
Interactive Spark
![Page 31: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/31.jpg)
ArchitectureDriver program connects to Nexus and schedules tasks
Workers run tasks, report results and variable updates
Data shared with HDFS/NFS
No communication between workers for now
Driver
Workers
HDFS
user code, broadcast
vars
tasks,result
s
Nexus
local cache
![Page 32: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/32.jpg)
Distributed DatasetsEach distributed dataset object maintains a lineage that is used to rebuild slices that are lost / fall out of cache
Ex:errors = textFile(“log”).filter(_.contains(“error”)) .map(_.split(‘\t’)(1)) .cache()
HdfsFilepath:
hdfs://…
FilteredFilefunc:
contains(...)
MappedFile
func: split(…)
CachedFile
HDFSLocal cach
e
getIterator(slice)
![Page 33: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/33.jpg)
Language IntegrationScala closures are Serializable objects
»Serialize on driver, load & run on workers
Not quite enough»Nested closures may reference entire outer scope»May pull in non-Serializable variables not used
inside»Solution: bytecode analysis + reflection
Shared variables»Accumulators: serialized form contains ID»Broadcast vars: serialized form is path to HDFS file
![Page 34: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/34.jpg)
Interactive SparkModified Scala interpreter to allow Spark to be used interactively from the command line
Required two changes:»Modified wrapper code generation so that
each “line” typed has references to objects for its dependencies
»Place generated classes in distributed filesystem
Enables in-memory exploration of big data
![Page 35: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/35.jpg)
Demo
![Page 36: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/36.jpg)
ConclusionsSpark provides two abstractions that enable iterative jobs and interactive use:
1. Distributed datasets with controllable persistence, supporting fault-tolerant parallel operations
2. Shared variables for efficient broadcast and imperative style programming
Language integration achieved using Scala features + some amount of hacking
All this is surprisingly little code (~1600 lines)
![Page 37: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/37.jpg)
Related WorkDryadLINQ
» SQL-like queries integrated in C# programs» Build queries through operations on lazy datasets» Cannot have a dataset persist across queries» No concept of shared variables for broadcast etc
Pig & Hive» Query languages that can call into Java/Python/etc UDFs» No support for caching a dataset across queries
OpenMP» Compiler extension for parallel loops in C++» Annotate variables as read-only or accumulator above loop» Cluster version exists, but not fault-tolerant
![Page 38: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/38.jpg)
Future WorkOpen-source Spark and Nexus
»Probably this summer»Very interested in getting users!
Understand which classes of algorithms we can handle and how to extend Spark for others
Build higher-level interfaces on top of interactive Spark (e.g. R, SQL)
![Page 39: UC Berkeley Spark A framework for iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader035.vdocuments.us/reader035/viewer/2022062518/56649e2d5503460f94b1ce22/html5/thumbnails/39.jpg)
Questions
???