ATMM: Auto Tuning Memory Manager in Apache Spark
A Dissertation Presented
by
Danlin Jia
to
The Department of Electrical and Computer Engineering
in partial fulfillment of the requirements
for the degree of
Master of Science
in
Electrical and Computer Engineering
Northeastern University
Boston, Massachusetts
Dec. 2018
To my family, who taught me to be brave and patient.
To my friends during my master's studies, Ziyue Xu, Mingyu Sun, Paul Lee, Amada Zhu, for your generous support.
Contents
List of Figures iv
List of Tables v
List of Acronyms vi
Acknowledgments vii
Abstract of the Dissertation viii
1 Introduction 1
2 Background 5
   2.1 Spark Memory Management 5
      2.1.1 Static Memory Manager 6
      2.1.2 Unified Memory Manager 7
   2.2 Task Execution 9
   2.3 Java Garbage Collection 10
   2.4 Related Work 12
3 Motivation 13
   3.1 Static VS. Dynamic: Latency Comparison 13
   3.2 Sensitivity Analysis On Memory Partition 15
   3.3 UMM Limitation: GC Impact 17
4 ATMM: Auto Tuning Memory Management Algorithms 21
   4.1 Auto Tuning Algorithm 22
   4.2 Memory Requirement Algorithm 26
5 Evaluation 27
   5.1 Spark Implementation 27
   5.2 Performance Comparison 30
      5.2.1 Latency Comparison 30
      5.2.2 Memory Usage and Garbage Collection Analysis 31
6 Conclusion And Future Work 36
Bibliography 38
List of Figures
2.1 Static Memory Manager 6
2.2 Unified Memory Manager 8
2.3 Spark Architecture 9
2.4 Spark Task Scheduling 10
2.5 Java Heap 11
3.1 Comparison of UMM and SMM latency 14
3.2 Latency of application under SMM 15
3.3 Memory Usage of 10% and 70% storage fraction of SMM 17
3.4 GC time comparison 18
3.5 Tenured heap status and GC duration 19
3.6 Ratio of GC time to execution time 20
4.1 Spark Task Scheduling 21
5.1 barChange() simple version 28
5.2 barChange() buffer version 28
5.3 setUp() & setDown() 29
5.4 acquireExecutionMemory() 29
5.5 acquireStorageMemory() 30
5.6 Comparison of SMM, UMM and ATMM with different input data size 31
5.7 Memory Usage Analysis 32
5.8 GC Analysis 34
List of Tables
5.1 Testbed configuration 27
5.2 GC time and Execution time analysis 35
List of Acronyms
RDD Resilient Distributed Dataset. RDD represents an immutable, partitioned collection of elements in Apache Spark.
GC Garbage Collection. Java heap garbage collection mechanism.
SMM Static Memory Manager. The memory manager used before Apache Spark 1.6.0 and deprecated in later versions. SMM allocates memory statically, with pool sizes configured before submitting applications.
UMM Unified Memory Manager. The memory manager used since Apache Spark 1.6.0, which allocates memory based on execution requirements.
ATMM Auto Tuning Memory Manager. A memory manager designed to allocate memory based on the execution time and GC behavior of a task.
DAG Directed Acyclic Graph. Represents the workflow of a Spark job, where vertices represent RDDs and edges represent transformations.
LRU Least Recently Used. A caching policy that evicts the least recently used objects first.
Acknowledgments
I would like to express my gratitude to my advisor Prof. Mi for her support, her encouragement throughout my master's study, and her patience in guiding me on technical issues. Her academic excellence, dedication, and passion for research will always inspire me in my future studies.
I would also like to thank all my colleagues in the lab, Janki, Zhengyu, Mahsa, and Wei, who guided and supported me academically. It is always my pleasure to exchange ideas with you, and those ideas always shine.
Abstract of the Dissertation
ATMM: Auto Tuning Memory Manager in Apache Spark
by
Danlin Jia
Master of Science in Electrical and Computer Engineering
Northeastern University, Dec. 2018
, Adviser
Apache Spark is an in-memory analytics framework that has been widely adopted in industry and research. Two memory managers, Static and Unified, are available in Spark to allocate memory for caching Resilient Distributed Datasets (RDDs) and executing tasks. However, we found that the static memory manager lacks flexibility, while the unified memory manager puts heavy pressure on the garbage collection of the JVM on which Spark resides. To address these issues, we design an auto tuning memory manager (named ATMM) that supports dynamic memory requirements while taking into account the latency introduced by garbage collection. We implement our new auto tuning memory manager in Spark 2.2.0 and evaluate it by running real experiments in a Spark standalone cluster. Our experimental results show that ATMM reduces total garbage collection time by shortening GC duration and lowering GC frequency, and thus further improves the performance (i.e., decreases the latency) of Spark applications compared to the existing Spark memory management solutions.
Chapter 1
Introduction
Large-scale data processing has been one of the most pressing topics for both engineers and scientists in recent years. With the growing availability of datasets and the need for scalable big-data processing frameworks, various analytics stacks have become prevalent in both industry and research. In the past, processing huge volumes of data relied entirely on the performance of computing facilities and the efforts of users, and could not achieve good performance [1]. Frameworks for sharing resources on the cloud (e.g., Hadoop [2]) have been proposed to shield users from interacting directly with overwhelming amounts of data.
Apache Hadoop [2] addresses the distributed data processing problem by introducing a data processing framework called MapReduce [3]. Supported by this engine, large-scale data is partitioned into small pieces that can be processed in parallel on individual machines. Parallel computing relieves the pressure of processing large-scale data on a single machine. However, it has been observed that Hadoop generates many I/O requests for accessing intermediate data, which delays the execution of data processing applications. To address this issue, in-memory analytics frameworks (e.g., Apache Spark [4]) were then developed to improve data processing performance significantly.
Apache Spark [4], as one of the most successful in-memory analytics frameworks, has boomed in the past few years. It was originally developed at UC Berkeley in 2012 and became a popular industrial-grade stack in 2014. Compared to the MapReduce framework, Spark incurs less read/write traffic to disk and provides an effective fault tolerance mechanism.
Specifically, Apache Spark implements a data structure abstraction called Resilient Distributed Datasets (RDDs) [5], where an RDD is a collection of partitioned records. Each partition of an RDD can be computed in parallel on different executors. An RDD can only be created from an input dataset or from another RDD, and each RDD is immutable. Based on these two features of RDDs, Spark can build a lineage of an application that tracks each stage of computation and recovers from faults. When a computation fault occurs, whether in software or hardware, Spark finds the latest correct RDD by tracking back through the application lineage. Furthermore, Spark can store intermediate data (i.e., RDDs) in RAM, which reduces communication overhead between Spark executors. Especially for iterative and interactive machine learning applications, RDDs provide faster data sharing across the Spark cluster.
Spark is a distributed data processing framework that can run on different resource management clusters, such as Yarn, Mesos, and Kubernetes. Cluster managers implement different scheduling schemes to acquire resources for Spark tasks. Yarn uses containers with allocated resources to process Spark tasks, while Mesos applies a non-monolithic framework to receive offers of various resource allocations. Kubernetes support was introduced in Spark 2.3.0, which creates pods for Spark executors. The life cycle of an executor pod is commonly shorter than that of the containers used on Yarn and Mesos. Our work focuses on Spark memory management instead of cluster management. Thus, all of our experiments and evaluations are conducted in a Spark standalone cluster. Such a standalone mode simplifies the mechanism of allocating resources and thus allows us to focus on the impacts of in-memory computation in Spark. Moreover, we remark that our work can also be applied to Spark running on other resource management clusters.
Apache Spark can speed up task processing by up to 100 times by taking advantage of in-memory computing [6]. In-memory computing refers to caching data and processing computation tasks in memory. In this way, Spark avoids the overhead of I/O latency and improves overall performance. Therefore, one of the most crucial factors in Spark is the management of memory resources. An effective memory management scheme can shrink an application's makespan (i.e., its total execution length) and improve performance dramatically. Unfortunately, Apache Spark hides the default memory management scheme from users, who have few opportunities to monitor and configure the memory space.
Recently, a unified memory manager (UMM) was introduced in Apache Spark to manage memory resources, while the previous one, the static memory manager (SMM), has been deprecated since Spark 1.6.0. Both memory managers (i.e., SMM and UMM) split the available heap memory into two pools: a storage pool and an execution pool. Storage memory is reserved for caching RDDs. Execution memory holds intermediate data generated while processing a task. The difference between SMM and UMM is that the former fixes the storage and execution pool sizes, while the latter can adjust the partition between the storage and execution pools online.
In this work, we first investigate the performance of Spark under these two memory managers by building a Spark cluster with one driver and four executors. We ran representative data processing benchmarks, such as K-Means and PageRank, and collected the experimental results under both memory managers. We found that Spark performance is significantly affected by the sizes of the storage and execution memory pools. For SMM, static memory allocation can clearly be inefficient if it does not fit the application's requirements well. For example, if a memory-intensive application needs more space to cache RDDs but only limited storage memory is reserved, then RDDs will be evicted frequently, resulting in re-computation of iteratively used RDDs. On the other hand, if an application requires more execution resources and the execution pool is not large enough, intermediate data will be spilled to disk, causing heavy I/O overhead. We also observed that although UMM dynamically adjusts the sizes of the two memory pools, it sometimes allocates more memory to the execution pool than is actually needed. This, unfortunately, introduces longer garbage collection durations, and thus prevents UMM from achieving the best performance (i.e., the shortest latency).
Motivated by the above observations, we design an auto tuning memory manager (ATMM) that takes both memory demands and garbage collection into consideration when adjusting the storage and execution memory pool sizes. ATMM is an adaptive controller: it collects execution logs of current and previous tasks and moves the boundary (i.e., the partition line in heap memory) between the storage and execution pools by comparing the ratio of garbage collection duration to task execution time. We implement ATMM as a new memory manager in Spark 2.2.0. The open source code of ATMM can be found on GitHub. Our evaluation results show that ATMM can improve performance by 23% compared to the two existing memory managers.
The main contributions of this dissertation are summarized as follows.
• Understanding of the two existing memory managers in Spark. We study the source code of the two Apache Spark memory managers to understand how these two managers allocate memory space to the storage and execution pools. We further conduct real experiments to analyze the performance of these two managers.
• Investigating how Java Garbage Collection affects Spark performance. We study the garbage collector that is integrated with Spark and explore how changes in memory allocation influence the garbage collector's behavior.
• Design and implementation of an auto tuning memory manager. We present a new Spark memory manager, named ATMM, that dynamically tunes the sizes of the storage and execution pools based on the performance of current and previous tasks. We implement and evaluate ATMM in Spark 2.2.0 and show that this new manager is able to reduce Spark application latency.
In the remainder of this dissertation, we first introduce the background of Apache Spark regarding memory management, task execution, and Java Garbage Collection in Chapter 2. In Chapter 3, we discuss the issues of the two existing memory managers, which motivate our design of a new manager. Chapters 4 and 5 present the detailed algorithms and the evaluation of our new memory manager. Conclusions and future work are presented in Chapter 6.
Chapter 2
Background
Apache Spark has become a popular data analytics framework adopted by both engineers and scientists in recent years. It stands out among other data analytics stacks for two features: fault-tolerant in-memory computing and data processing on a distributed cluster. As a memory-centric data analytics stack, Spark implements a dynamic memory manager to take full advantage of on-heap memory and improve application performance. As a distributed data processing framework, Spark splits a computation job into multiple tasks that run in parallel across the executors of the Spark cluster, which improves application performance dramatically.
Because Spark is implemented in Scala, a Java Virtual Machine based programming language, the Java memory manager is the fundamental mechanism underlying Spark memory management. Under the hood of Spark's memory manager, Java garbage collection (GC) is in charge of collecting unreferenced objects in the heap. Unfortunately, garbage collection triggers stop-the-world events and therefore impacts Spark performance significantly.
In this section, we first introduce the two existing Spark memory managers: SMM and UMM. Second, we show how the tasks of an application are distributed to each executor in the Spark cluster. Finally, we discuss the Java garbage collection mechanism applied in Spark.
2.1 Spark Memory Management
Apache Spark has two existing memory managers (i.e., SMM and UMM) for static and unified memory allocation, respectively. Since Spark 1.6.0, UMM has been the default memory manager in Spark [7], while SMM has been deprecated because of its lack of flexibility. In both memory managers, a portion of the Java heap is allocated for processing Spark applications, while
Figure 2.1: Static Memory Manager. (Reserved memory; storage memory = spark.storage.memoryFraction * spark.storage.safetyFraction; execution memory = spark.shuffle.memoryFraction * spark.shuffle.safetyFraction; the remainder is unmanaged user memory.)
the rest of the memory is reserved for Java class references and metadata usage. The memory allocated to run Spark applications consists of two partitions: storage memory and execution memory. Storage memory is used to cache RDDs, while execution memory is used for on-the-fly task processing. If storage memory is fully utilized, some cached RDDs may be evicted in LRU (Least Recently Used) order when a new RDD needs to be cached. On the other hand, if execution memory is full, intermediate objects are serialized and spilled to disk to release execution memory for subsequent tasks. In this section, we discuss how the two memory managers partition and manage these two memory areas.
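The LRU eviction behavior described above can be illustrated with a small sketch. This is not Spark's actual BlockManager code; the StoragePool class, block names, and megabyte sizes are hypothetical, and the real MemoryStore performs finer-grained, thread-safe accounting.

```python
from collections import OrderedDict

class StoragePool:
    """Illustrative LRU cache for RDD blocks (sizes in MB)."""

    def __init__(self, capacity_mb):
        self.capacity = capacity_mb
        self.used = 0
        self.blocks = OrderedDict()  # block id -> size, oldest first

    def cache(self, block_id, size_mb):
        """Cache a block, evicting least recently used blocks if needed."""
        evicted = []
        while self.used + size_mb > self.capacity and self.blocks:
            old_id, old_size = self.blocks.popitem(last=False)  # LRU victim
            self.used -= old_size
            evicted.append(old_id)
        self.blocks[block_id] = size_mb
        self.used += size_mb
        return evicted

    def access(self, block_id):
        """Reading a block marks it as most recently used."""
        self.blocks.move_to_end(block_id)

pool = StoragePool(100)
pool.cache("rdd_0_0", 60)
pool.cache("rdd_0_1", 30)
pool.access("rdd_0_0")               # rdd_0_1 is now the LRU block
victims = pool.cache("rdd_1_0", 40)  # needs 40 MB, so rdd_0_1 is evicted
```

In real Spark an evicted RDD partition is not lost: it is recomputed from the lineage when accessed again, at the cost of extra computation time.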
2.1.1 Static Memory Manager
The Static Memory Manager (SMM) is a simple memory management scheme. It statically divides memory into two fixed partitions, so the volumes of the storage and execution spaces are fixed during application processing. Users can only configure the sizes of storage and execution memory before an application is submitted. Furthermore, in most situations, users are blind to the characteristics of their Spark applications. Thus, they must configure the memory space based on previous experience, which hardly guarantees satisfactory performance.
Specifically, memory allocation in SMM can be configured by four parameters: spark.storage.memoryFraction, spark.storage.safetyFraction, spark.shuffle.memoryFraction, and spark.shuffle.safetyFraction. As shown in Fig. 2.1, the Java heap is partitioned into storage and execution memory at 54% and 16% of the maximum available JVM memory, respectively. One reason for allocating more memory to storage is that application designers and users expect to have sufficient memory space to store RDDs, maintain fault tolerance, and take full advantage of in-memory computing. On the other hand, limited execution memory might trigger memory spills, which result in high I/O overhead. For a memory-centric analytics framework, any on-the-fly communication with disk puts heavy pressure on in-memory computation and decreases performance.
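The arithmetic behind the 54%/16% split can be sketched as follows. The fraction defaults (0.6 × 0.9 for storage, 0.2 × 0.8 for shuffle) are the values of the four parameters above that produce those percentages; the helper function name is our own.

```python
def smm_pool_sizes(heap_mb,
                   storage_fraction=0.6, storage_safety=0.9,
                   shuffle_fraction=0.2, shuffle_safety=0.8):
    """SMM pool sizes: fixed fractions of the JVM heap, set before submission."""
    storage_mb = heap_mb * storage_fraction * storage_safety    # 54% by default
    execution_mb = heap_mb * shuffle_fraction * shuffle_safety  # 16% by default
    other_mb = heap_mb - storage_mb - execution_mb              # user memory + safety margins
    return storage_mb, execution_mb, other_mb

storage, execution, other = smm_pool_sizes(1024)  # a 1 GB heap
# storage = 552.96 MB (54%), execution = 163.84 MB (16%)
```

Because these sizes never change at runtime, any mismatch between them and the application's actual demands persists for the entire run, which is exactly the inflexibility discussed next.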
SMM is inherited from the Hadoop MapReduce memory manager, which handles just one MapReduce job. However, a Spark application can be viewed as a series of MapReduce jobs, and its characteristics can vary over time. Conceptually, a static memory manager cannot fit all Spark application scenarios. For example, PageRank requires more execution memory when it processes input data; after the first stage of manipulating the input data, the remaining iterative stages demand more storage memory. It is hard to determine what configuration should be applied to SMM in this case, because no fixed partition of heap memory can guarantee good performance for applications with undetermined and unpredictable characteristics.
2.1.2 Unified Memory Manager
Since Spark 1.6.0, a new memory manager has been adopted to replace SMM and provide Spark with dynamic memory allocation. To guarantee memory allocation based on application characteristics, the Unified Memory Manager (UMM) was proposed to support dynamic memory requirements. It allocates a region of memory as a unified container that is shared by storage and execution. If either storage or execution memory needs more space, a function called acquireMemory() expands one of the memory pools and shrinks the other.
Fig. 2.2 shows the memory allocation under this dynamic memory manager in Spark. In Fig. 2.2, Spark's available memory is presented as a unified container. The dashed line is the boundary between the storage and execution memory pools, which can be dynamically adjusted online. In UMM, the parameter RESERVED_SYSTEM_MEMORY_BYTES (300MB by default) reserves space to store objects of Java classes. Besides, spark.memory.fraction defines the fraction of usable memory (i.e., heap memory minus reserved system memory) available to storage and execution, called unified memory. spark.storage.fraction further sets the initial storage pool size, and the rest of unified memory is allocated to the execution memory pool. Therefore, the unified memory accessible to Spark applications can be calculated as (HeapSize − 300MB) ∗ 0.6.
In the initialization of each Spark context, both storage and execution memory pools are allocated as
Figure 2.2: Unified Memory Manager. (Reserved memory of RESERVED_SYSTEM_MEMORY_BYTES; unified memory of spark.memory.fraction, split by spark.storage.fraction into storage and execution pools; the remainder is unmanaged user memory.)
50% of unified memory. For example, if we set 1GB memory for each JVM, then unified memory
is (1GB − 300MB) ∗ 0.6 = 434.4MB, and storage and execution memory is 434.4MB ∗ 0.5 =
217.2MB each.
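The initial pool sizes in the example above can be reproduced with a short helper. This is a sketch: the function name is ours, and the defaults follow the text (spark.memory.fraction = 0.6, spark.storage.fraction = 0.5, 300 MB reserved).

```python
RESERVED_SYSTEM_MEMORY_MB = 300  # RESERVED_SYSTEM_MEMORY_BYTES, in MB

def umm_initial_pools(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Initial UMM pool sizes: unified memory split between storage and execution."""
    unified = (heap_mb - RESERVED_SYSTEM_MEMORY_MB) * memory_fraction
    storage = unified * storage_fraction  # initial storage pool
    execution = unified - storage         # the rest goes to execution
    return unified, storage, execution

unified, storage, execution = umm_initial_pools(1024)  # 1 GB JVM heap
# unified = 434.4 MB; storage = execution = 217.2 MB, matching the example above
```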
UMM adjusts the boundary between storage and execution memory according to memory requirements while an application is running. Either pool can borrow memory from the other if the other is not full. For example, when a new RDD arrives, if the storage memory pool is fully utilized and the execution pool still has free space, UMM expands the storage pool to cache the new RDD by shifting the partitioning boundary, and vice versa.
However, consider the case where the storage pool has exceeded 50% of unified memory and, meanwhile, execution requests more memory than its currently available space. In this scenario, UMM forces RDD eviction to shrink the storage memory pool until it falls back to 50% or less, and then allocates the freed space to the execution pool. This happens only if both the storage and execution memory pools are fully occupied.
Conversely, if the storage memory pool has been invaded by execution memory and a new RDD is to be cached, UMM does not shrink the execution pool to make room for the new RDD. One reason is that RDDs are cached after a task completes: once a task finishes its computation, all memory required by the task is released, and the execution pool then has free space that storage can borrow. The other reason is that the execution memory pool stores intermediate objects required to finish a task. Evicting data blocks of a task that is being processed in the current thread can cause task failure and high latency for the entire application. Therefore, UMM prefers to evict RDDs from the storage pool instead, because RDDs can be recomputed by tracking the application lineage.
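The eviction priority described above can be summarized in a simplified sketch. This is not the real UnifiedMemoryManager.acquireExecutionMemory(), which performs per-task bookkeeping and may block; the function below only captures the policy that execution may reclaim storage space by evicting RDDs, but never below storage's guaranteed 50% share.

```python
def acquire_execution_memory(request_mb, state, unified_mb, storage_floor=0.5):
    """Grant execution memory; evict cached RDDs only down to the storage floor.

    state: dict with current 'storage' and 'execution' pool sizes (MB).
    Returns the number of MB actually granted to execution."""
    free = unified_mb - state["storage"] - state["execution"]
    granted = min(request_mb, free)
    shortfall = request_mb - granted
    if shortfall > 0:
        # Execution has priority: shrink storage by evicting RDDs,
        # but never below its guaranteed share of unified memory.
        reclaimable = max(0.0, state["storage"] - storage_floor * unified_mb)
        evicted = min(shortfall, reclaimable)
        state["storage"] -= evicted
        granted += evicted
    state["execution"] += granted
    return granted

# Storage has grown to 260 MB of a 400 MB unified pool; execution (140 MB) is full.
state = {"storage": 260.0, "execution": 140.0}
granted = acquire_execution_memory(80.0, state, unified_mb=400.0)
# Only 60 MB can be granted: storage is evicted back down to its 200 MB floor.
```

Note the asymmetry: there is no analogous eviction path for storage requests, which simply fail to grow the pool when execution memory is occupied.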
Figure 2.3: Spark Architecture. The master runs the SparkContext and communicates with a cluster manager (Standalone, Yarn, or Mesos); each worker hosts an executor.
2.2 Task Execution
An Apache Spark cluster consists of one master and multiple workers. A cluster manager communicates and coordinates with the master to assign tasks to each worker. For Spark on Yarn and Mesos, each worker can have more than one executor, but Spark standalone supports only one executor per worker. Since our work uses a Spark standalone cluster, we only consider one executor per worker. Fig. 2.3 shows the Spark architecture. The driver and the executors run as independent Java processes, so each has its own heap space. In other words, the memory usage of each node can be heterogeneous based on its own memory configuration. These processes can run on the same machine (a scale-up setting) or on multiple machines (a scale-out setting).
Apache Spark is a distributed data processing framework. When an application is submitted, the driver program creates a SparkContext that splits the application into multiple jobs and then partitions each job into stages, which consist of sets of parallel tasks. Each task is responsible for processing one partition of an RDD. Specifically, each application consists of jobs. Jobs are created when actions, e.g., collect() or first(), are executed. Each job can be represented as a DAG (Directed Acyclic Graph) composed of a series of stages. A stage is a set of parallel tasks running independently across the Spark cluster, and a task is a computation performed on one data partition. When all tasks in a stage finish, a shuffle is triggered by functions such as join() and reduceByKey(), and
Figure 2.4: Spark Task Scheduling. An application with one job of three stages (stages 0 to 2), each with four tasks (t0 to t3); the cluster manager distributes tasks s0_t0 through s2_t3 across two executors.
the computation results are rearranged and shuffled to the executors that will run the tasks of the next stage. Tasks from different stages may depend on one another: child tasks need to fetch the results of their parent stages as inputs.
To illustrate how a Spark application is scheduled and executed across executors, Fig. 2.4 shows an application containing one job with three stages. The parallelism is configured to 4, i.e., at most 4 tasks can run simultaneously from the driver's point of view. Assume there are two executors with the same resources and one processing core per executor. As shown in Fig. 2.4, the input file has four partitions to be processed in parallel in each stage, and each task is assigned one partition. In total, 12 tasks are distributed across the Spark cluster and executed on the executors. If tasks are evenly distributed to the two executors, each executor is in charge of 6 tasks. From an executor's point of view, all tasks are executed sequentially. For example, Executor 0 first runs s0_t0 and s0_t1, and then waits for the completion of s0_t2 and s0_t3. Once all four tasks in stage 0 are done, Executor 0 fetches the shuffled results and continues to process s1_t0 and s1_t1. At the end of stage 2, an action is called to collect all data as the output of the job.
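The 12-task example above can be reproduced with a small scheduling sketch. This is only an illustration of the even, stage-by-stage distribution in Fig. 2.4, not Spark's actual TaskScheduler (which assigns tasks using locality preferences and resource offers), and the function name is ours.

```python
def schedule(num_stages, tasks_per_stage, num_executors):
    """Assign each stage's tasks evenly and contiguously to executors,
    mirroring the distribution shown in Fig. 2.4. A stage must complete
    before the next one starts (simplified stage barrier)."""
    plan = {e: [] for e in range(num_executors)}
    per_executor = tasks_per_stage // num_executors
    for s in range(num_stages):
        for t in range(tasks_per_stage):
            executor = t // per_executor  # t0,t1 -> executor 0; t2,t3 -> executor 1
            plan[executor].append(f"s{s}_t{t}")
    return plan

plan = schedule(num_stages=3, tasks_per_stage=4, num_executors=2)
# Executor 0 runs s0_t0, s0_t1, s1_t0, s1_t1, s2_t0, s2_t1: 6 tasks, 12 in total
```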
2.3 Java Garbage Collection
Apache Spark is implemented in Scala, which generates Java objects managed by the Java Virtual Machine. To understand Spark memory management, it is therefore essential to investigate Java memory management. Unlike C-style programming languages, Java does not require users to release objects explicitly. Instead, a mechanism called Java garbage collection (GC) takes over the responsibility of recycling unreferenced objects.

Figure 2.5: Java Heap. The young generation contains Eden and two survivor spaces (S1 and S2); the old (tenured) generation holds long-lived objects; the permanent generation holds metadata.
Depending on which Java version is integrated with Spark, users can choose one of three types of garbage collectors: the Serial Collector, the Parallel Collector, and the Mostly Concurrent Collector. Each of them is suited to different situations; detailed information can be found in [8].
In this section, we introduce the Serial Collector, the default garbage collection mechanism integrated with Spark 2.2.0. The serial garbage collector executes garbage collection in a single thread. It divides the Java heap into two regions (i.e., young and old) to manage objects according to their lifetimes. The heap layout is shown in Fig. 2.5. The young generation stores short-lived objects, while the old generation collects objects with longer lifetimes. The young generation is segregated into three partitions: Eden, S1, and S2. Besides the young and old generations, a permanent generation contains metadata and the Java classes used by the application.
When a new object is initialized in the Java heap, it is stored in the Eden space of the young generation. When Eden fills up, a minor garbage collection happens. After each minor garbage collection, all surviving objects are transferred into one of the two survivor spaces (i.e., S1 and S2), and the age of these surviving objects is incremented by one. When objects reach age 8, they are transferred into the old generation. A major garbage collection occurs when the old generation or the permanent generation is full.
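The aging and promotion rules above can be modeled with a toy minor-GC pass. This is a didactic model, not the HotSpot implementation: objects are (name, age, alive) tuples, and the tenuring threshold of 8 follows the text (the actual JVM threshold is configurable via -XX:MaxTenuringThreshold).

```python
def minor_gc(eden, survivor, old_gen, tenure_age=8):
    """One minor GC pass over the young generation (illustrative model).

    eden/survivor: lists of (name, age, alive) tuples; old_gen: list of names.
    Returns the new survivor-space contents."""
    next_survivor = []
    for name, age, alive in eden + survivor:
        if not alive:
            continue                  # unreferenced objects are reclaimed
        if age + 1 >= tenure_age:
            old_gen.append(name)      # promoted to the tenured generation
        else:
            next_survivor.append((name, age + 1, True))
    return next_survivor

old = []
surv = [("cached_rdd", 7, True)]
eden = [("tmp", 0, False), ("partial_sum", 0, True)]
surv = minor_gc(eden, surv, old)
# tmp is reclaimed; partial_sum survives at age 1; cached_rdd reaches age 8
# and is tenured, filling the old generation toward a future major GC
```

This also hints at why long-lived cached RDDs stress the old generation: each one that survives repeated minor collections eventually lands in the tenured space, whose exhaustion triggers a costly major GC.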
Both minor and major garbage collections are stop-the-world events. Because only one garbage collection thread is running, garbage collections stall all threads in the JVM, which causes high latency for applications running in the JVM.
2.4 Related Work
Memory management has been a topic of concern since Spark was proposed, but to our knowledge no existing work designs a new Spark memory manager. However, several published works focus on optimizing caching algorithms [9], [10] and task scheduling [11], [12]. These works help us obtain a deep understanding of how memory requirements affect Spark performance, which in turn provides perspectives for designing our memory manager.
MEMTUNE [13] presents an algorithm that adjusts memory allocation based on the categories of tasks. It serves the scenario in which more than one application runs on the JVM, and decides how to balance memory allocation to obtain the best performance. This work also considers the influence of the JVM on Spark performance. However, it only focuses on analyzing the sensitivity of tasks (i.e., storage-sensitive or execution-sensitive) and taking different actions according to their requirements.
Sparkle [14] designs and implements new Spark modules to optimize Spark performance
on a scale-up machine. It builds a shared memory pool for each executor, which saves the shuffle
time between nodes of the Spark cluster and achieves a performance improvement. This work
leverages the shared memory pool that allows each executor in the Spark cluster to be aware of the
location of each RDD partition, which decreases the overhead of fetching data among each other.
Dynamic JVM heap management [15], [16] enables multiple JVMs to run on one host simultaneously with reduced contention, and analyzes the impact of JVM configuration and GC on Spark performance. We leverage the knowledge and experiments in these works to help design our algorithm.
Parameter configuration and memory tuning for large-scale distributed data processing frameworks have also been explored [17], [18]. However, these works provide configuration and performance tuning instructions that either automatically change configurations for various workloads or offer users offline configurations heuristically. Our algorithm aims to adjust memory allocation online, without user intervention.
The Spark open source community provides suggestions on Spark memory tuning and GC tuning. However, these tuning suggestions require users to explore the characteristics of their applications and configure workloads based on their Spark running environment. We draw on these suggestions and propose a new Spark memory manager that automatically tunes memory allocation.
Chapter 3
Motivation
In this chapter, we study the performance of Spark applications managed by the two existing
Spark memory managers (i.e., SMM and UMM), and observe that SMM with certain memory con-
figurations can beat UMM (i.e., SMM provides the Spark application with lower latency). We vary
the SMM memory partition and investigate the impact of the memory configuration on Spark performance.
We find that configuring either too much or too little execution memory can degrade Spark per-
formance. We also analyze how UMM introduces long Java garbage collection pauses that adversely affect
Spark performance.
In our experiments, we have a Spark cluster with one master and four workers. All four
workers are homogeneous, and each runs one executor. Thus, we report metrics (e.g., execution
time and GC time) as average values over all four executors. In our experiments, we use
PageRank as a representative benchmark to help us investigate SMM and UMM, and derive our
motivation to design and implement a new Spark memory manager.
3.1 Static VS. Dynamic: Latency Comparison
Apache Spark has replaced SMM with UMM as the default since Spark 1.6.0. Lacking
flexibility, SMM cannot fit all kinds of workloads well, and the diverse characteristics of the
stages within a single Spark application can make its performance even worse. Compared
with SMM, UMM allocates memory resources dynamically according to resource demands. Fur-
thermore, UMM gives higher priority to execution memory than to storage memory, as discussed in
Section 2.1. That is, an execution memory request can force the release of space allocated to storage
memory, but storage memory cannot shrink the execution memory pool if the execution memory is fully
Figure 3.1: Comparison of UMM and SMM latency. [Chart: latency in minutes; UMM 7.1, SMM 20% 6.4, SMM 50% 8.7, SMM 70% 5.5.]
utilized and also exceeds 50% of the total available memory. Based on this mechanism, UMM guaran-
tees sufficient memory for running tasks, which avoids spilling memory to disk as far as
possible.
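The asymmetric rule above can be sketched as a toy model (in Python; the names and simplified semantics are ours, not Spark's actual implementation, and the 50% cap on storage-driven shrinking is elided):

```python
def umm_execution_request(need, free_exec, free_storage, used_storage):
    """Toy model of UMM's asymmetric rule: an execution request may
    borrow free storage memory and even evict cached blocks, so it is
    satisfied whenever enough storage memory can be reclaimed."""
    granted = min(need, free_exec)
    shortfall = need - granted
    if shortfall > 0:
        # execution first borrows free storage memory ...
        borrow = min(shortfall, free_storage)
        # ... and may then evict cached RDD blocks to cover the rest
        evict = min(shortfall - borrow, used_storage)
        granted += borrow + evict
    return granted

def umm_storage_request(need, free_storage):
    # storage may only use its own free space; it can never
    # force the execution pool to shrink
    return min(need, free_storage)
```

For example, an execution request of 100 MB with only 40 MB free execution memory is still fully granted when 30 MB of free storage plus 30 MB of evictable cached blocks are available, while the equivalent storage request is capped at the free storage space.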
However, we found that in some situations UMM cannot achieve better performance
than SMM as expected, even though it applies dynamic memory management. To find the reason
behind this observation, we conduct a set of comparison experiments. As introduced in Sec. 2.1.2,
the default configurations are as follows: in UMM, the initial memory spaces for storage and
execution are each 30% of the JVM heap; in SMM, the storage and execution memory pools are
54% and 16% of the JVM heap, respectively. We compare appli-
cation latencies under SMM and UMM, where SMM sets the storage fraction (i.e., computed as
spark.storage.memoryFraction * spark.storage.safetyFraction) to 20%, 50%, and 70%.
Fig. 3.1 shows the latency of the Spark application under UMM and under SMM with
20%, 50%, and 70% storage fractions. The latency of the Spark application
under UMM is 7.1 min, which is lower than that of SMM with a 50% storage fraction but higher than
those of SMM with 20% and 70%.
Intuitively, one would expect the performance of UMM to be better than that of SMM,
which is fixed and inelastic. However, the unexpected results show the possibility of overcoming the
potential disadvantages of UMM and designing a new memory manager that improves Spark performance.
Figure 3.2: Latency of the application under SMM. [Chart: latency in minutes vs. SMM storage fraction, 10% to 90%.]
3.2 Sensitivity Analysis On Memory Partition
Based on the previous experiments, we observed that SMM can obtain lower latency under
some memory configurations. However, it is not trivial to find the memory partition point that
provides the best performance. To find it, we conduct a
set of experiments to analyze the impact of the memory partition on Spark performance.
Specifically, we set the storage memory pool size from 10% to 90% of the available memory
space under the SMM policy; the execution memory pool size correspondingly changes from 90%
to 10%. Fig. 3.2 illustrates the experimental results. We observe high latency at storage fractions
of 10% and 50%. The latency increases as the storage fraction grows from 20% to
50%. When the storage fraction exceeds 50%, the latency drops gradually until it reaches
its minimum at 80%. After that, the latency grows again at a storage
fraction of 90%.
We then focus on three cases, i.e., 20%, 10%, and 90%, to analyze the impact of the
memory partition on Spark performance. Before that, we first study the characteristics of our
benchmark, PageRank. PageRank is a classic iterative benchmark that de-
termines the relevance or importance of a page by computing its rank iteratively. PageRank is
a memory-intensive application that requires considerable execution memory in the first stage,
where it reads and processes the input data. In the following iterations, each stage joins the shuffled
results of the previous stage with the result of the first stage. This requires the RDD containing the
result of the first stage to be cached in the storage memory pool for reuse in every iteration stage.
Therefore, PageRank requires more execution memory in the first stage and then needs more storage
memory in the following stages. The size of the storage memory required by PageRank depends
on the size of the RDD that needs to be cached. In our experiments, we found that 20% of total
available memory is sufficient to cache that RDD, and meanwhile 80% of total available memory
(as execution memory) satisfies the task execution need. This is the reason why SMM provides low
latency when storage fraction is set to 20%.
It is also noticeable that when the storage fraction is set to 10%, the latency is considerably
higher than at other fractions. This is because the storage memory pool is then too small to
cache the reused RDD. When each iterative stage needs to join with the reused RDD, it cannot
fetch this RDD from storage memory; instead, each iterative stage must recompute the
RDD, which increases the latency of the application. At the other extreme, when we increase
the storage fraction to 90%, the latency increases because of memory
spilling to disk. In this scenario, the execution memory is too limited to execute a task in memory,
so part of the intermediate objects required by task execution are serialized and spilled to disk. The
communication with the disk causes high I/O overhead, which degrades Spark performance.
However, we find that spilling does not always hurt Spark performance;
we discuss this in Section 3.3.
To understand what actually happens from the memory pools' point of view, we measure
the runtime memory usage under SMM with storage fractions of 10% and
70%. This gives us more insight into how memory allocation affects Spark
performance. Fig. 3.3-(a) and Fig. 3.3-(b) show the actual storage memory usage over time (the
solid lines in the figures) under SMM, with the dashed lines showing the storage
memory pool size. Both figures show a similar pattern of memory usage increasing over time. It is
worth noticing that in Fig. 3.3-(a), RDD eviction happens, visible as drops in storage
memory usage. This explains the high latency observed in Fig. 3.2 when the storage fraction is set
to 10%.
Fig. 3.3-(c) and Fig. 3.3-(d) show the corresponding execution memory usage (see the
solid lines), with the dashed lines showing the execution memory pool size. Comparing the two
execution memory usage figures demonstrates the occurrence of memory spilling. When the
storage fraction is set to 10% in Fig. 3.3-(c), the two spikes of the solid line represent the
memory consumed by two tasks that read and process input data with high
memory consumption. When the storage fraction is 70%, so that the execution memory pool is
limited to 30%, memory spilling happens: in Fig. 3.3-(d) we observe more spikes of the solid line. This is
because, in order to complete a single task, Spark needs to serialize and send intermediate data into
Figure 3.3: Memory usage of 10% and 70% storage fractions of SMM. [Charts: (a) storage memory usage at 10%; (b) storage memory usage at 70%; (c) execution memory usage at 10%; (d) execution memory usage at 70%; solid lines show used memory, dashed lines the pool size.]
the disk to release space for following executions.
Memory spilling may cause high I/O overhead that deteriorates Spark performance. How-
ever, we also find that spilling relieves pressure on GC and can thus trade I/O for a performance
improvement. We investigate the impact of GC on Spark and the performance degradation introduced
by UMM in the next section.
3.3 UMM Limitation: GC Impact
In Section 3.1, we discovered that SMM can provide better performance than UMM. We
now explain the reason behind this observation, which guides us to the motivation of our new
memory manager.
We first conduct experiments to explore the contribution of GC to Spark application la-
tency. In the previous experiments, we observed that UMM has a longer latency than SMM when the
storage fraction is set to 20% or 70%. To find the reason behind this, we plot the GC
times for SMM with different storage fractions and compare them with that of UMM in Fig. 3.4.
Figure 3.4: GC time comparison. [Chart: GC time in seconds for UMM and SMM with storage fractions 0.1 to 0.9.]
We observe that SMM has a lower GC time when the storage fraction is set to 20%, 30%, or more
than 70%; at all these points, the GC time drops below 60 seconds. Combining the results in
Fig. 3.4 and Fig. 3.2, we conclude that GC contributes to the latency of applications
and has a considerable impact on Spark performance.
In order to study the impact of the GC mechanism on execution time, we further plot the
changes in the tenured space of the Java heap for both UMM and SMM with a storage fraction of 70% in
Fig. 3.5. We also present each full GC time across the duration of an application. We observe that
for UMM, when the first full GC happens, the size of the tenured space jumps to its maximum,
approximately 600MB. The corresponding full GCs in UMM can last 20 to 50
seconds. For SMM, however, the size of the tenured space increases gradually instead of jumping to
the maximum size. By contrast, the full GC time for SMM is always less than 10 seconds when
running the same benchmark as UMM. We conclude that UMM introduces longer GCs because
a huge amount of intermediate data is stored in execution memory, which puts heavy pressure on
the tenured space and causes long GC pauses. The increase in GC time finally results
in a long latency for the Spark application. Thus, we remark that GC time should be taken into
consideration when balancing storage and execution memory.
In our work, we consider the ratio of GC time to execution time as a measure of
Spark performance. A low ratio of GC time to execution time indicates good performance, and vice
versa. We use n to represent the GC time and m to represent the execution time; note that n < m,
since GC pauses are contained in the execution time. Theoretically, when n
Figure 3.5: Tenured heap status and GC duration. [Charts: (a) SMM 70% tenured heap size; (b) UMM tenured heap size; (c) SMM 70% full GC time; (d) UMM full GC time.]
Figure 3.6: Ratio of GC time to execution time. [Chart: ratio for UMM and SMM with storage fractions 0.1 to 0.9.]
increases by a duration of λ, m increases by λ as well. Then we have Equation 3.1:

    n/m < (n + λ)/(m + λ)    (3.1)

which holds since n < m.
Equation 3.1 indicates that if the GC time increases, the ratio of GC time to execution time
increases as well. We then plot the ratio of GC time to execution time of UMM and compare it
with SMM with various storage fractions in Fig. 3.6. The pattern of the ratio is similar to that of
the latency as shown in Fig. 3.2, which indicates GC considerably contributes to the latency of an
application. Additionally, Fig. 3.6 shows that UMM has a higher ratio of GC time to execution time
than those under SMM across storage fractions except 50%. We conclude that the ratio can be used
as a parameter to measure Spark performance.
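As a quick numeric sanity check of Equation 3.1 (the values below are illustrative, chosen by us; n < m because GC pauses are contained in the execution time):

```python
n, m, lam = 10.0, 100.0, 20.0     # GC time, execution time, extra GC (s)

before = n / m                    # ratio before the extra GC pause
after = (n + lam) / (m + lam)     # both times grow by the same lambda

assert before < after             # a longer GC raises the ratio
```

Here the ratio grows from 0.10 to 0.25, matching the claim that longer GC pauses inflate the GC-to-execution-time ratio.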
Chapter 4
ATMM: Auto Tuning Memory
Management Algorithms
We propose a new Spark memory manager, called Auto-Tuning Memory Manager (ATMM),
to dynamically adjust the storage and execution memory pools. ATMM is an adaptive controller,
which collects execution logs of current and previous tasks and moves the boundary (i.e., the parti-
tion line in heap memory) between storage and execution pools, by comparing the ratio of garbage
collection duration to task execution time.
Figure 4.1: Spark task scheduling with ATMM. [Diagram: the Executor calls ATMM's Auto Tuning Algorithm; the Block Manager and Task Memory Manager call its Memory Requirement Algorithm; ATMM sizes the storage and execution memory pools.]
Figure 4.1 shows the overall block diagram illustrating the interaction of ATMM with
other Spark task-scheduling components. ATMM consists of two major com-
ponents: 1) the Auto Tuning Algorithm and 2) the Memory Requirement Algorithm. After the completion
of each task, the "Executor" module calls the "Auto Tuning Algorithm" to determine the amount of
memory required for RDD storage and task execution. The "Block Manager" and the "Task Mem-
ory Manager" of Spark call the "Memory Requirement Algorithm" to send memory requirements
to ATMM. Then, based on the allocations determined by the "Auto Tuning Algorithm" and the
memory demands, the corresponding amount of memory is actually allocated for storage
and task execution. In the rest of this chapter, we explain these two modules in detail and present
our design for each of them, which results in the performance improvement of Spark applications.
4.1 Auto Tuning Algorithm
We first present an algorithm, called the Auto Tuning Algorithm, that dynamically adjusts the
amount of memory dedicated to the execution of tasks and the storage of intermediate data (i.e., RDDs)
to minimize the latency of Spark applications. The Spark memory container has two parti-
tions, storage memory and execution memory, so we consider a partition "bar"
between them. If the "bar goes up", the storage memory pool
expands and the execution memory pool shrinks. Conversely, if the "bar goes down", the
storage memory pool shrinks and the execution memory pool enlarges. Alg. 1 shows the pseudo code
of our "Auto Tuning Algorithm", which is called by an executor when each of its tasks completes.
Our algorithm dynamically tunes the bar (i.e., "barChange()") between storage and exe-
cution memory, moving it up (i.e., "setUp()") or down (i.e., "setDown()") at runtime
to optimize application performance. Our algorithm also considers the impact of garbage col-
lection when determining the direction in which to move the boundary between execution and storage memory.
Specifically, we use the ratio of a task's GC time to its execution time to quantify the per-
formance of each task. The inputs to the "barChange()" procedure of our algorithm are preRatio,
GCTime, executionTime, and preUpOrDown, where "GCTime" and "executionTime" are values for
the current task and "preRatio" is computed from the values of previous tasks.
Procedure barChange() compares the performance of the current task with that of previ-
ous tasks by comparing ratios of GC time to execution time, with consideration of the last adjust-
ment decision. In particular, we compare “curRatio” with “preRatio”, and analyze the direction in
which the bar was moved for the last task to determine whether our previous action resulted in a “re-
Algorithm 1: Auto Tuning Algorithm.
1  Procedure barChange(preRatio, GCTime, executionTime, preUpOrDown)
2      curRatio = GCTime / executionTime;
3      if (curRatio < preRatio and preUpOrDown = true) or (curRatio > preRatio and preUpOrDown = false) then
4          setUp(step);
5          return (curUpOrDown = true, updated preRatio)
6      else
7          setDown(step);
8          return (curUpOrDown = false, updated preRatio)
9  Procedure setUp(step)
10     memoryIncrease = totalMemory * step;
11     if heapStorageMemory + memoryIncrease < totalMemory then
12         increaseStoragePoolsize(memoryIncrease);
13         decreaseExecutionPoolsize(memoryIncrease);
14         if usedStoragePoolSize / totalStoragePoolSize > 0.8 then
15             heapStorageMemory = heapStorageMemory + memoryIncrease;
16     return heapStorageMemory
17 Procedure setDown(step)
18     memoryDecrease = totalMemory * step;
19     if heapStorageMemory - memoryDecrease > 0 then
20         memoryEvict = memoryDecrease - freeStorageMemory;
21         if memoryEvict > 0 then
22             storageMemoryPool.freeMemory(memoryEvict);
23         decreaseStoragePoolsize(memoryDecrease);
24         increaseExecutionPoolsize(memoryDecrease);
25         heapStorageMemory = heapStorageMemory - memoryDecrease;
26     return heapStorageMemory
ward” (i.e., performance improved) or a “penalty” (i.e., performance degraded). If the current task
has better performance (i.e., with a lower ratio of GC time to execution time), then the boundary-
moving decision that we previously made (i.e., “preUpOrDown”) is a reward. Thus, we decide to
keep moving the boundary further in the same direction as that of the last task. Otherwise, we move
the boundary in the direction that is opposite to that of the last task.
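For concreteness, the decision rule of procedure barChange() can be rendered as a small Python sketch (our illustrative rendering; the pool-resizing side effects of setUp()/setDown() are elided):

```python
def bar_change(pre_ratio, gc_time, execution_time, pre_up):
    """Decide the bar direction as in Alg. 1: repeat the previous move
    if the GC-to-execution-time ratio improved (reward), otherwise
    reverse it (penalty). Returns (move_up, new_pre_ratio)."""
    cur_ratio = gc_time / execution_time
    move_up = ((cur_ratio < pre_ratio and pre_up) or
               (cur_ratio > pre_ratio and not pre_up))
    return move_up, cur_ratio
```

For example, bar_change(0.2, 10, 100, True) keeps moving the bar up, since moving up last time lowered the ratio from 0.2 to 0.1; bar_change(0.2, 30, 100, True) reverses direction, since the ratio rose to 0.3.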
We can examine the overall latency by considering a single previous task or a group of
multiple previous tasks to compute “preRatio”. Intuitively, by considering a group of multiple pre-
vious tasks to decide our future action, we are able to improve the accuracy of our decision by
minimizing the effect of some exceptional tasks. One example of such an exceptional task is that
sometimes, due to a poor network connection, a task may fail and relaunch. Such unexpected situa-
tions can drastically increase the execution time and thus decrease the “preRatio” in an unanticipated
manner. Usually, a smaller “preRatio” indicates better performance. As a re-
sult, this exceptional task with a small "preRatio" will incorrectly lead to a reward. Therefore, in order
to minimize such exceptions, we should consider a group of multiple previous tasks. On
the other hand, considering the history of all previous tasks is not good either, because the
status of the JVM heap may have changed substantially after a large number of tasks, compared
to the beginning of the application's execution. For example, before the first task executes, the
JVM heap is empty; after running a large set of tasks, many objects reside in the JVM heap,
which results in longer GC durations. Thus, comparing the current task's performance with either
only the last task or with very early tasks cannot help us make correct decisions. To reflect the observation
from a group of recent tasks, we record the average GC time and execution
time of all tasks in a moving window to obtain a cumulative "preRatio". In our algorithm, the size of
this moving window is configurable. For example, configuring the window as
20% of the tasks assigned to an executor in each stage yields a window size of 5 when each stage has 25
tasks.
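A minimal realization of this windowed preRatio (our sketch, not the thesis code; the window size 5 corresponds to 20% of 25 tasks per stage):

```python
from collections import deque

class RatioWindow:
    """Sliding window over recent tasks; preRatio is computed from the
    summed GC and execution times, so one outlier task cannot dominate."""
    def __init__(self, size=5):
        self.gc = deque(maxlen=size)    # GC time of recent tasks
        self.exe = deque(maxlen=size)   # execution time of recent tasks

    def record(self, gc_time, execution_time):
        self.gc.append(gc_time)
        self.exe.append(execution_time)

    def pre_ratio(self):
        # average GC time over average execution time of the window
        return sum(self.gc) / sum(self.exe)
```

With a window of size 2, recording tasks with GC/execution times (1, 10), (3, 10), and (5, 10) yields preRatio values of 0.1, 0.2, and then 0.4 once the first task slides out of the window.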
Based on the decision made in procedure barChange(), the storage and execution memory
pools expand or shrink under the control of procedures setUp() and setDown(). As mentioned before,
setting the bar up means increasing the storage memory pool and shrinking the execution memory pool,
while setting the bar down means the opposite. The user can configure the
adjustment step; by default, we set it to 5% of the total memory available for storage and execution.
It is challenging to adjust the boundary if both the storage and execution memory pools are
already fully utilized. In this scenario, object eviction occurs, which requires a mechanism
to determine which objects to evict. For storage memory, the Spark block manager already applies
an LRU (Least Recently Used) RDD caching policy, and we inherit this caching
algorithm to manage RDD evictions from storage memory. For execution memory, when pro-
cedure barChange() is called, the task has already finished its computation and released all of its
memory resources, so there is no need to evict objects from the execution memory pool. This is also the
reason why we adjust the memory boundary after every task's completion.
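A toy model of the LRU eviction used for the storage pool (illustrative only; Spark's block manager tracks real block metadata and memory accounting):

```python
from collections import OrderedDict

class LRUBlockStore:
    """Toy LRU cache standing in for the block manager's eviction
    policy: freeing N bytes drops the least recently used blocks."""
    def __init__(self):
        self.blocks = OrderedDict()        # block_id -> size in bytes

    def put(self, block_id, size):
        self.blocks[block_id] = size
        self.blocks.move_to_end(block_id)  # mark most recently used

    def touch(self, block_id):
        self.blocks.move_to_end(block_id)  # a read refreshes recency

    def free(self, bytes_needed):
        freed = []
        while bytes_needed > 0 and self.blocks:
            # pop the least recently used block first
            block_id, size = self.blocks.popitem(last=False)
            bytes_needed -= size
            freed.append(block_id)
        return freed
```

Caching blocks a, b, c (100 bytes each), touching a, and then freeing 150 bytes evicts b and c, leaving only the recently used a cached.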
The procedure setUp() takes the "step" size as an input and returns "heapStorageMem-
ory". We remark that this variable is new in our design: it acts as a floor on the storage pool
size and thereby caps the memory available for execution, which avoids the long GC pauses caused
by a large execution memory space. If the sum of the original "heapStorageMemory" and "memoryIncrease" is less than the
total memory, then we increase the storage pool size (i.e., decrease the execution pool size) by the
amount of "memoryIncrease" (see lines 10 to 13 in Alg. 1). We update heapStorageMemory only
Algorithm 2: Memory Requirement Algorithm.
1  Procedure acquireExecutionMemory(numBytes)
2      extraNeed = numBytes - freeExecutionMemory;
3      if extraNeed > 0 and storageMemoryPoolSize > heapStorageMemory then
4          memoryBorrow = min(extraNeed, storageMemoryPoolSize - heapStorageMemory, freeStorageMemory);
5          decreaseStoragePoolsize(memoryBorrow);
6          increaseExecutionPoolsize(memoryBorrow);
7          if memoryBorrow = freeStorageMemory then
8              heapStorageMemory = storageMemoryPoolSize;
9          return executionMemoryPool.acquire(freeExecutionMemory + memoryBorrow)
10     else
11         return executionMemoryPool.acquire(numBytes)
12 Procedure acquireStorageMemory(numBytes)
13     memoryToFree = max(0, numBytes - freeStorageMemory);
14     if memoryToFree > 0 then
15         storageMemoryPool.freeMemory(memoryToFree);
16     return storageMemoryPool.acquire(numBytes)
if the storage memory is fully utilized, i.e., if the used storage memory is more than 80% of the storage
memory pool size (see lines 14 and 15 in Alg. 1). Otherwise, we fix heapStorageMemory and
only change the size of the memory pools. In this way, heapStorageMemory is always less than or equal
to the storage memory pool size.
Procedure setDown() has the same input and output as procedure setUp(). If the original
"heapStorageMemory" minus "memoryDecrease" is larger than zero, then we
check whether RDDs need to be evicted. Once we identify that "memoryEvict" (i.e., the gap be-
tween "memoryDecrease" and "freeStorageMemory") is larger than zero, we evict some RDDs to
shrink the storage memory pool. Finally, procedure setDown() shrinks the storage memory pool
and returns the decreased heapStorageMemory.
Both setUp() and setDown() reallocate the storage and execution memory pool sizes
based on the decision made in procedure barChange(). Notably, the variable heapStorage-
Memory is updated by both setUp() and setDown(); it is used by the "Memory
Requirement Algorithm" introduced in the next section.
4.2 Memory Requirement Algorithm
The "Memory Requirement Algorithm" module of ATMM is designed to allocate mem-
ory space for RDDs and task execution. In particular, this algorithm receives the online memory
requirements (i.e., "numBytes") from the "Block Manager" and the "Task Memory Manager" of Spark.
Then, once it receives the allocation decisions from the "Auto Tuning Algorithm", the "Memory
Requirement Algorithm" is triggered to actually assign the corresponding amount of memory to the
storage and execution pools.
Alg. 2 describes the main procedures in this algorithm. Procedures acquireExecutionMem-
ory() and acquireStorageMemory() request memory for task processing and RDD caching,
respectively. In these two procedures, we use the variable heapStorageMemory to limit the maxi-
mum memory that the execution memory pool can borrow from the storage memory pool.
In this way, we can prevent storage memory from lending too much memory space to execution
memory and thus avoid a dramatic increase in the size of the old generation of the JVM heap. As we
introduced in Section 2.3, if the size of the old generation increases, then the duration of full GC
increases, which can cause a long latency for a Spark application. Thus, we use heapStorageMem-
ory, which is controlled by our "Auto Tuning Algorithm", to balance execution memory requirements
against excessive GC time.
The procedure acquireExecutionMemory() takes as input "numBytes", the mem-
ory size required by the "Task Memory Manager", and returns the memory actually allocated to the "Task
Memory Manager". If the required memory is less than the free execution memory, this procedure
allocates all of the needed memory to the "Task Memory Manager". Otherwise, this procedure borrows
memory from the storage memory pool; the amount borrowed is the minimum
of the extra execution memory needed, the storage pool size minus heapStorageMemory, and the
free storage memory (see line 4 in Alg. 2). After allocating memory for the "Task Memory
Manager", heapStorageMemory needs to be updated (see lines 7 and 8 in Alg. 2), because the
storage memory is now fully utilized, which is the condition under which
we update this variable in Alg. 1.
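The borrowing cap of line 4 in Alg. 2 reduces to a one-line helper (our illustrative names, not Spark identifiers):

```python
def execution_borrow(extra_need, storage_pool_size, heap_storage_memory,
                     free_storage_memory):
    """Execution may take storage memory only down to the
    heapStorageMemory floor, and never more than is actually free."""
    return max(0, min(extra_need,
                      storage_pool_size - heap_storage_memory,
                      free_storage_memory))
```

For instance, with a 300 MB storage pool, a 250 MB heapStorageMemory floor, and 80 MB free, a 100 MB shortfall is only granted 50 MB; if the pool has already shrunk to the floor, nothing is borrowed.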
Finally, procedure acquireStorageMemory() is called by the "Block Manager"; it takes
"numBytes" (i.e., the memory required for storage) as input and returns the memory actually allo-
cated to the "Block Manager". If the required memory is more than the free storage memory, then
RDD eviction happens and storage memory is released for caching new RDDs using the LRU
algorithm.
Chapter 5
Evaluation
5.1 Spark Implementation
We evaluate ATMM by integrating our methodology into Apache Spark 2.2.0. We de-
ploy a Spark cluster with 4 workers and one driver, all homogeneous and hosted as virtual machines
under VMware Workstation 12.5.0. Each node in the Spark cluster is assigned 1 core, 1GB of
JVM memory, and 50GB of disk space. Table 5.1 summarizes the details of our testbed configuration.
Table 5.1: Testbed configuration.

Component                  Specs
Host Server                Dell PowerEdge T310
Host Processor Speed       2.93 GHz
Host Memory Capacity       16 GB DIMM DDR3
Host Memory Data Rate      1333 MHz
Host Storage Device        Western Digital WD20EURS
Host Disk Bandwidth        SATA 3.0 Gbps
Host Hypervisor            VMware Workstation 12.5.0
Big Data Platform          Apache Spark 2.2.0
Processor Cores Per Node   1 Core
Memory Size Per Node       1 GB
Disk Size Per Node         50 GB
We integrate our memory manager ATMM into Spark 2.2.0 and create a new class, called
"AutoTuningMemoryManager()", to implement our algorithms. We also modify "Executor()" to
collect task-level performance measurements as the input of our algorithms for adjusting
the boundary between the two memory pools. Additionally, we modify "MemoryManager()" to
provide interfaces for other modules. The major Scala code is shown in Fig. 5.1, Fig. 5.2,
Fig. 5.3, Fig. 5.4, and Fig. 5.5.
Figure 5.1: barChange() simple version
Figure 5.2: barChange() buffer version
Figure 5.3: setUp() & setDown()
Figure 5.4: acquireExecutionMemory()
Figure 5.5: acquireStorageMemory()
5.2 Performance Comparison
In this section, we evaluate and compare the performance of Spark applications with three
memory managers: SMM, UMM, and ATMM. As mentioned in Section 3.2, if 1GB memory is
assigned to JVM heap, then the available memory for UMM and SMM is 434.4MB and 716.8MB,
respectively. In our experiments, we set the available memory space as 434.4MB in ATMM, which
is the same as that of UMM for a fair comparison.
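These available-memory figures follow from the managers' default fractions under a 1 GB heap. The arithmetic below is our reconstruction, assuming Spark's roughly 300 MB reserved heap with spark.memory.fraction = 0.6 for UMM, and the default 54% storage plus 16% execution split for legacy SMM:

```python
heap_mb = 1024.0

# UMM: usable memory = (heap - reserved) * spark.memory.fraction
umm_available = (heap_mb - 300.0) * 0.6              # 434.4 MB

# SMM: storage 0.6 * 0.9 = 54% of heap, execution 0.2 * 0.8 = 16%
smm_available = heap_mb * (0.6 * 0.9 + 0.2 * 0.8)    # 716.8 MB
```

The results match the 434.4MB and 716.8MB quoted above.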
As described in Section 2.2, if a Spark application has N stages and M partitions, there
are N × M tasks in total, and the average number of tasks
assigned to each executor is N × M / numberOfExecutors. As described in Section 4.1, our
algorithm is triggered to make an adjustment whenever a task completes. Thus, in our experiments,
we set the number of partitions of each RDD to 100. On average, each executor is then in charge of 25
partitions, so ATMM makes 25 memory-adjustment decisions per stage on each executor.
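The task accounting above can be checked with a tiny sketch (the stage count N = 10 is a hypothetical value; the partition and executor counts match our setup):

```python
stages = 10          # hypothetical N
partitions = 100     # M, as configured in our experiments
executors = 4

total_tasks = stages * partitions               # N * M tasks overall
per_executor = total_tasks // executors         # tasks per executor
decisions_per_stage = partitions // executors   # ATMM decisions per stage
```

With these values, each executor runs 250 tasks over the application and ATMM adjusts the boundary 25 times per stage.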
5.2.1 Latency Comparison
We use SparkPageRank [19] as a benchmark and conduct a set of experiments with dif-
ferent input data sizes, comparing the latency of SparkPageRank under the three memory
managers (i.e., SMM, UMM, and ATMM). Here, we set the storage memory to 20% of available
memory under SMM, and the initial storage/execution partition to 50% each under UMM.
Fig. 5.6 shows the latency of SparkPageRank with 10 iterations and various input data
set sizes (i.e., 1GB, 2GB, and 3GB). For each input data size, we conduct a set of experiments
to compare the latency of the Spark application under the different memory managers. The figure
illustrates that when the input data size is 1GB, the three memory managers perform similarly, i.e.,
Figure 5.6: Comparison of SMM, UMM and ATMM with different input data sizes. [Chart: latency in minutes. 1GB: SMM 5.9, UMM 5, ATMM 5.4. 2GB: SMM 24, UMM 30, ATMM 23. 3GB: SMM 36, UMM 44, ATMM 34.]
the Spark application needs 5 to 6 minutes to finish. This is because the input data is partitioned
into small pieces with a parallelism of 100, and thus the execution requirement of each task is
actually small. In this scenario, the pressure of GC for all three memory managers is limited and
the difference between the three execution time is negligible. But, we can still observe that the
latency of ATMM and UMM is a little lower than that of SMM because of their dynamic memory
adjustment.
Furthermore, ATMM offers better performance when the input data size is set to 2GB or 3GB:
it achieves a lower latency than UMM, with 23.3% and 22.7% improvement for 2GB and 3GB,
respectively. We attribute this improvement to the fact that ATMM leverages GC time to adaptively
adjust the boundary between storage and execution memory, which prevents the Spark application
from suffering the long GC durations observed under UMM. We also observe that SMM performs
better than UMM. The reason is that we set the storage memory to 20% of the total available
memory, which happens to fit the characteristics of SparkPageRank well. However, as discussed in
Section 3.2, SMM only provides a static allocation for the storage and execution memory pools,
which cannot always guarantee the best performance for every Spark application.
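The improvement figures above can be reproduced from the latencies read off Fig. 5.6 (a quick arithmetic check; latency values are in minutes):

```python
# Latency (minutes) of SparkPageRank, read from Fig. 5.6.
latency = {
    "SMM":  {"1GB": 5.9, "2GB": 24, "3GB": 36},
    "UMM":  {"1GB": 5.0, "2GB": 30, "3GB": 44},
    "ATMM": {"1GB": 5.4, "2GB": 23, "3GB": 34},
}

def improvement(baseline: float, ours: float) -> float:
    """Relative latency reduction of `ours` over `baseline`, in percent."""
    return (baseline - ours) / baseline * 100

for size in ("2GB", "3GB"):
    imp = improvement(latency["UMM"][size], latency["ATMM"][size])
    print(f"{size}: ATMM improves over UMM by {imp:.1f}%")
# 2GB: 23.3%, 3GB: 22.7%
```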
5.2.2 Memory Usage and Garbage Collection Analysis
We further look closely at the execution details of the three Spark memory managers by
plotting their memory usage in Fig. 5.7, where SparkPageRank runs with 10 iterations and 3GB
[Figure 5.7 consists of six panels plotting memory usage (MB) over time (s); in each panel, the solid line is the used memory and the dashed line is the memory pool size. Panels: a) SMM storage memory usage; b) UMM storage memory usage; c) ATMM storage memory usage; d) SMM execution memory usage; e) UMM execution memory usage; f) ATMM execution memory usage.]
Figure 5.7: Memory Usage Analysis
input data. Plots (a)∼(c) in Fig. 5.7 present the storage memory usage over time under the three
memory managers, while plots (d)∼(f) illustrate the corresponding execution memory usage. In each
plot, the dashed line is the maximum memory size accessible for storage or execution, and the solid
line is the actual usage of the memory pool.
From Fig. 5.7-(a)∼(c), we observe that the storage memory usage is similar for SMM and
UMM because the same RDDs are cached in these two experiments. Additionally, both UMM
and ATMM dynamically change the storage memory pool size instead of keeping it fixed as SMM
does. More importantly, UMM drops the size of its storage pool to almost zero and then grows it
as RDDs are cached. In Fig. 5.7-(c), we observe the shrinking of the storage memory pool
under ATMM, which actually triggers RDD eviction. Although eviction may happen and result in
RDD recomputation, the overall latency of the Spark application is still reduced under our ATMM.
We further observe from Fig. 5.7-(d)∼(f) that the execution memory usage differs across the
memory managers. ATMM exhibits a more dynamic execution memory usage than UMM: the
execution memory pool size increases gradually over time until it satisfies the execution memory
needs. In contrast, UMM increases its execution memory pool size dramatically, which
actually expands the old generation of the JVM heap rapidly, and thus causes a longer full GC time
according to the analysis in Section 3.3. In addition, we notice two increases of execution
memory usage under ATMM. The first happens when allocating more execution memory
to process tasks, while the second pool size increment is incurred by the adjustment delay. That
is, we apply reinforcement learning to adjust the memory partition based on the performance of
previous tasks, and the learning process inherently has an unavoidable delay. However, such an
adjustment delay is harmless to Spark performance, and we will consider minimizing it in our
future work.
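The feedback loop described above can be sketched as follows (a deliberately simplified sketch of the idea rather than the actual ATMM implementation; the function and variable names are hypothetical, and the 5% step is the fixed adjustment size used by the Auto Tuning Algorithm):

```python
# Simplified sketch of GC-time-driven boundary adjustment (not the actual
# ATMM code; names are hypothetical). `boundary` is the fraction of unified
# memory granted to the execution pool. After each task, the pool grows by
# a fixed step if the task's GC overhead did not worsen, and shrinks
# (potentially evicting cached RDDs) if GC overhead grew.
STEP = 0.05  # fixed adjustment: 5% of available memory

def adjust_boundary(boundary: float, prev_gc_ratio: float, cur_gc_ratio: float) -> float:
    if cur_gc_ratio <= prev_gc_ratio:
        # GC overhead did not worsen: grant the execution pool more memory.
        boundary += STEP
    else:
        # GC overhead grew: shrink the execution pool to relieve heap pressure.
        boundary -= STEP
    return min(max(boundary, 0.0), 1.0)  # clamp to a valid fraction

b = 0.5
b = adjust_boundary(b, prev_gc_ratio=0.30, cur_gc_ratio=0.20)  # improved: grow by STEP
b = adjust_boundary(b, prev_gc_ratio=0.20, cur_gc_ratio=0.40)  # worsened: shrink by STEP
print(round(b, 2))  # 0.5
```

Note how an adjustment always lags one task behind the observation that triggered it, which is exactly the adjustment delay discussed above.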
To further verify the performance of our ATMM, we present the GC time of SparkPageRank
under the three memory managers in Fig. 5.8. First, we observe spikes of GC time under all
three memory managers; each spike represents an occurrence of a full GC. Fig. 5.8-(a) shows
that the full GC time of SMM stays under 40 seconds, while the full GC time of UMM reaches
up to 70 seconds in Fig. 5.8-(c). More importantly, the full GC times of our ATMM are all
shorter than 30 seconds in Fig. 5.8-(b). We also observe that fewer spikes occur under ATMM
than under UMM and SMM. Therefore, we can conclude that our ATMM has a shorter total GC
time than both SMM and UMM: the total GC time of SMM, UMM, and ATMM is 14 min, 20 min,
and 7.9 min, respectively. We further show the GC time, execution time, and their ratio under
the three memory managers in Table 5.2. The results shown in the table further
[Figure 5.8 consists of three panels plotting full GC time (s) over elapsed time (s), one panel each for SMM, UMM, and ATMM.]
Figure 5.8: GC Analysis
verify that our ATMM is able to improve the performance by significantly reducing both GC and
execution time.
Table 5.2: GC time and execution time analysis

Memory Manager   GC time (min)   Execution time (min)   Ratio
ATMM             7.9             34                      23.2%
SMM              14              36                      38.9%
UMM              20              44                      45.5%
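The ratio column of Table 5.2 is simply GC time divided by execution time, which can be verified directly:

```python
# GC time and execution time (minutes) from Table 5.2.
results = {"ATMM": (7.9, 34), "SMM": (14, 36), "UMM": (20, 44)}

for manager, (gc_min, exec_min) in results.items():
    ratio = gc_min / exec_min * 100  # GC time as a percentage of execution time
    print(f"{manager}: {ratio:.1f}%")
# ATMM: 23.2%, SMM: 38.9%, UMM: 45.5%
```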
Chapter 6
Conclusion And Future Work
In this dissertation, we study the architecture of the two existing memory managers in
Apache Spark and investigate the pros and cons of both. Under SMM, we conduct experiments
to analyze how different partitions of the two memory pools (i.e., the storage and execution memory
pools) affect the performance of Spark applications. In addition, we compare the performance
of SMM with UMM and observe that UMM cannot always provide the optimal performance, even
though it dynamically allocates memory based on actual memory demands. We further investigate
the JVM GC mechanism and observe that UMM can actually introduce long GC times,
which result in considerably high latency.
We thus design a new Spark memory manager that leverages GC time to adjust the boundary
between the storage and execution memory pools. ATMM allocates memory to the execution
memory pool under the limitation of a newly introduced variable that is determined by the GC time
of each task. We implement ATMM in Spark 2.2.0 and conduct experiments in a real Spark cluster
to compare the performance of ATMM with that of the two existing memory managers. We find that
our ATMM achieves more than a 20% performance improvement compared with UMM.
In the future, we plan to further refine our ATMM. For example, the Auto Tuning Algorithm
currently adjusts the boundary between the two memory pools by a fixed memory size (i.e., 5% of
the available memory). We will dynamically change this adjustment size based on the performance
difference between the current task and the previous tasks. As an alternative to the current
algorithm, when making a decision we could adjust the memory pool size by leveraging the
difference between the ratios of the current task and the previous tasks. This optimization can
further improve the accuracy of ATMM.
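One way this refinement could be realized is to scale the adjustment step by the observed performance gap (a sketch of the proposed future extension, not something implemented in ATMM; the names and the scaling constants are purely illustrative):

```python
# Illustrative sketch of a dynamic adjustment step for future work.
# Instead of a fixed 5% step, the step grows with the difference between
# the GC ratios of the current and previous tasks.
BASE_STEP = 0.05   # current fixed step: 5% of available memory
GAIN = 0.5         # illustrative scaling constant
MAX_STEP = 0.10    # cap to avoid oscillating adjustments

def dynamic_step(prev_ratio: float, cur_ratio: float) -> float:
    """Larger performance gaps trigger larger boundary adjustments."""
    step = BASE_STEP + GAIN * abs(cur_ratio - prev_ratio)
    return min(step, MAX_STEP)

print(dynamic_step(0.30, 0.32))  # small gap: step stays close to the 5% base
print(dynamic_step(0.10, 0.40))  # large gap: step is capped at 10%
```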
Additionally, we need to deploy Spark on a larger-scale system with more than 1 core
and 1GB of memory to evaluate the scalability of Spark with ATMM. We also plan to increase the
input data size to terabytes to observe ATMM's performance with big data. Finally, we plan to
increase the number of executors in a Spark cluster, with heterogeneous configurations as well.
We are also interested in the performance of ATMM on other benchmarks, and will conduct a sensitivity
analysis of how workload characteristics contribute to the performance of ATMM.
Bibliography
[1] M. Armbrust, A. Fox, R. Griffith, A. D. Joseph, R. Katz, A. Konwinski, G. Lee,
D. Patterson, A. Rabkin, I. Stoica, and M. Zaharia, “A view of cloud computing,”
Commun. ACM, vol. 53, no. 4, pp. 50–58, Apr. 2010. [Online]. Available: http://doi.acm.org/10.1145/1721654.1721672
[2] T. White, Hadoop: The Definitive Guide. Yahoo Press, May 2012.
[3] D. Cheng, J. Rao, Y. Guo, and X. Zhou, “Improving mapreduce performance in heterogeneous
environments with adaptive task tuning,” in Proceedings of the 15th International Middleware
Conference, ser. Middleware ’14. New York, NY, USA: ACM, 2014, pp. 97–108. [Online].
Available: http://doi.acm.org/10.1145/2663165.2666089
[4] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: Cluster
computing with working sets,” in Proceedings of the 2Nd USENIX Conference on Hot Topics
in Cloud Computing, ser. HotCloud’10. Berkeley, CA, USA: USENIX Association, 2010,
pp. 10–10. [Online]. Available: http://dl.acm.org/citation.cfm?id=1863103.1863113
[5] M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker,
and I. Stoica, “Resilient distributed datasets: A fault-tolerant abstraction for in-memory
cluster computing,” in Proceedings of the 9th USENIX Conference on Networked Systems
Design and Implementation, ser. NSDI’12. Berkeley, CA, USA: USENIX Association,
2012, pp. 2–2. [Online]. Available: http://dl.acm.org/citation.cfm?id=2228298.2228301
[6] S. Han, W. Choi, R. Muwafiq, and Y. Nah, “Impact of memory size on bigdata processing
based on hadoop and spark,” in Proceedings of the International Conference on Research in
Adaptive and Convergent Systems, ser. RACS ’17. New York, NY, USA: ACM, 2017, pp.
275–280. [Online]. Available: http://doi.acm.org/10.1145/3129676.3129688
[7] A. Grishchenko, “Spark memory management,” https://0x0fff.com/spark-memory-management/.
[8] D. Wang and J. Huang, “Project tungsten: Bringing apache spark closer to bare metal,” https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html.
[9] T. B. G. Perez, X. Zhou, and D. Cheng, “Reference-distance eviction and prefetching for
cache management in spark,” in Proceedings of the 47th International Conference on Parallel
Processing, ser. ICPP 2018. New York, NY, USA: ACM, 2018, pp. 88:1–88:10. [Online].
Available: http://doi.acm.org/10.1145/3225058.3225087
[10] L.-Y. Ho, J.-J. Wu, P. Liu, C.-C. Shih, C.-C. Huang, and C.-W. Huang, “Efficient
cache update for in-memory cluster computing with spark,” in Proceedings of the
17th IEEE/ACM International Symposium on Cluster, Cloud and Grid Computing, ser.
CCGrid ’17. Piscataway, NJ, USA: IEEE Press, 2017, pp. 21–30. [Online]. Available:
https://doi.org/10.1109/CCGRID.2017.21
[11] V. S. Marco, B. Taylor, B. Porter, and Z. Wang, “Improving spark application throughput via
memory aware task co-location: A mixture of experts approach,” in Proceedings of the 18th
ACM/IFIP/USENIX Middleware Conference, ser. Middleware ’17. New York, NY, USA:
ACM, 2017, pp. 95–108. [Online]. Available: http://doi.acm.org/10.1145/3135974.3135984
[12] F. Chen, S. Wu, H. Jin, Y. Yao, Z. Liu, L. Gu, and Y. Zhou, “Lever: Towards low-latency
batched stream processing by pre-scheduling,” in Proceedings of the 2017 Symposium on
Cloud Computing, ser. SoCC ’17. New York, NY, USA: ACM, 2017, pp. 643–643. [Online].
Available: http://doi.acm.org/10.1145/3127479.3132687
[13] L. Xu, M. Li, L. Zhang, A. R. Butt, Y. Wang, and Z. Z. Hu, “Memtune: Dynamic memory
management for in-memory data analytic platforms,” in 2016 IEEE International Parallel and
Distributed Processing Symposium (IPDPS), May 2016, pp. 383–392.
[14] M. Kim, J. Li, H. Volos, M. Marwah, A. Ulanov, K. Keeton, J. Tucek, L. Cherkasova,
L. Xu, and P. Fernando, “Sparkle: Optimizing spark for large memory machines
and analytics,” in Proceedings of the 2017 Symposium on Cloud Computing, ser.
SoCC ’17. New York, NY, USA: ACM, 2017, pp. 656–656. [Online]. Available:
http://doi.acm.org/10.1145/3127479.3134762
[15] S. Sahin, W. Cao, Q. Zhang, and L. Liu, “Jvm configuration management and its performance
impact for big data applications,” 2016 IEEE International Congress on Big Data (BigData
Congress), pp. 410–417, 2016.
[16] T. Yang, E. D. Berger, S. F. Kaplan, and J. E. B. Moss, “Cramm: Virtual memory support for
garbage-collected applications,” in Proceedings of the 7th Symposium on Operating Systems
Design and Implementation, ser. OSDI ’06. Berkeley, CA, USA: USENIX Association,
2006, pp. 103–116. [Online]. Available: http://dl.acm.org/citation.cfm?id=1298455.1298466
[17] C. Reiss and R. H. Katz, “Recommending just enough memory for analytics,”
in Proceedings of the 4th Annual Symposium on Cloud Computing, ser. SOCC
’13. New York, NY, USA: ACM, 2013, pp. 49:1–49:2. [Online]. Available: http://doi.acm.org/10.1145/2523616.2525957
[18] G. Liao, K. Datta, and T. L. Willke, “Gunther: Search-based auto-tuning of mapreduce,” in
Euro-Par 2013 Parallel Processing, F. Wolf, B. Mohr, and D. an Mey, Eds. Berlin, Heidel-
berg: Springer Berlin Heidelberg, 2013, pp. 406–419.
[19] R. S. Xin, J. E. Gonzalez, M. J. Franklin, and I. Stoica, “Graphx: A resilient distributed graph
system on spark,” AMPLab, EECS, UC Berkeley, Jun. 2013.