![Page 1: UC Berkeley a Spark in the cloud iterative and interactive cluster computing Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica](https://reader036.vdocuments.us/reader036/viewer/2022062423/56649c785503460f9492d2fd/html5/thumbnails/1.jpg)
UC Berkeley
A Spark in the Cloud
Iterative and interactive cluster computing
Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica
Background
MapReduce and Dryad raised the level of abstraction in cluster programming by hiding scaling & faults
However, these systems provide a limited programming model: acyclic data flow
Can we design similarly powerful abstractions for a broader class of applications?
Spark Goals
Support applications with working sets (datasets reused across parallel operations)
» Iterative jobs (common in machine learning)
» Interactive data mining
Retain MapReduce’s fault tolerance & scalability
Experiment with programmability
» Integrate into Scala programming language
» Support interactive use from Scala interpreter
Non-goals
Spark is not a general-purpose programming language
» One-size-fits-all architectures are also do-nothing-well architectures
Spark is not a scheduler, nor a resource manager
Mesos
» Generic resource scheduler with support for heterogeneous frameworks
Programming Model
Resilient distributed datasets (RDDs)
» Created from HDFS files or “parallelized” arrays
» Can be transformed with map and filter
» Can be cached across parallel operations
Parallel operations on RDDs
» Reduce, toArray, foreach
Shared variables
» Accumulators (add-only), broadcast variables
Example: Log Mining
Load “error” messages from a log into memory, then interactively search for various queries

val lines = spark.textFile("hdfs://...")          // base RDD
val errors = lines.filter(_.startsWith("ERROR"))  // transformed RDD
val messages = errors.map(_.split('\t')(1))
val cachedMsgs = messages.cache()                 // cached RDD

cachedMsgs.filter(_.contains("foo")).count        // parallel operation
cachedMsgs.filter(_.contains("bar")).count
. . .

[Figure: a driver exchanging tasks and results with three workers; each worker holds one block (Block 1 to 3) and one cache (Cache 1 to 3).]
RDD Representation
Each RDD object maintains a lineage that can be used to rebuild slices of it that are lost / fall out of cache
Ex:
val cachedMsgs = textFile("log").filter(_.contains("error"))
                                .map(_.split('\t')(1))
                                .cache()

[Figure: lineage chain HdfsRDD (path: hdfs://…) → FilteredRDD (func: contains(...)) → MappedRDD (func: split(…)) → CachedRDD. Other labels: HDFS, Local cache, getIterator(slice).]
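The lineage idea can be illustrated with a small in-memory model. This is only a sketch under assumptions: `Dataset`, `Source`, `Filtered`, `Mapped`, `Cached`, `compute`, and `evict` are hypothetical names invented for the illustration and are not Spark's classes, and a `Vector` of sequences stands in for the blocks of an HDFS file. Each node remembers its parent and the function applied, so a slice that falls out of the cache can be rebuilt by replaying the chain.

```scala
// Toy model of RDD lineage. All names here are hypothetical, not Spark's API.
object LineageSketch {
  // A dataset is split into slices; compute(slice) produces one slice on demand.
  sealed trait Dataset[T] {
    def numSlices: Int
    def compute(slice: Int): Iterator[T]
    def filter(p: T => Boolean): Dataset[T] = Filtered(this, p)
    def map[U](f: T => U): Dataset[U] = Mapped(this, f)
    def cache(): Cached[T] = new Cached(this)
  }

  // Stands in for a file in stable storage: one Seq per block.
  final case class Source[T](blocks: Vector[Seq[T]]) extends Dataset[T] {
    def numSlices: Int = blocks.length
    def compute(slice: Int): Iterator[T] = blocks(slice).iterator
  }

  // Remembers the parent and the predicate, not the filtered data.
  final case class Filtered[T](parent: Dataset[T], p: T => Boolean) extends Dataset[T] {
    def numSlices: Int = parent.numSlices
    def compute(slice: Int): Iterator[T] = parent.compute(slice).filter(p)
  }

  // Remembers the parent and the mapping function, not the mapped data.
  final case class Mapped[T, U](parent: Dataset[T], f: T => U) extends Dataset[U] {
    def numSlices: Int = parent.numSlices
    def compute(slice: Int): Iterator[U] = parent.compute(slice).map(f)
  }

  // Keeps computed slices in memory; a missing slice is rebuilt from the parent.
  final class Cached[T](parent: Dataset[T]) extends Dataset[T] {
    private val store = scala.collection.mutable.Map.empty[Int, Vector[T]]
    var recomputations = 0 // how many times a slice had to be built from lineage
    def numSlices: Int = parent.numSlices
    def compute(slice: Int): Iterator[T] = {
      val data = store.getOrElseUpdate(slice, {
        recomputations += 1
        parent.compute(slice).toVector
      })
      data.iterator
    }
    def evict(slice: Int): Unit = store.remove(slice) // simulates losing a slice
  }

  // A "parallel operation", run sequentially here for simplicity.
  def count[T](d: Dataset[T]): Int =
    (0 until d.numSlices).map(i => d.compute(i).size).sum
}
```

Building `Source(...).filter(...).map(...).cache()` mirrors the example on the slide: the first `count` fills the cache, later queries reuse it, and after `evict` only the missing slice is recomputed from its parents.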
Example: Logistic Regression
Goal: find best line separating two sets of points
[Figure: scatter plot of “+” and “–” points, showing a random initial line and the target separating line.]
Logistic Regression Code
val data = spark.textFile(...).map(readPoint).cache()

var w = Vector.random(D)

for (i <- 1 to ITERATIONS) {
  val gradient = data.map(p => {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    scale * p.x
  }).reduce(_ + _)
  w -= gradient
}

println("Final w: " + w)
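The `scale` expression in the loop can be read as the gradient of the logistic loss. The derivation below is a sketch under two assumptions that the slide does not state: labels are $y_p \in \{-1, +1\}$, and the objective is the standard (unregularized) logistic loss.

```latex
\ell(w) = \sum_{p} \log\!\left(1 + e^{-y_p \, w \cdot x_p}\right)

\nabla \ell(w)
  = \sum_{p} \frac{-y_p \, x_p \, e^{-y_p \, w \cdot x_p}}{1 + e^{-y_p \, w \cdot x_p}}
  = \sum_{p} \left( \frac{1}{1 + e^{-y_p \, w \cdot x_p}} - 1 \right) y_p \, x_p
```

Each summand is what `map` produces for one point (`scale * p.x`), `reduce(_ + _)` forms the sum, and `w -= gradient` is then a gradient descent step with step size 1.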
Logistic Regression Performance
127 s / iteration
first iteration 174 s
further iterations 6 s
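The chart labels did not survive extraction, so the following arithmetic rests on an assumption: the 127 s figure is the per-iteration cost of a run that reloads its input every iteration, and the 174 s / 6 s figures describe a run that caches the data after the first pass. Under that reading, total time for $n$ iterations compares as:

```latex
T_{\text{reload}}(n) = 127\,n \ \text{s}
\qquad
T_{\text{cached}}(n) = 174 + 6\,(n - 1) \ \text{s}

n = 1:\ 127 \text{ vs } 174 \qquad
n = 2:\ 254 \text{ vs } 180 \qquad
n = 10:\ 1270 \text{ vs } 228 \qquad
n = 30:\ 3810 \text{ vs } 348
```

The slower first iteration is paid back from the second iteration onward, and the gap widens with every further pass over the data.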
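Example: Collaborative Filtering
Predict movie ratings for a set of users based on their past ratings of other movies
$$R = \begin{pmatrix} 1 & ? & ? & 4 & 5 & ? & 3 \\ ? & ? & 3 & 5 & ? & ? & 3 \\ 5 & ? & 5 & ? & ? & ? & 1 \\ 4 & ? & ? & ? & ? & 2 & ? \end{pmatrix}$$
(axes labeled Movies and Users)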
Matrix Factorization Model
Model R as product of user and movie matrices A and B of dimensions U×K and M×K
$$R = A B^T$$
Problem: given subset of R, optimize A and B
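One common way to make "optimize A and B" precise is a squared-error objective over the known entries. This is an assumption for illustration (the slide does not give the objective, and any regularization term is omitted); $\Omega$ denotes the set of observed (user, movie) pairs, $a_i$ is row $i$ of $A$, and $b_j$ is row $j$ of $B$.

```latex
\min_{A,\,B} \; \sum_{(i,j) \in \Omega} \left( R_{ij} - a_i^{\top} b_j \right)^2
```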
Alternating Least Squares
Start with random A and B
Repeat:
1. Fixing B, optimize A to minimize error on scores in R
2. Fixing A, optimize B to minimize error on scores in R
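With one factor matrix fixed, each row of the other solves an ordinary least-squares problem. Assuming squared error on the observed entries and no regularization (neither is stated on the slide), and writing $\Omega_i$ for the movies rated by user $i$, step 1 has the closed form below whenever the $K \times K$ matrix is invertible; step 2 is symmetric with the roles of $A$ and $B$ swapped.

```latex
a_i = \Big( \sum_{j \in \Omega_i} b_j \, b_j^{\top} \Big)^{-1} \sum_{j \in \Omega_i} R_{ij} \, b_j
```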
Serial ALS
val R = readRatingsMatrix(...)

var A = (0 until U).map(i => Vector.random(K))
var B = (0 until M).map(i => Vector.random(K))

for (i <- 1 to ITERATIONS) {
  A = (0 until U).map(i => updateUser(i, B, R))
  B = (0 until M).map(i => updateMovie(i, A, R))
}
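The slide leaves `updateUser` and `updateMovie` undefined. As a minimal runnable sketch, the version below fixes K = 1 so each update is a scalar least-squares fit and no linear solver is needed. The bodies of `updateUser` and `updateMovie`, the `Ratings` type, the `fit` and `squaredError` helpers, and the seeded initialization are all assumptions made for the illustration, not the authors' code.

```scala
// Rank-1 (K = 1) alternating least squares on a small ratings matrix.
// Helper bodies are hypothetical fill-ins; the slides do not show them.
object AlsSketch {
  // r(i)(j) is user i's rating of movie j, or None if unknown.
  type Ratings = Vector[Vector[Option[Double]]]

  // Scalar least squares: the c minimizing sum((v - c * f)^2) over (v, f) pairs.
  private def fit(pairs: Seq[(Double, Double)]): Double = {
    val denom = pairs.map { case (_, f) => f * f }.sum
    if (denom == 0.0) 0.0 else pairs.map { case (v, f) => v * f }.sum / denom
  }

  // Best factor for user i with the movie factors b held fixed.
  def updateUser(i: Int, b: IndexedSeq[Double], r: Ratings): Double =
    fit(for (j <- b.indices; v <- r(i)(j)) yield (v, b(j)))

  // Best factor for movie j with the user factors a held fixed.
  def updateMovie(j: Int, a: IndexedSeq[Double], r: Ratings): Double =
    fit(for (i <- a.indices; v <- r(i)(j)) yield (v, a(i)))

  // Sum of squared errors over the known ratings only.
  def squaredError(a: IndexedSeq[Double], b: IndexedSeq[Double], r: Ratings): Double =
    (for (i <- a.indices; j <- b.indices; v <- r(i)(j)) yield {
      val e = v - a(i) * b(j)
      e * e
    }).sum

  def run(r: Ratings, iterations: Int, seed: Long): (IndexedSeq[Double], IndexedSeq[Double]) = {
    val rng = new scala.util.Random(seed)
    val (u, m) = (r.length, r.head.length)
    // Random positive starting factors.
    var a: IndexedSeq[Double] = (0 until u).map(_ => rng.nextDouble() + 0.5)
    var b: IndexedSeq[Double] = (0 until m).map(_ => rng.nextDouble() + 0.5)
    for (_ <- 1 to iterations) {
      a = (0 until u).map(i => updateUser(i, b, r))  // step 1: fix B, optimize A
      b = (0 until m).map(j => updateMovie(j, a, r)) // step 2: fix A, optimize B
    }
    (a, b)
  }
}
```

Because each half-step exactly minimizes the error over one factor matrix, the squared error on the known ratings cannot increase from one iteration to the next.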
Naïve Spark ALS
val R = readRatingsMatrix(...)

var A = (0 until U).map(i => Vector.random(K))
var B = (0 until M).map(i => Vector.random(K))

for (i <- 1 to ITERATIONS) {
  A = spark.parallelize(0 until U, numSlices)
           .map(i => updateUser(i, B, R))
           .toArray()
  B = spark.parallelize(0 until M, numSlices)
           .map(i => updateMovie(i, A, R))
           .toArray()
}

Problem: R re-sent to all nodes in each parallel operation
Efficient Spark ALS
val R = spark.broadcast(readRatingsMatrix(...))

var A = (0 until U).map(i => Vector.random(K))
var B = (0 until M).map(i => Vector.random(K))

for (i <- 1 to ITERATIONS) {
  A = spark.parallelize(0 until U, numSlices)
           .map(i => updateUser(i, B, R.value))
           .toArray()
  B = spark.parallelize(0 until M, numSlices)
           .map(i => updateMovie(i, A, R.value))
           .toArray()
}

Solution: mark R as “broadcast variable”
How to Implement Broadcast?
36% of iteration spent on broadcast
Just using broadcast variables gives a significant performance boost, but not enough for all apps
Example: ALS broadcasts 100’s of MB / iteration, which quickly bottlenecked our initial HDFS-based broadcast
Broadcast Methods Explored

| Method | Results |
| --- | --- |
| NFS | Server becomes bottleneck |
| HDFS | Scales further than NFS, but limited |
| Chained Streaming | Initial results promising, but straggler nodes cause problems |
| BitTorrent | Off-the-shelf BT adds too much overhead in data center environment |
| SplitStream | Scales well in theory, but needs to be modified for fault tolerance |
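A back-of-the-envelope model suggests why a single server bottlenecks while a chain can scale. It is an idealized sketch with assumptions not taken from the slides: $S$ bytes go to $N$ nodes, every link has the same bandwidth $W$, latency is ignored, and "chained streaming" is taken to mean that each node forwards blocks of size $b$ to the next node as soon as they arrive.

```latex
T_{\text{single server}} \approx \frac{N \, S}{W}
\qquad
T_{\text{chain}} \approx \frac{S + (N - 1)\, b}{W}
```

In this model the chain's time is nearly independent of $N$ when $b \ll S$, but every block passes through every node, so the slowest node sets the pace for all nodes behind it. That is consistent with the straggler problem noted in the table.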
Broadcast Results
ALS Performance with Chained Streaming Broadcast
Language Integration
Scala closures are serializable objects
» Serialize on driver, load, & run on workers
Not quite enough
» Nested closures may reference entire outer scope
» May pull in non-serializable variables not used inside
» Solution: bytecode analysis + reflection
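The capture problem can be reproduced with plain Java serialization. The sketch below illustrates the problem and a manual workaround only; it does not show the bytecode-analysis solution the slide refers to. `Job`, `connection`, `capturesOuter`, `capturesLocal`, and `isSerializable` are hypothetical names made up for the example.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureSketch {
  // Stands in for a driver-side object that also holds a non-serializable resource.
  class Job(val factor: Int) extends Serializable {
    val connection = new Object // java.lang.Object is not Serializable

    // Reads the field `factor`, so the closure captures `this`: the whole Job,
    // including `connection`, even though the closure never uses it.
    def capturesOuter: Int => Int = x => x * factor

    // Copies the field into a local first, so only an Int is captured.
    def capturesLocal: Int => Int = {
      val f = factor
      x => x * f
    }
  }

  // Tries to serialize obj the way a driver would before shipping it to a worker.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Calling `isSerializable` on `capturesOuter` fails because the unused `connection` field is dragged along with the enclosing object, while `capturesLocal` serializes cleanly. Automating that pruning, instead of asking users to copy fields into locals, is the kind of job the slide assigns to bytecode analysis and reflection.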
Interactive Spark
Modified Scala interpreter to allow Spark to be used interactively from the command line
Required two changes:
» Modified wrapper code generation so that each “line” typed has references to objects for its dependencies
» Place generated classes in distributed filesystem
Enables in-memory exploration of big data
Demo
Conclusions
Spark provides a limited but efficient set of fault tolerant distributed memory abstractions
» Resilient distributed datasets (RDDs)
» Restricted shared variables
Planned extensions:
» More RDD transformations (e.g., shuffle)
» More RDD persistence options (e.g., disk + memory)
» Updatable RDDs (for incremental or streaming jobs)
» Data sharing across applications
Related Work
DryadLINQ
» Build queries through language-integrated SQL operations on lazy datasets
» Cannot have a dataset persist across queries
» No concept of shared variables for broadcast etc.
Pig and Hive
» Query languages that can call into Java/Python/etc. UDFs
» No support for caching datasets across queries
OpenMP
» Compiler extension for parallel loops in C++
» Annotate variables as read-only or accumulator above loop
» Cluster version exists, but not fault-tolerant
Twister and HaLoop
» Iterative MapReduce implementations using caching
» Can’t define multiple distributed datasets, run multiple map & reduce pairs on them, or decide which operations to run next interactively
Questions
???
Backup
Architecture
Driver program connects to Mesos and schedules tasks
Workers run tasks, report results and variable updates
Data shared with HDFS/NFS
No communication between workers for now
[Figure: architecture diagram with labels: user code, Driver, Mesos, tasks/results, Workers, local cache, HDFS.]
Mesos Architecture
[Figure: a Mesos master and three Mesos slaves. Framework schedulers (Hadoop v19, Hadoop v20, MPI) each have a job; the slaves host the matching executors (Hadoop v19, Hadoop v20, MPI), each running tasks.]
Serial Version
val data = readData(...)

var w = Vector.random(D)

for (i <- 1 to ITERATIONS) {
  var gradient = Vector.zeros(D)
  for (p <- data) {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    gradient += scale * p.x
  }
  w -= gradient
}

println("Final w: " + w)
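The slide code depends on a `Vector` class, `readData`, and constants that are not shown. A self-contained approximation of the serial loop, using plain arrays, could look like the sketch below. `Point`, `dot`, `gradient`, `train`, and the seeded initialization are assumptions made for illustration; labels are assumed to be +1.0 or -1.0, and the step size of 1 follows the slide's `w -= gradient`.

```scala
import scala.math.exp

// Serial logistic regression by gradient descent, with plain arrays as vectors.
object LogRegSketch {
  final case class Point(x: Array[Double], y: Double) // y is +1.0 or -1.0

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => a(i) * b(i)).sum

  // Sum over all points of (1 / (1 + exp(-y * (w . x))) - 1) * y * x.
  def gradient(data: Seq[Point], w: Array[Double]): Array[Double] = {
    val g = new Array[Double](w.length)
    for (p <- data) {
      val scale = (1.0 / (1.0 + exp(-p.y * dot(w, p.x))) - 1.0) * p.y
      for (j <- g.indices) g(j) += scale * p.x(j)
    }
    g
  }

  def train(data: Seq[Point], iterations: Int, seed: Long): Array[Double] = {
    val rng = new scala.util.Random(seed)
    val d = data.head.x.length
    var w = Array.fill(d)(2 * rng.nextDouble() - 1) // random start in [-1, 1)
    for (_ <- 1 to iterations) {
      val g = gradient(data, w)
      w = w.indices.map(j => w(j) - g(j)).toArray // same update as `w -= gradient`
    }
    w
  }
}
```

The parallel versions on the following slides keep this structure and change only how the per-point contributions are summed.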
Spark Version
val data = spark.hdfsTextFile(...).map(readPoint).cache()

var w = Vector.random(D)

for (i <- 1 to ITERATIONS) {
  var gradient = spark.accumulator(Vector.zeros(D))
  for (p <- data) {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    gradient += scale * p.x
  }
  w -= gradient.value
}

println("Final w: " + w)
Spark Version
val data = spark.hdfsTextFile(...).map(readPoint).cache()

var w = Vector.random(D)

for (i <- 1 to ITERATIONS) {
  var gradient = spark.accumulator(Vector.zeros(D))
  data.foreach(p => {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    gradient += scale * p.x
  })
  w -= gradient.value
}

println("Final w: " + w)
Functional Programming Version
val data = spark.hdfsTextFile(...).map(readPoint).cache()

var w = Vector.random(D)

for (i <- 1 to ITERATIONS) {
  w -= data.map(p => {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    scale * p.x
  }).reduce(_+_)
}

println("Final w: " + w)
Job Execution
[Figure: Spark job execution. Labels: Big Dataset; Master; Slaves 1 to 4 holding R1 to R4; param; aggregate; update param.]
Job Execution
[Figure: side-by-side comparison. Spark: Master; Slaves 1 to 4 holding R1 to R4; param; aggregate; update param. Hadoop / Dryad: Master; Map 1 to 4 followed by a Reduce (aggregate), then param passed to Map 5 to 8 followed by another Reduce (aggregate).]