Introduction to Spark Internals
TRANSCRIPT
Apache Spark Internals
Pietro Michiardi
Eurecom
Acknowledgments & Sources
Sources
- Research papers: https://spark.apache.org/research.html
- Presentations:
  - M. Zaharia, "Introduction to Spark Internals", https://www.youtube.com/watch?v=49Hr5xZyTEA
  - A. Davidson, "A Deeper Understanding of Spark Internals", https://www.youtube.com/watch?v=dmL0N3qfSc8
- Blogs:
  - Quang-Nhat Hoang-Xuan, Eurecom, http://hxquangnhat.com/
  - Khoa Nguyen Trong, Eurecom, https://trongkhoanguyenblog.wordpress.com/
Introduction and Motivations
What is Apache Spark
Project goals
- Generality: diverse workloads, operators, job sizes
- Low latency: sub-second
- Fault tolerance: faults are the norm, not the exception
- Simplicity: often comes from generality
Motivations
Software engineering point of view
- Hadoop code base is huge
- Contributions/extensions to Hadoop are cumbersome
- Java-only hinders wide adoption, but Java support is fundamental

System/framework point of view
- Unified pipeline
- Simplified data flow
- Faster processing speed

Data abstraction point of view
- New fundamental abstraction: the RDD
- Easy to extend with new operators
- More descriptive computing model
Hadoop: No Unified Vision
- Sparse modules
- Diversity of APIs
- Higher operational costs
SPARK: A Unified Pipeline
- Spark Streaming (stream processing)
- GraphX (graph processing)
- MLlib (machine learning library)
- Spark SQL (SQL on Spark)
A Simplified Data Flow
Hadoop: Bloated Computing Model
SPARK: Descriptive Computing Model
val file = sc.textFile("hdfs://...")

val counts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.saveAsTextFile("hdfs://...")
Organize computation into multiple stages in a processing pipeline
- Transformations apply user code to distributed data in parallel
- Actions assemble the final output of an algorithm from distributed data
Faster Processing Speed
Let's focus on iterative algorithms
- Spark is faster thanks to the simplified data flow
- We avoid materializing data on HDFS after each iteration

Example: k-means algorithm, 1 iteration
- HDFS read
- Map (assign sample to closest centroid)
- GroupBy (Centroid_ID)
- Network shuffle
- Reduce (compute new centroids)
- HDFS write
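To make the contrast concrete, here is a minimal sketch (not taken from the slides) of an iterative k-means loop in Spark: the input is read from HDFS once, cached in memory, and reused across iterations, with no HDFS write between iterations. The path, k = 3, and the iteration count are illustrative assumptions.

val points = sc.textFile("hdfs://.../points")
  .map(_.split(" ").map(_.toDouble))
  .cache()                                          // parsed points stay in memory across iterations

var centroids = points.takeSample(withReplacement = false, 3)

def closest(p: Array[Double], cs: Array[Array[Double]]): Int =
  cs.indices.minBy(i => cs(i).zip(p).map { case (c, x) => (c - x) * (c - x) }.sum)

for (_ <- 1 to 10) {
  val newCentroids = points
    .map(p => (closest(p, centroids), (p, 1)))       // assign each sample to its closest centroid
    .reduceByKey { case ((s1, n1), (s2, n2)) =>      // network shuffle, grouped by centroid id
      (s1.zip(s2).map { case (a, b) => a + b }, n1 + n2)
    }
    .mapValues { case (sum, n) => sum.map(_ / n) }   // new centroid = mean of its assigned samples
    .collectAsMap()                                  // small result returned to the driver, no HDFS write
  centroids = centroids.indices.map(i => newCentroids.getOrElse(i, centroids(i))).toArray
}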
Code Base (2012)
- 2012 (version 0.6.x): 20,000 lines of code
- 2014 (branch-1.0): 50,000 lines of code
Anatomy of a Spark Application
A Very Simple Application Example
val sc = new SparkContext("spark://...", "MyJob", home, jars)

val file = sc.textFile("hdfs://...") // This is an RDD

val errors = file.filter(_.contains("ERROR")) // This is an RDD

errors.cache()

errors.count() // This is an action
Spark Applications: The Big Picture

There are two ways to manipulate data in Spark
- Use the interactive shell, i.e., the REPL
- Write standalone applications, i.e., driver programs
Spark Components: details
The RDD graph: dataset vs. partition views
Data Locality
Data locality principle
- Same as for Hadoop MapReduce
- Avoid network I/O; workers should manage local data

Data locality and caching
- First run: data is not in the cache, so use the HadoopRDD's locality preferences (from HDFS)
- Second run: the FilteredRDD is in the cache, so use its locations
- If something falls out of the cache, go back to HDFS
Lifetime of a Job in Spark
(Figure) Lifetime of a job in Spark:
- RDD Objects: build the operator DAG, e.g. rdd1.join(rdd2).groupBy(...).filter(...)
- DAG Scheduler: split the DAG into stages of tasks; submit each stage and its tasks as ready
- Task Scheduler: launch tasks via the cluster manager (Master); retry failed and straggler tasks
- Worker: execute tasks in threads; the Block Manager stores and serves blocks
In Summary
Our example Application: a jar file
- Creates a SparkContext, which is the core component of the driver
- Creates an input RDD from a file in HDFS
- Manipulates the input RDD by applying a filter(f: T => Boolean) transformation
- Invokes the action count() on the transformed RDD

The DAG Scheduler
- Gets: RDDs, functions to run on each partition, and a listener for results
- Builds Stages of Tasks objects (code + preferred location)
- Submits Tasks to the Task Scheduler as ready
- Resubmits failed Stages

The Task Scheduler
- Launches Tasks on executors
- Relaunches failed Tasks
- Reports to the DAG Scheduler
Spark Deployments
Spark Components: System-level View
Spark Deployment Modes
The Spark framework can adopt several cluster managers
- Local mode
- Standalone mode
- Apache Mesos
- Hadoop YARN

General "workflow"
- The Spark application creates a SparkContext, which initializes the DriverProgram
- Registers with the ClusterManager
- Asks for resources to allocate Executors
- Schedules Task execution
Worker Nodes and Executors
Worker nodes are machines that run executors
- Host one or multiple Workers
- One JVM (= 1 UNIX process) per Worker
- Each Worker can spawn one or more Executors

Executors run tasks
- Run in a child JVM (= 1 UNIX process)
- Execute one or more tasks using threads in a ThreadPool
Comparison to Hadoop MapReduce
Hadoop MapReduce
- One Task per UNIX process (JVM), more if JVM reuse is enabled
- MultiThreadedMapper: an advanced feature to have threads in Map Tasks
- → Short-lived Executor, with one large Task

Spark
- Tasks run in one or more Threads, within a single UNIX process (JVM)
- Executor process statically allocated to a worker, even with no active threads
- → Long-lived Executor, with many small Tasks
Benefits of the Spark Architecture
Isolation
- Applications are completely isolated
- Task scheduling per application

Low-overhead
- Task setup cost is that of spawning a thread, not a process
- 10-100 times faster
- Small tasks → mitigate effects of data skew

Sharing data
- Applications cannot share data in memory natively
- Use an external storage service like Tachyon

Resource allocation
- Static process provisioning for executors, even without active tasks
- Dynamic provisioning under development
Resilient Distributed Datasets

M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, I. Stoica. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. USENIX Symposium on Networked Systems Design and Implementation, 2012.
What is an RDD
RDDs are partitioned, locality-aware, distributed collections
- RDDs are immutable

RDDs are data structures that:
- Either point to a direct data source (e.g., HDFS)
- Or apply transformations to their parent RDD(s) to generate new data elements

Computations on RDDs
- Represented by lazily evaluated lineage DAGs composed of chained RDDs
RDD Abstraction
Overall objective
- Support a wide array of operators (more than just Map and Reduce)
- Allow arbitrary composition of such operators

Simplify scheduling
- Avoid modifying the scheduler for each operator

→ The question is: how to capture dependencies in a general way?
RDD Interfaces
Set of partitions ("splits")
- Much like in Hadoop MapReduce, each RDD is associated with (input) partitions

List of dependencies on parent RDDs
- This is completely new w.r.t. Hadoop MapReduce

Function to compute a partition given its parents
- This is the "user-defined code" we referred to when discussing the Mapper and Reducer classes in Hadoop

Optional preferred locations
- This is to enforce data locality

Optional partitioning info (Partitioner)
- This really helps in some "advanced" scenarios in which you want to pay attention to the behavior of the shuffle mechanism
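A minimal sketch of this five-part interface in Scala, with simplified names and types (the real org.apache.spark.rdd.RDD uses protected methods, ClassTags, and a few more details):

import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

trait SimpleRDD[T] {
  def partitions: Array[Partition]                                   // set of partitions ("splits")
  def dependencies: Seq[Dependency[_]]                               // dependencies on parent RDDs
  def compute(split: Partition, context: TaskContext): Iterator[T]   // compute a partition given its parents
  def preferredLocations(split: Partition): Seq[String] = Nil        // optional locality hints (e.g., HDFS block hosts)
  def partitioner: Option[Partitioner] = None                        // optional partitioning information
}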
Hadoop RDD
partitions = one per HDFS block
dependencies = none
compute(partition) = read corresponding block
preferredLocations(part) = HDFS block location
partitioner = none
Filtered RDD
partitions = same as parent RDD
dependencies = one-to-one on parent
compute(partition) = compute parent and filter it
preferredLocations(part) = none (ask parent)
partitioner = none
Joined RDD
partitions = one per reduce task
dependencies = shuffle on each parent
compute(partition) = read and join shuffled data
preferredLocations(part) = none
partitioner = HashPartitioner(numTask); Spark knows this data is hashed
Dependency Types (1)
(Figure: narrow dependencies vs. wide dependencies)
Dependency Types (2)
Narrow dependencies
- Each partition of the parent RDD is used by at most one partition of the child RDD
- Tasks can be executed locally and we don't have to shuffle (e.g., map, flatMap, filter, sample)

Wide dependencies
- Multiple child partitions may depend on one partition of the parent RDD
- This means we have to shuffle data unless the parents are hash-partitioned (e.g., sortByKey, reduceByKey, groupByKey, cogroup, join, cartesian)
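A quick way to see the difference from the shell (a sketch; the data is illustrative): narrow dependencies show up as OneToOneDependency, wide ones as ShuffleDependency.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val mapped  = pairs.mapValues(_ + 1)     // narrow: one-to-one with its parent partitions
val reduced = pairs.reduceByKey(_ + _)   // wide: requires a shuffle

mapped.dependencies    // List(org.apache.spark.OneToOneDependency@...)
reduced.dependencies   // List(org.apache.spark.ShuffleDependency@...)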
Dependency Types: Optimizations

Benefits of lazy evaluation
- The DAG Scheduler optimizes Stages and Tasks before submitting them to the Task Scheduler
- Pipelining narrow dependencies within a Stage
- Join plan selection based on partitioning
- Cache reuse
Operations on RDDs: Transformations
Transformations
- A set of operations on an RDD that define how the data should be transformed
- As in relational algebra, applying a transformation to an RDD yields a new RDD (because RDDs are immutable)
- Transformations are lazily evaluated, which allows optimizations to take place before execution (illustrated in the sketch below)

Examples (not exhaustive)
- map(func), flatMap(func), filter(func)
- groupByKey()
- reduceByKey(func), mapValues(func), distinct(), sortByKey(func)
- join(other), union(other)
- sample()
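A small illustration of the laziness mentioned above (a sketch; the input path is a placeholder): the transformations below only build lineage, and nothing is computed until an action is invoked.

val lines  = sc.textFile("hdfs://.../logs")        // nothing is read yet
val errors = lines.filter(_.contains("ERROR"))     // still nothing computed
val codes  = errors.map(_.split(" ")(0))           // lineage only: textFile -> filter -> map

codes.count()                                      // the action triggers the whole chain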
Operations on RDDs: Actions
Actions
- Apply transformation chains on RDDs, eventually performing some additional operations (e.g., counting)
- Some actions only store data to an external data source (e.g., HDFS); others fetch data from the RDD (and its transformation chain) upon which the action is applied, and convey it to the driver

Examples (not exhaustive)
- reduce(func)
- collect(), first(), take(), foreach(func)
- count(), countByKey()
- saveAsTextFile()
Operations on RDDs: Final Notes
Look at return types!
- Return type RDD → transformation
- Return type built-in Scala/Java types such as int, long, List<Object>, Array<Object> → action

Caching is a transformation
- Hints to keep the RDD in memory after its first evaluation

Transformations depend on the RDD "flavor"
- PairRDD
- SchemaRDD
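The return-type rule in code (a sketch; names are illustrative):

val nums = sc.parallelize(1 to 100)       // RDD[Int]

val doubled = nums.map(_ * 2)             // returns RDD[Int]   -> transformation
val cached  = doubled.cache()             // returns the RDD    -> caching is a transformation
val total   = doubled.reduce(_ + _)       // returns Int        -> action
val n       = doubled.count()             // returns Long       -> action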
RDD Code Snippet
SparkContext
- This is the main entity responsible for setting up a job
- Contains the SparkConf, the Scheduler, and the entry point for running jobs (runJob)

Dependencies
- Input RDD(s)
RDD.map operation Snippet
map: RDD[T] → RDD[U]

MappedRDD
- For each element in a partition, apply the function f
RDD Iterator Code Snippet
Method to go through an RDD and apply a function f
- First, check the local cache
- If not found, compute the RDD

Storage levels
- Disk
- Memory
- Off heap (e.g., external memory stores like Tachyon)
- De-serialized
Making RDD from local collections
Convert a local (on the driver) Seq[T] into RDD[T]
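For instance (a sketch using the standard SparkContext API):

val data = 1 to 1000                               // local Seq[Int] on the driver
val rdd  = sc.parallelize(data, numSlices = 8)     // RDD[Int] split into 8 partitions

rdd.partitions.length                              // 8
rdd.reduce(_ + _)                                  // 500500, computed on the executors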
Hadoop RDD Code Snippet
Reading HDFS data as <key, value> records
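A hedged sketch of what this looks like through the Hadoop input-format API (essentially what textFile() does under the hood): the key is the byte offset of each line and the value is the line itself; the path is a placeholder.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val records = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://.../input")
val lines   = records.map { case (_, text) => text.toString }   // drop the offsets, keep the lines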
Understanding RDD Operations
Common Transformations
map(f: T => U)
Returns a MappedRDD[U] by applying f to each element
Common Transformations
flatMap(f: T => TraversableOnce[U])

Returns a FlatMappedRDD[U] by first applying f to each element, then flattening the results
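A small contrast between the two (a sketch run from the shell):

val lines = sc.parallelize(Seq("to be", "or not to be"))

lines.map(_.split(" ")).collect()
// Array(Array(to, be), Array(or, not, to, be))   <- one output element per input element

lines.flatMap(_.split(" ")).collect()
// Array(to, be, or, not, to, be)                 <- per-element results are flattened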
Detailed Example: Word Count
Spark Word Count: the driver
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("spark://...", "MyJob", "spark home", "additional jars")
Driver and SparkContext
- A SparkContext initializes the application driver; the latter then registers the application with the cluster manager and gets a list of executors
- Then, the driver takes full control of the Spark job
Spark Word Count: the code
val lines = sc.textFile("input")
val words = lines.flatMap(_.split(" "))
val ones = words.map(_ -> 1)
val counts = ones.reduceByKey(_ + _)
val result = counts.collectAsMap()
The RDD lineage DAG is built on the driver side, with
- Data source RDD(s)
- Transformation RDD(s), created by transformations

Job submission
- An action triggers the DAG scheduler to submit a job (see the sketch below)
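The lineage built above can be inspected from the driver before any job runs (a sketch; the exact RDD names and formatting vary across Spark versions):

println(counts.toDebugString)
// (2) ShuffledRDD[4] at reduceByKey ...
//  +-(2) MappedRDD[3] at map ...
//     |  FlatMappedRDD[2] at flatMap ...
//     |  HadoopRDD[0] at textFile ...

val result = counts.collectAsMap()   // only now does the DAG scheduler submit a job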
Spark Word Count: the DAG
Directed Acyclic Graph
- Built from the RDD lineage

DAG scheduler
- Transforms the DAG into stages and turns each partition of a stage into a single task
- Decides what to run
Spark Word Count: the execution plan
Spark Tasks
- Serialized RDD lineage DAG + closures of transformations
- Run by Spark executors

Task scheduling
- The driver-side task scheduler launches tasks on executors according to resource and locality constraints
- The task scheduler decides where to run tasks
Spark Word Count: the Shuffle phase
val lines = sc.textFile("input")
val words = lines.flatMap(_.split(" "))
val ones = words.map(_ -> 1)
val counts = ones.reduceByKey(_ + _)
val result = counts.collectAsMap()
The reduceByKey transformation
- Induces the shuffle phase
- In particular, we have a wide dependency
- Like in Hadoop MapReduce, intermediate <key, value> pairs are stored on the local file system

Automatic combiners!
- The reduceByKey transformation implements map-side combiners to pre-aggregate data
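This is why reduceByKey is usually preferred over an equivalent groupByKey formulation (a sketch): the former combines values per key on the map side before shuffling partial sums, while the latter ships every single (word, 1) pair across the network.

val counts1 = ones.reduceByKey(_ + _)               // map-side combine, then shuffle partial sums
val counts2 = ones.groupByKey().mapValues(_.sum)    // same result, but shuffles every (word, 1) pair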
Caching and Storage
Spark’s Storage Module
The storage module
- Accesses (I/O) "external" data sources: HDFS, local disk, RAM, remote data access through the network
- Caches RDDs using a variety of "storage levels"

Main components
- The Cache Manager: uses the Block Manager to perform caching
- The Block Manager: a distributed key/value store
Class Diagram of the Caching Component
How Caching Works
Frequently used RDDs can be stored in memory
- Deciding which RDDs to cache is an art!
- One method, one shortcut: persist(), cache()

The SparkContext keeps track of cached RDDs
- Uses a data structure called persistentRDD
- Maintains references to cached RDDs, and eventually calls the garbage collector
- Time-stamp based invalidation using a TimeStampedWeakValueHashMap[A, B]
The Block Manager
"Write-once" key-value store
- One per worker node
- No updates, data is immutable

Main tasks
- Serves shuffle data (local or remote connections) and cached RDDs
- Tracks the "Storage Level" (RAM, disk) for each block
- Spills data to disk if memory is insufficient
- Handles data replication, if required
Storage Levels
The Block Manager can hold data in various storage tiers
- org.apache.spark.storage.StorageLevel contains flags to indicate which tier to use
- Manual configuration, in the application
- Deciding the storage level to use for RDDs is not trivial

Available storage tiers
- RAM (default option): if the RDD doesn't fit in memory, some partitions will not be cached (they will be re-computed when needed)
- Tachyon (off the Java heap): reduces garbage collection overhead; the crash of an executor no longer leads to cached data loss
- Disk

Data format
- Serialized or as Java objects
- Replicated partitions
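The storage level is picked explicitly through persist() (a sketch; cache() is shorthand for the default MEMORY_ONLY level, and an RDD can only be assigned one level):

import org.apache.spark.storage.StorageLevel

val errors = sc.textFile("hdfs://.../logs").filter(_.contains("ERROR"))

errors.cache()                                       // same as persist(StorageLevel.MEMORY_ONLY)
// errors.persist(StorageLevel.MEMORY_AND_DISK_SER)  // alternative: serialized in RAM, spilled to disk
// errors.persist(StorageLevel.OFF_HEAP)             // alternative: off-heap store (e.g., Tachyon)

errors.count()       // the first action materializes the cached blocks
errors.unpersist()   // drop the cached blocks when no longer needed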
Resource Allocation: Spark Schedulers
Spark Schedulers
Two main scheduler components, executed by the driver
- The DAG scheduler
- The Task scheduler

Objectives
- Gain a broad understanding of how Spark submits Applications
- Understand how Stages and Tasks are built, and their optimization
- Understand the interaction among various other Spark components
Submitting a Spark Application: A Walk Through
Submitting a Spark Application: Details
The DAG Scheduler

Stage-oriented scheduling (line numbers refer to the walk-through listing above)
- Computes a DAG of stages for each job in the application (Lines 10-14, details in Lines 15-27)
- Keeps track of which RDD and stage outputs are materialized
- Determines an optimal schedule, minimizing the number of stages
- Submits stages as sets of Tasks (TaskSets) to the Task scheduler (Line 26)

Data locality principle
- Uses "preferred location" information (optionally) attached to each RDD (Line 20)
- Packages this information into Tasks and sends it to the Task scheduler

Manages Stage failures
- Failure type: (intermediate) data loss of shuffle output files
- Failed stages will be resubmitted
- NOTE: Task failures are handled by the Task scheduler, which simply resubmits them if they can be computed with no dependency on previous output
The DAG Scheduler: Implementation Details
Implemented as an event queue
- Uses a daemon thread to handle various kinds of events (Line 6)
- JobSubmitted, JobCancelled, CompletionEvent
- The thread "sweeps" the queue and routes events to the corresponding handlers

What happens when a job is submitted to the DAGScheduler?
- A JobWaiter object is created
- A JobSubmitted event is fired
- The daemon thread blocks and waits for a job result (Lines 3, 4)
The DAG Scheduler: Implementation Details (2)
Who handles the JobSubmitted event?
- A specific handler called handleJobSubmitted (Line 6)

Walk-through of the JobSubmitted handler
- Creates a new job, called ActiveJob
- The new job starts with only 1 stage, corresponding to the last stage of the job upon which an action is called (Lines 8-9)
- Uses the dependency information to produce additional stages (getMissingParentStages)
  - Shuffle Dependency: create a new map stage (Line 16)
  - Narrow Dependency: pipe them into a single stage
More About Stages
What is a DAG
- Directed acyclic graph of stages
- Stage boundaries are determined by the shuffle phase
- Stages are run in topological order

Definition of a Stage
- Set of independent tasks
- All tasks of a stage apply the same function
- All tasks of a stage have the same dependency type
- All tasks in a stage belong to a TaskSet

Stage types
- Shuffle Map Stage: the stage's task results are inputs for another stage
- Result Stage: tasks compute the final action that initiated a job (e.g., count(), save(), etc.)
The Task Scheduler
Task-oriented scheduling
- Schedules tasks for a single SparkContext
- Submits task sets produced by the DAG Scheduler
- Retries failed tasks
- Takes care of stragglers with speculative execution
- Produces events for the DAG Scheduler

Implementation details
- The Task scheduler creates a TaskSetManager to wrap the TaskSet from the DAG scheduler (Line 28)
- The TaskSetManager class operates as follows: (Lines 29, 30)
  - Keeps track of each task's status
  - Retries failed tasks
  - Imposes data locality using delayed scheduling
- Message passing is implemented using Actors, specifically the Akka framework
Running Tasks on Executors
Executors run two kinds of tasks
- ResultTask: applies the action on the RDD, once it has been computed alongside all its dependencies (Line 19)
- ShuffleMapTask: uses the Block Manager to store shuffle output using the ShuffleWriter (Lines 23, 24)
- The ShuffleRead component depends on the type of the RDD, which is determined by the compute function and the transformation applied to it
Data Shuffling
The Spark Shuffle Mechanism
Same concept as for Hadoop MapReduce, involving:
- Storage of "intermediate" results on the local file system
- Partitioning of "intermediate" data
- Serialization / de-serialization
- Pulling data over the network

Transformations requiring a shuffle phase
- groupByKey(), reduceByKey(), sortByKey(), distinct()

Various types of shuffle
- Hash Shuffle
- Consolidate Hash Shuffle
- Sort-based Shuffle
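A shuffle can sometimes be avoided altogether by controlling the partitioner up front (a sketch; HashPartitioner and partitionBy are standard Spark APIs, and the data is illustrative):

import org.apache.spark.HashPartitioner

val pairs = sc.textFile("hdfs://.../events")
  .map(line => (line.split(",")(0), 1))

// Shuffle once to hash-partition by key, then cache the partitioned data
val partitioned = pairs.partitionBy(new HashPartitioner(16)).cache()

partitioned.reduceByKey(_ + _)   // no further shuffle: the data is already partitioned by key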
The Spark Shuffle Mechanism: an Illustration
Data aggregation
- Defined on ShuffleMapTask
- Two methods available:
  - AppendOnlyMap: in-memory hash table combiner
  - ExternalAppendOnlyMap: memory + disk hash table combiner
- Batching disk writes to increase throughput
The Spark Shuffle Mechanism: Implementation Details
Pluggable components
- Shuffle Manager: components registered to the SparkEnv, configured through SparkConf
- Shuffle Writer: tracks "intermediate data" for the MapOutputTracker
- Shuffle Reader: pull-based mechanism used by the ShuffleRDD
- Shuffle Block Manager: mapping between the logical partitioning and the physical layout of data
The Hash Shuffle Mechanism
Map Tasks write output to multiple files
- Assume: m map tasks and r reduce tasks
- Then: m × r shuffle files as well as in-memory buffers (for batching writes)

Be careful about storage space requirements!
- Buffer size must not be too big with many tasks
- Buffer size must not be too small, for otherwise throughput decreases
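For a concrete sense of scale (the numbers are illustrative, not from the slides): with m = 1,000 map tasks and r = 1,000 reduce tasks, the hash shuffle produces m × r = 1,000,000 shuffle files; with a 100 KB write buffer per open file, an executor running 16 concurrent map tasks holds 16 × 1,000 × 100 KB ≈ 1.6 GB of buffers.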
The Consolidate Hash Shuffle Mechanism
Addresses buffer size problems
- Executor view vs. Task view
- Buckets are consolidated in a single file
- Hence: F = C × r files and buffers, where C is the number of Task threads within an Executor
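Continuing the illustrative numbers above: with C = 16 task threads per executor and r = 1,000 reducers, consolidation brings each executor down to F = 16 × 1,000 = 16,000 files and buffers, independent of how many map tasks it runs over the lifetime of the job.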
The Sort-based Shuffle Mechanism
Implements the Hadoop shuffle mechanism
- A single shuffle file, plus an index file to find "buckets"
- Very beneficial for write throughput, as more disk writes can be batched

Sorting mechanism
- Pluggable external sorter
- Degenerates to Hash Shuffle if no sorting is required
Data Transfer: Implementation Details
BlockTransferService
- General interface for the ShuffleFetcher
- Uses the BlockDataManager to get local data

Shuffle Client
- Manages and wraps the "client side", setting up the TransportContext and TransportClient

- TransportContext: manages the transport layer
- TransportServer: streaming server
- TransportClient: fetches consecutive chunks