UC Berkeley
Spark in the cloud: iterative and interactive cluster computing
Matei Zaharia, Mosharaf Chowdhury, Michael Franklin, Scott Shenker, Ion Stoica
Background
MapReduce and Dryad raised the level of abstraction in cluster programming by hiding scaling & faults
However, these systems provide a limited programming model: acyclic data flow
Can we design similarly powerful abstractions for a broader class of applications?
Spark Goals
Support applications with working sets (datasets reused across parallel operations)
» Iterative jobs (common in machine learning)
» Interactive data mining
Retain MapReduce's fault tolerance & scalability
Experiment with programmability
» Integrate into the Scala programming language
» Support interactive use from the Scala interpreter
Non-goals
Spark is not a general-purpose programming language
» One-size-fits-all architectures are also do-nothing-well architectures
Spark is not a scheduler, nor a resource manager
Mesos
» Generic resource scheduler with support for heterogeneous frameworks
Programming Model
Resilient distributed datasets (RDDs)
» Created from HDFS files or "parallelized" arrays
» Can be transformed with map and filter
» Can be cached across parallel operations
Parallel operations on RDDs
» reduce, toArray, foreach
Shared variables
» Accumulators (add-only), broadcast variables
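The model above can be sketched in a few lines. This is an illustrative in-process mimic (not Spark's implementation): transformations (map, filter) are lazy and return a new dataset object, cache() materializes once, and parallel operations (reduce, toArray, foreach) are eager.

```python
from functools import reduce as _reduce

class MiniRDD:
    def __init__(self, compute):
        self.compute = compute          # thunk returning a fresh iterator
        self.cached = None              # filled in by cache()

    def _iter(self):
        if self.cached is not None:
            return iter(self.cached)
        return self.compute()

    # --- transformations: lazy, return a new MiniRDD ---
    def map(self, f):
        return MiniRDD(lambda: (f(x) for x in self._iter()))

    def filter(self, pred):
        return MiniRDD(lambda: (x for x in self._iter() if pred(x)))

    def cache(self):
        self.cached = list(self.compute())   # materialize once
        return self

    # --- parallel operations: eager, return a value / side effect ---
    def reduce(self, op):
        return _reduce(op, self._iter())

    def toArray(self):
        return list(self._iter())

    def foreach(self, f):
        for x in self._iter():
            f(x)

def parallelize(xs):
    return MiniRDD(lambda: iter(xs))

nums = parallelize([1, 2, 3, 4]).map(lambda x: x * x).filter(lambda x: x > 2)
print(nums.toArray())                    # [4, 9, 16]
print(nums.reduce(lambda a, b: a + b))   # 29
```

The key design point mirrored here is that nothing is computed until a parallel operation runs, which is what lets Spark defer work and rebuild lost data from lineage.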
Example: Log Mining
Load "error" messages from a log into memory, then interactively search for various queries

lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
messages = errors.map(_.split('\t')(1))
cachedMsgs = messages.cache()

cachedMsgs.filter(_.contains("foo")).count
cachedMsgs.filter(_.contains("bar")).count
. . .

[Diagram: the driver sends tasks to workers and collects results; each worker reads one block of the file (Block 1-3) from HDFS and caches its filtered messages (Cache 1-3). lines is the base RDD; errors and messages are transformed RDDs; cachedMsgs is a cached RDD; count is a parallel operation.]
RDD Representation
Each RDD object maintains a lineage that can be used to rebuild slices of it that are lost or fall out of cache

Ex:
cachedMsgs = textFile("log").filter(_.contains("error"))
                            .map(_.split('\t')(1))
                            .cache()

HdfsRDD (path: hdfs://…) → FilteredRDD (func: contains(...)) → MappedRDD (func: split(…)) → CachedRDD

[Diagram: a getIterator(slice) call flows down this lineage chain; each slice is served from the local cache when present, otherwise recomputed from HDFS.]
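The lineage idea can be sketched directly. This is an illustrative mimic (not Spark's implementation): each RDD object keeps a pointer to its parent, so a lost or evicted slice is rebuilt by replaying the chain from the base dataset.

```python
class BaseRDD:
    def __init__(self, slices):
        self.slices = slices                      # e.g. blocks of a file
    def get_iterator(self, i):
        return iter(self.slices[i])

class MappedRDD:
    def __init__(self, parent, f):
        self.parent, self.f = parent, f
    def get_iterator(self, i):
        return (self.f(x) for x in self.parent.get_iterator(i))

class CachedRDD:
    def __init__(self, parent):
        self.parent, self.cache = parent, {}
    def get_iterator(self, i):
        if i not in self.cache:                   # miss: recompute via lineage
            self.cache[i] = list(self.parent.get_iterator(i))
        return iter(self.cache[i])

base = BaseRDD([["error\ta"], ["error\tb"]])
msgs = CachedRDD(MappedRDD(base, lambda line: line.split("\t")[1]))
print(list(msgs.get_iterator(0)))   # ['a']
msgs.cache.pop(0)                   # simulate losing a cached slice
print(list(msgs.get_iterator(0)))   # rebuilt from lineage: ['a']
```

Because the lineage records the transformation, not the data, recovery needs no replication of the cached slices themselves.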
Example: Logistic Regression
Goal: find best line separating two sets of points
[Diagram: two classes of points, a random initial line, and the target separating line.]
Logistic Regression Code

val data = spark.textFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  val gradient = data.map(p => {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    scale * p.x
  }).reduce(_ + _)
  w -= gradient
}
println("Final w: " + w)
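To see what each Spark iteration computes, here is the same gradient loop in plain NumPy on one machine. The data layout and constants (D, N, ITERATIONS) are assumptions for the sketch, not from the slides.

```python
import numpy as np

rng = np.random.default_rng(0)
D, N, ITERATIONS = 2, 200, 50
x = rng.normal(size=(N, D))
# synthetic labels in {-1, +1}, separable by the plane [1, -1]
y = np.where(x @ np.array([1.0, -1.0]) > 0, 1.0, -1.0)

w = rng.normal(size=D)
for _ in range(ITERATIONS):
    # scale = (1/(1+exp(-y*(w·x))) - 1) * y, matching the slide's map step
    scale = (1.0 / (1.0 + np.exp(-y * (x @ w))) - 1.0) * y
    gradient = (scale[:, None] * x).sum(axis=0)   # the reduce(_ + _) step
    w -= gradient
print(w)
```

The map produces one per-point gradient term; the reduce sums them, and only the summed vector returns to the driver each iteration, which is why caching the points pays off.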
Logistic Regression Performance

[Chart: running time (s) vs. number of iterations (1, 5, 10, 20, 30) for Hadoop and Spark. Hadoop: 127 s / iteration. Spark: 174 s for the first iteration, 6 s for each further iteration.]
Example: Collaborative Filtering
Predict movie ratings for a set of users based on their past ratings of other movies
R = (rows: movies, columns: users; ? = unknown rating)

1 ? ? 4 5 ? 3
? ? 3 5 ? ? 3
5 ? 5 ? ? ? 1
4 ? ? ? ? 2 ?
Matrix Factorization Model

Model R as the product of user and movie matrices A and B of dimensions U×K and M×K: R ≈ A · Bᵀ

Problem: given a subset of R, optimize A and B
Alternating Least Squares
Start with random A and B
Repeat:
1. Fixing B, optimize A to minimize error on scores in R
2. Fixing A, optimize B to minimize error on scores in R
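The two alternating steps each have a closed-form least-squares solution. Below is an illustrative NumPy sketch on a dense toy R (real ALS would mask the unknown entries of R; here every rating is observed to keep the example short, and the dimensions are made up).

```python
import numpy as np

rng = np.random.default_rng(0)
U, M, K = 6, 5, 2
R = rng.random((U, M))           # toy fully-observed ratings matrix

A = rng.random((U, K))
B = rng.random((M, K))
for _ in range(20):
    # Fixing B, the best A solves min ||R - A·Bᵀ||²: A = R·B·(BᵀB)⁻¹
    A = np.linalg.solve(B.T @ B + 1e-6 * np.eye(K), B.T @ R.T).T
    # Fixing A, symmetrically for B: B = Rᵀ·A·(AᵀA)⁻¹
    B = np.linalg.solve(A.T @ A + 1e-6 * np.eye(K), A.T @ R).T
err = np.linalg.norm(R - A @ B.T)
print(err)
```

Each half-step only reads the other factor and R, which is exactly the structure the Spark versions below parallelize over users and movies.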
Serial ALS

val R = readRatingsMatrix(...)
var A = (0 until U).map(i => Vector.random(K))
var B = (0 until M).map(i => Vector.random(K))
for (i <- 1 to ITERATIONS) {
  A = (0 until U).map(i => updateUser(i, B, R))
  B = (0 until M).map(i => updateMovie(i, A, R))
}
Naïve Spark ALS

val R = readRatingsMatrix(...)
var A = (0 until U).map(i => Vector.random(K))
var B = (0 until M).map(i => Vector.random(K))
for (i <- 1 to ITERATIONS) {
  A = spark.parallelize(0 until U, numSlices)
           .map(i => updateUser(i, B, R))
           .toArray()
  B = spark.parallelize(0 until M, numSlices)
           .map(i => updateMovie(i, A, R))
           .toArray()
}

Problem: R re-sent to all nodes in each parallel operation
Efficient Spark ALS

val R = spark.broadcast(readRatingsMatrix(...))
var A = (0 until U).map(i => Vector.random(K))
var B = (0 until M).map(i => Vector.random(K))
for (i <- 1 to ITERATIONS) {
  A = spark.parallelize(0 until U, numSlices)
           .map(i => updateUser(i, B, R.value))
           .toArray()
  B = spark.parallelize(0 until M, numSlices)
           .map(i => updateMovie(i, A, R.value))
           .toArray()
}

Solution: mark R as a "broadcast variable"
How to Implement Broadcast?
[Chart: time within each iteration (s), split into computation and broadcast, for cluster sizes from 4 cores (1 node) to 60 cores (8 nodes); annotation: 36% of the iteration spent on broadcast.]
Just using broadcast variables gives a significant performance boost, but not enough for all apps
Example: ALS broadcasts hundreds of MB per iteration, which quickly bottlenecked our initial HDFS-based broadcast
Broadcast Methods Explored
NFS: server becomes bottleneck
HDFS: scales further than NFS, but limited
Chained Streaming: initial results promising, but straggler nodes cause problems
BitTorrent: off-the-shelf BT adds too much overhead in a data center environment
SplitStream: scales well in theory, but needs to be modified for fault tolerance
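A rough cost model (our assumption, not from the slides) shows why chained streaming beats a centralized source: with one source, its uplink must carry a full copy per node, while in a chain each node forwards chunks to the next as it receives them, so the pipeline cost is one copy plus a startup latency of one chunk per hop.

```python
def centralized_time(data_mb, nodes, link_mbps):
    # one source sends the full payload to each of the N nodes over one uplink
    return nodes * data_mb * 8 / link_mbps

def chained_time(data_mb, nodes, link_mbps, chunk_mb=1):
    # pipeline: the first chunk takes N hops, the rest stream behind it
    chunks = data_mb / chunk_mb
    per_chunk = chunk_mb * 8 / link_mbps
    return (nodes + chunks - 1) * per_chunk

# 1 GB payload, 60 nodes, 1 Gbps links
print(centralized_time(1000, 60, 1000))  # 480.0 s
print(chained_time(1000, 60, 1000))      # ~8.5 s
```

The model ignores stragglers, which is exactly the failure mode the table notes for chained streaming: one slow link stalls the whole pipeline behind it.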
Broadcast Results
[Two charts: broadcast time (s) vs. number of nodes (1-60) for payloads of 250 MB, 500 MB, 750 MB, and 1 GB. HDFS broadcast (y-axis up to 350 s) grows steeply with cluster size; chained streaming (y-axis up to 70 s) stays far lower.]
ALS Performance with Chained Streaming Broadcast
[Chart: iteration duration (s) vs. number of nodes (1, 5, 10, 20, 30, 40), shown for the first and later iterations; durations fall from 1862 s on 1 node through 432, 215, 128, and 95 s down to 71 s on 40 nodes.]
Language Integration
Scala closures are serializable objects
» Serialize on driver, load & run on workers
Not quite enough
» Nested closures may reference entire outer scope
» May pull in non-serializable variables not used inside
» Solution: bytecode analysis + reflection
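The capture problem is easy to demonstrate in Python, even though the slides' issue is with Scala closures: a closure that references an object field captures the whole object, dragging its payload into any serialized task, whereas copying the needed field to a local captures only that value.

```python
class Driver:
    def __init__(self):
        self.big_table = list(range(1_000_000))  # would get shipped to workers
        self.threshold = 5

    def bad_task(self):
        return lambda x: x > self.threshold      # captures all of self

    def good_task(self):
        t = self.threshold                       # copy just what is needed
        return lambda x: x > t                   # captures only the int

d = Driver()
bad, good = d.bad_task(), d.good_task()
print(type(bad.__closure__[0].cell_contents).__name__)   # Driver
print(type(good.__closure__[0].cell_contents).__name__)  # int
```

Spark's bytecode analysis automates the "copy just what is needed" step, rewriting closures so unused (and possibly non-serializable) outer variables are not shipped.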
Interactive Spark
Modified the Scala interpreter to allow Spark to be used interactively from the command line
Required two changes:
» Modified wrapper code generation so that each "line" typed has references to objects for its dependencies
» Place generated classes in a distributed filesystem
Enables in-memory exploration of big data
Demo
Conclusions
Spark provides a limited but efficient set of fault-tolerant distributed memory abstractions
» Resilient distributed datasets (RDDs)
» Restricted shared variables
Planned extensions:
» More RDD transformations (e.g., shuffle)
» More RDD persistence options (e.g., disk + memory)
» Updatable RDDs (for incremental or streaming jobs)
» Data sharing across applications
Related Work
DryadLINQ
» Build queries through language-integrated SQL operations on lazy datasets
» Cannot have a dataset persist across queries
» No concept of shared variables for broadcast etc.
Pig and Hive
» Query languages that can call into Java/Python/etc. UDFs
» No support for caching datasets across queries
OpenMP
» Compiler extension for parallel loops in C++
» Annotate variables as read-only or accumulator above loop
» Cluster version exists, but not fault-tolerant
Twister and HaLoop
» Iterative MapReduce implementations using caching
» Can't define multiple distributed datasets, run multiple map & reduce pairs on them, or decide which operations to run next interactively
Questions?
Backup
Architecture
Driver program connects to Mesos and schedules tasks
Workers run tasks, report results and variable updates
Data shared with HDFS/NFS
No communication between workers for now

[Diagram: the driver runs user code and exchanges tasks and results with workers through Mesos; each worker keeps a local cache and reads shared data from HDFS.]
Mesos Architecture

[Diagram: a Mesos master coordinates several Mesos slaves. Framework schedulers (Hadoop v19, Hadoop v20, MPI) submit their jobs through the master; each slave runs per-framework executors (Hadoop v19/v20 executors, MPI executors) that execute that framework's tasks.]
Serial Version

val data = readData(...)
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  var gradient = Vector.zeros(D)
  for (p <- data) {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    gradient += scale * p.x
  }
  w -= gradient
}
println("Final w: " + w)
Spark Version

val data = spark.hdfsTextFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  var gradient = spark.accumulator(Vector.zeros(D))
  for (p <- data) {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    gradient += scale * p.x
  }
  w -= gradient.value
}
println("Final w: " + w)
Spark Version

val data = spark.hdfsTextFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  var gradient = spark.accumulator(Vector.zeros(D))
  data.foreach(p => {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    gradient += scale * p.x
  })
  w -= gradient.value
}
println("Final w: " + w)
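The accumulator used above can be sketched minimally. This is an illustrative mimic (not Spark's implementation) of an add-only shared variable: tasks may only add into it, and only the driver reads the combined value back.

```python
class Accumulator:
    def __init__(self, zero, op=lambda a, b: a + b):
        self._value, self._op = zero, op

    def add(self, x):                # the only operation tasks get
        self._value = self._op(self._value, x)

    @property
    def value(self):                 # read back on the driver
        return self._value

total = Accumulator(0)
for p in [1, 2, 3]:                  # stand-in for data.foreach(...)
    total.add(p)
print(total.value)  # 6
```

Restricting tasks to an associative add is what makes the variable safe to merge from many parallel tasks without coordination.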
Functional Programming Version
val data = spark.hdfsTextFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  w -= data.map(p => {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    scale * p.x
  }).reduce(_+_)
}
println("Final w: " + w)
Job Execution
[Diagram (Spark): a big dataset is partitioned as R1-R4 across slaves 1-4; each iteration the master sends the parameter to the slaves, the slaves compute on their partitions, and the master aggregates the results and updates the parameter.]
Job Execution
[Diagram (Spark vs. Hadoop/Dryad): Spark reuses the same cached partitions R1-R4 across iterations, with the master aggregating and updating the parameter each round; Hadoop/Dryad instead run a fresh round of map tasks (Map 1-4, then Map 5-8) and a reduce per iteration, re-sending the parameter and re-reading the data each time.]