Spark and Resilient Distributed Datasets

Post on 24-Sep-2020


Page 1: Spark and Resilient Distributed Datasets (colazzo/BD/Spark.slides.3.pdf)

Spark and Resilient Distributed Datasets

Page 2

Motivation

MapReduce greatly simplified big data analysis on large, unreliable clusters.

But as soon as it got popular, users wanted more:

Iterative jobs, e.g., machine learning algorithms

Interactive analytics

Page 3

Motivation

Both iterative and interactive queries need one thing that MapReduce lacks: efficient primitives for data sharing.

In MapReduce, the only way to share data across jobs is stable storage (disk).

Replication also makes the system slow, but it is necessary for fault tolerance.

Page 4

Solution (example)

In-memory data processing and sharing.

Amir H. Payberah (SICS) Spark April 24, 2014 5 / 49

Page 5

Sharing (example)

Page 6

Challenge: how to design a distributed memory abstraction that is both fault tolerant and efficient?

Solution: Resilient Distributed Datasets (RDD)

A distributed memory abstraction.

Immutable collections of objects spread across a cluster.

Lineage among RDDs enables their re-evaluation in case of cluster node failures.

Page 7

Resilient Distributed Datasets (RDDs)

An RDD is divided into a number of partitions, which are atomic pieces of information.

Partitions of an RDD can be stored on different nodes of a cluster.


Figure 2: Spark runtime. The user's driver program launches multiple workers, which read data blocks from a distributed file system and can persist computed RDD partitions in memory. (The driver sends tasks to the workers and receives results; each worker holds input data and RAM.)

First, in bulk operations on RDDs, a runtime can schedule tasks based on data locality to improve performance. Second, RDDs degrade gracefully when there is not enough memory to store them, as long as they are only being used in scan-based operations. Partitions that do not fit in RAM can be stored on disk and will provide similar performance to current data-parallel systems.

2.4 Applications Not Suitable for RDDs

As discussed in the Introduction, RDDs are best suited for batch applications that apply the same operation to all elements of a dataset. In these cases, RDDs can efficiently remember each transformation as one step in a lineage graph and can recover lost partitions without having to log large amounts of data. RDDs would be less suitable for applications that make asynchronous fine-grained updates to shared state, such as a storage system for a web application or an incremental web crawler. For these applications, it is more efficient to use systems that perform traditional update logging and data checkpointing, such as databases, RAMCloud [25], Percolator [26] and Piccolo [27]. Our goal is to provide an efficient programming model for batch analytics and leave these asynchronous applications to specialized systems.

3 Spark Programming Interface

Spark provides the RDD abstraction through a language-integrated API similar to DryadLINQ [31] in Scala [2], a statically typed functional programming language for the Java VM. We chose Scala due to its combination of conciseness (which is convenient for interactive use) and efficiency (due to static typing). However, nothing about the RDD abstraction requires a functional language.

To use Spark, developers write a driver program that connects to a cluster of workers, as shown in Figure 2. The driver defines one or more RDDs and invokes actions on them. Spark code on the driver also tracks the RDDs' lineage. The workers are long-lived processes that can store RDD partitions in RAM across operations.

As we showed in the log mining example in Section 2.2.1, users provide arguments to RDD operations like map by passing closures (function literals). Scala represents each closure as a Java object, and these objects can be serialized and loaded on another node to pass the closure across the network. Scala also saves any variables bound in the closure as fields in the Java object. For example, one can write code like var x = 5; rdd.map(_ + x) to add 5 to each element of an RDD (see footnote 5).
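The idea of a closure as a serializable object with its bound variable stored in a field can be sketched in plain Scala. This is only an illustration under stated assumptions: AddN is a hypothetical hand-written stand-in for the object the compiler would generate for _ + x, the round trip through a byte array stands in for the network, and a plain Seq stands in for an RDD.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the closure object of `_ + x`:
// the bound variable becomes a field of a serializable function object.
final class AddN(val n: Int) extends (Int => Int) with Serializable {
  def apply(v: Int): Int = v + n
}

object ClosureShipping {
  // Serialize an object to bytes, as a driver might before sending it to a worker.
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Rebuild the function object from bytes, as a worker might on receipt.
  def fromBytes(bytes: Array[Byte]): Int => Int = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[Int => Int] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    var x = 5
    val shipped = toBytes(new AddN(x)) // the value 5 is captured here
    x = 100                            // later changes to x are not seen
    val onWorker = fromBytes(shipped)
    println(Seq(1, 2, 3).map(onWorker)) // List(6, 7, 8)
  }
}
```

Because the value is copied into the object when it is created, the deserialized function keeps adding 5 regardless of what happens to x afterwards.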

RDDs themselves are statically typed objects parametrized by an element type. For example, RDD[Int] is an RDD of integers. However, most of our examples omit types since Scala supports type inference.

Although our method of exposing RDDs in Scala is conceptually simple, we had to work around issues with Scala's closure objects using reflection [33]. We also needed more work to make Spark usable from the Scala interpreter, as we shall discuss in Section 5.2. Nonetheless, we did not have to modify the Scala compiler.

3.1 RDD Operations in Spark

Table 2 lists the main RDD transformations and actions available in Spark. We give the signature of each operation, showing type parameters in square brackets. Recall that transformations are lazy operations that define a new RDD, while actions launch a computation to return a value to the program or write data to external storage.

Note that some operations, such as join, are only available on RDDs of key-value pairs. Also, our function names are chosen to match other APIs in Scala and other functional languages; for example, map is a one-to-one mapping, while flatMap maps each input value to one or more outputs (similar to the map in MapReduce).
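The difference between map and flatMap can be seen on an ordinary Scala collection, which uses the same operator names. The sketch below assumes a plain Seq in place of an RDD and uses made-up log lines.

```scala
object MapVsFlatMap {
  // Sample input; the log lines are made up for illustration.
  val lines: Seq[String] = Seq("ERROR disk full", "INFO ok")

  // map: exactly one output element per input element.
  val lengths: Seq[Int] = lines.map(_.length)

  // flatMap: each input element may produce several output elements.
  val words: Seq[String] = lines.flatMap(_.split(" "))

  def main(args: Array[String]): Unit = {
    println(lengths) // List(15, 7)
    println(words)   // List(ERROR, disk, full, INFO, ok)
  }
}
```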

In addition to these operators, users can ask for an RDD to persist. Furthermore, users can get an RDD's partition order, which is represented by a Partitioner class, and partition another dataset according to it. Operations such as groupByKey, reduceByKey and sort automatically result in a hash or range partitioned RDD.
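As a rough illustration of what hash partitioning means, the sketch below assigns each key to a partition index derived from its hash code and applies the same function to two datasets. It is a toy model in plain Scala, not Spark's Partitioner class; the names partitionFor and partition and the sample data are assumptions made for this example.

```scala
object HashPartitionSketch {
  // Map a key to a partition index in [0, numPartitions), even when
  // hashCode is negative.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Group key-value pairs by the partition their key hashes to.
  def partition[K, V](pairs: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    pairs.groupBy { case (k, _) => partitionFor(k, numPartitions) }

  def main(args: Array[String]): Unit = {
    val visits = Seq("a.com" -> 1, "b.com" -> 2, "a.com" -> 3)
    val ranks  = Seq("a.com" -> 0.5, "b.com" -> 0.7)
    // Both datasets use the same function, so equal keys share a partition index.
    println(partition(visits, 4))
    println(partition(ranks, 4))
  }
}
```

When two datasets are partitioned the same way, records with equal keys end up under the same partition index, which is what allows a later join to avoid moving one of the datasets.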

3.2 Example Applications

We complement the data mining example in Section 2.2.1 with two iterative applications: logistic regression and PageRank. The latter also showcases how control of RDDs' partitioning can improve performance.

3.2.1 Logistic Regression

Many machine learning algorithms are iterative in nature because they run iterative optimization procedures, such as gradient descent, to maximize a function. They can thus run much faster by keeping their data in memory.

As an example, the following program implements logistic regression [14], a common classification algorithm

Footnote 5: We save each closure at the time it is created, so that the map in this example will always add 5 even if x changes.
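The program referred to above is cut off in this transcript. To show why such algorithms benefit from in-memory data, here is a minimal sketch of gradient descent for logistic regression in plain Scala. It is not the original program: the Point type, the sample data, the fixed step size of 1 and the iteration count are all assumptions made for illustration, and a plain Seq stands in for a persisted RDD.

```scala
object LogisticRegressionSketch {
  // A labeled point: feature vector x and label y in {-1.0, +1.0}.
  final case class Point(x: Vector[Double], y: Double)

  def dot(a: Vector[Double], b: Vector[Double]): Double =
    a.zip(b).map { case (p, q) => p * q }.sum

  // One pass over the data: sum the per-point gradient of the logistic loss.
  def gradient(points: Seq[Point], w: Vector[Double]): Vector[Double] =
    points
      .map { p =>
        val scale = (1.0 / (1.0 + math.exp(-p.y * dot(w, p.x))) - 1.0) * p.y
        p.x.map(_ * scale)
      }
      .reduce((a, b) => a.zip(b).map { case (p, q) => p + q })

  // Every iteration rescans the same in-memory `points`.
  def train(points: Seq[Point], iterations: Int): Vector[Double] = {
    var w = Vector.fill(points.head.x.length)(0.0)
    for (_ <- 1 to iterations) {
      val g = gradient(points, w)
      w = w.zip(g).map { case (wi, gi) => wi - gi }
    }
    w
  }

  // Made-up, linearly separable sample data.
  val sample: Seq[Point] = Seq(
    Point(Vector(1.0, 2.0), 1.0),
    Point(Vector(2.0, 3.0), 1.0),
    Point(Vector(-1.0, -2.0), -1.0),
    Point(Vector(-2.0, -1.0), -1.0)
  )

  def main(args: Array[String]): Unit = {
    val w = train(sample, iterations = 10)
    // Each training point should fall on the side of the plane matching its label.
    sample.foreach(p => println(s"label=${p.y} score=${dot(w, p.x)}"))
  }
}
```

The dataset is read once per iteration, so a system that reloads it from disk each time pays that cost on every pass, while one that keeps it in memory pays it only once.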

Page 8

Programming model: Spark

Based on parallelizable operators.

Parallelizable operators are higher-order functions that execute user-defined functions in parallel.

These higher-order functions are the RDD operators.

There are two types of RDD operators: transformations and actions.

Page 9

Programming model: Spark

Transformations: lazy operators that create new RDDs.

Actions: launch a computation and return a value to the program or write data to external storage.

The main programming language is Scala:

a strongly and statically typed functional-OO language

compiled over the JVM

designed at EPFL (Switzerland).

Page 10

Example (1/2)

Suppose that a web service is experiencing errors and an operator wants to search terabytes of logs in the Hadoop filesystem (HDFS) to find the cause.

Actions can be used to count errors, or to count errors mentioning MySQL.

Figure 1: Lineage graph for the third query in our example. Boxes represent RDDs and arrows represent transformations. The chain is: lines -> filter(_.startsWith("ERROR")) -> errors -> filter(_.contains("HDFS")) -> HDFS errors -> map(_.split('\t')(3)) -> time fields.

lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()

Line 1 defines an RDD backed by an HDFS file (as a collection of lines of text), while line 2 derives a filtered RDD from it. Line 3 then asks for errors to persist in memory so that it can be shared across queries. Note that the argument to filter is Scala syntax for a closure.

At this point, no work has been performed on the cluster. However, the user can now use the RDD in actions, e.g., to count the number of messages:

errors.count()
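The point that defining a filtered dataset performs no work until an action runs can be mimicked with a lazy Scala Iterator. This is a sketch in plain Scala rather than Spark: the log lines are made up, predicateCalls is a hypothetical counter added only to observe evaluation, and the iterator stands in for an RDD that has not yet been computed.

```scala
object LazyFilterSketch {
  // Made-up log lines standing in for the HDFS file.
  val lines: Seq[String] = Seq("ERROR a", "INFO b", "ERROR c", "WARN d")

  // Counts how often the predicate actually runs.
  var predicateCalls = 0

  // "Transformation": describes the filtered data but does no work yet.
  def errors: Iterator[String] =
    lines.iterator.filter { line =>
      predicateCalls += 1
      line.startsWith("ERROR")
    }

  def main(args: Array[String]): Unit = {
    val pending = errors
    println(predicateCalls) // 0: nothing has been evaluated
    println(pending.size)   // 2: the "action" forces evaluation
    println(predicateCalls) // 4: every line was inspected once
  }
}
```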

The user can also perform further transformations on the RDD and use their results, as in the following lines:

// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()

// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
      .map(_.split('\t')(3))
      .collect()
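To make the two queries concrete, the sketch below runs the same filter and map steps over a few made-up, tab-separated log lines held in an ordinary Scala Seq. The sample lines and their field layout (level, component, message, time) are assumptions; only the operator chain mirrors the queries above.

```scala
object LogQueriesSketch {
  // Made-up, tab-separated log lines: level, component, message, time.
  val lines: Seq[String] = Seq(
    "ERROR\tHDFS\tblock missing\t10:01",
    "INFO\tweb\trequest served\t10:02",
    "ERROR\tMySQL\tconnection lost\t10:03",
    "ERROR\tHDFS\treplica timeout\t10:04"
  )

  val errors: Seq[String] = lines.filter(_.startsWith("ERROR"))

  // Count errors mentioning MySQL (Seq.count takes the predicate directly).
  val mysqlErrors: Int = errors.count(_.contains("MySQL"))

  // Time fields (index 3, i.e. the fourth field) of errors mentioning HDFS.
  val hdfsTimes: Seq[String] =
    errors.filter(_.contains("HDFS")).map(_.split('\t')(3))

  def main(args: Array[String]): Unit = {
    println(mysqlErrors) // 1
    println(hdfsTimes)   // List(10:01, 10:04)
  }
}
```

Note that field number 3 is zero-based, so it selects the fourth tab-separated field.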

After the first action involving errors runs, Spark will store the partitions of errors in memory, greatly speeding up subsequent computations on it. Note that the base RDD, lines, is not loaded into RAM. This is desirable because the error messages might only be a small fraction of the data (small enough to fit into memory).

Finally, to illustrate how our model achieves fault tolerance, we show the lineage graph for the RDDs in our third query in Figure 1. In this query, we started with errors, the result of a filter on lines, and applied a further filter and map before running a collect. The Spark scheduler will pipeline the latter two transformations and send a set of tasks to compute them to the nodes holding the cached partitions of errors. In addition, if a partition of errors is lost, Spark rebuilds it by applying a filter on only the corresponding partition of lines.
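Lineage-based recovery can be sketched with a toy model in which every dataset knows how to recompute any single partition from its parent. None of this is Spark's implementation: ToyRdd, Source, Filtered, Cached, the lose method and the computations counter are hypothetical names introduced here, and the data is made up.

```scala
// Toy model of lineage (not Spark's API): each dataset can recompute
// any one of its partitions from its parent.
sealed trait ToyRdd[A] {
  def numPartitions: Int
  def compute(partition: Int): Seq[A]
}

final case class Source[A](partitions: Vector[Seq[A]]) extends ToyRdd[A] {
  def numPartitions: Int = partitions.length
  def compute(partition: Int): Seq[A] = partitions(partition)
}

final case class Filtered[A](parent: ToyRdd[A], keep: A => Boolean) extends ToyRdd[A] {
  def numPartitions: Int = parent.numPartitions
  def compute(partition: Int): Seq[A] = parent.compute(partition).filter(keep)
}

// Keeps computed partitions in memory; a lost one is rebuilt on demand.
final class Cached[A](parent: ToyRdd[A]) extends ToyRdd[A] {
  private val store = scala.collection.mutable.Map.empty[Int, Seq[A]]
  var computations = 0
  def numPartitions: Int = parent.numPartitions
  def compute(partition: Int): Seq[A] =
    store.getOrElseUpdate(partition, { computations += 1; parent.compute(partition) })
  def lose(partition: Int): Unit = { store.remove(partition); () }
}

object LineageSketch {
  def main(args: Array[String]): Unit = {
    val lines = Source(Vector(Seq("ERROR a", "INFO b"), Seq("ERROR HDFS c", "WARN d")))
    val errors = new Cached(Filtered[String](lines, _.startsWith("ERROR")))

    (0 until errors.numPartitions).foreach(errors.compute) // fill the cache
    errors.lose(1)                                         // simulate a node failure
    println(errors.compute(1))   // List(ERROR HDFS c), rebuilt from partition 1 of lines
    println(errors.computations) // 3: two initial computations plus one rebuild
  }
}
```

Only the lost partition is recomputed, and only from the matching partition of its parent; partition 0 stays cached throughout.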

Aspect | RDDs | Distr. Shared Mem.
Reads | Coarse- or fine-grained | Fine-grained
Writes | Coarse-grained | Fine-grained
Consistency | Trivial (immutable) | Up to app / runtime
Fault recovery | Fine-grained and low-overhead using lineage | Requires checkpoints and program rollback
Straggler mitigation | Possible using backup tasks | Difficult
Work placement | Automatic based on data locality | Up to app (runtimes aim for transparency)
Behavior if not enough RAM | Similar to existing data flow systems | Poor performance (swapping?)

Table 1: Comparison of RDDs with distributed shared memory.

2.3 Advantages of the RDD Model

To understand the benefits of RDDs as a distributed memory abstraction, we compare them against distributed shared memory (DSM) in Table 1. In DSM systems, applications read and write to arbitrary locations in a global address space. Note that under this definition, we include not only traditional shared memory systems [24], but also other systems where applications make fine-grained writes to shared state, including Piccolo [27], which provides a shared DHT, and distributed databases. DSM is a very general abstraction, but this generality makes it harder to implement in an efficient and fault-tolerant manner on commodity clusters.

The main difference between RDDs and DSM is that RDDs can only be created ("written") through coarse-grained transformations, while DSM allows reads and writes to each memory location (see footnote 3). This restricts RDDs to applications that perform bulk writes, but allows for more efficient fault tolerance. In particular, RDDs do not need to incur the overhead of checkpointing, as they can be recovered using lineage (see footnote 4). Furthermore, only the lost partitions of an RDD need to be recomputed upon failure, and they can be recomputed in parallel on different nodes, without having to roll back the whole program.

A second benefit of RDDs is that their immutable nature lets a system mitigate slow nodes (stragglers) by running backup copies of slow tasks as in MapReduce [10]. Backup tasks would be hard to implement with DSM, as the two copies of a task would access the same memory locations and interfere with each other's updates.
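The backup-task idea can be sketched with Scala futures: because the input is immutable and the task is deterministic, two copies can run at once and whichever finishes first supplies the result. This is a single-machine illustration, not Spark's scheduler; sumTask, runWithBackup and the sleep-based delays are assumptions used to simulate a slow node.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BackupTaskSketch {
  // Immutable input partition: any number of task copies may read it safely.
  val partition: Vector[Int] = Vector(1, 2, 3, 4)

  // A deterministic task; `delayMs` simulates a slow node.
  def sumTask(delayMs: Long): Future[Int] = Future {
    Thread.sleep(delayMs)
    partition.sum
  }

  // Launch the original and a backup copy, and keep whichever finishes first.
  def runWithBackup(): Int = {
    val straggler = sumTask(delayMs = 500)
    val backup    = sumTask(delayMs = 10)
    Await.result(Future.firstCompletedOf(Seq(straggler, backup)), 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(runWithBackup()) // 10, whichever copy wins
}
```

With mutable shared state the two copies could overwrite each other's updates, which is the interference the paragraph above describes for DSM.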

Finally, RDDs provide two other benefits over DSM. First, in bulk operations on RDDs, a runtime can schedule tasks based on data locality to improve performance.

Footnote 3: Note that reads on RDDs can still be fine-grained. For example, an application can treat an RDD as a large read-only lookup table.

Footnote 4: In some applications, it can still help to checkpoint RDDs with long lineage chains, as we discuss in Section 5.4. However, this can be done in the background because RDDs are immutable, and there is no need to take a snapshot of the whole application as in DSM.


Page 11: Spark and Resilient Distributed Datasetscolazzo/BD/Spark.slides.3.pdf · Scala’s closure objects using reflection [33]. We also needed more work to make Spark usable from the Scala

Example (1/2)Suppose that a web service is experiencing errors and an operator wants to search terabytes of logs in the Hadoop filesystem (HDFS) to find the cause.

Actions can be used to count errors:

Or counting errors mentioning MySQL:

lines

errors

filter(_.startsWith(“ERROR”))

HDFS errors

time fields

filter(_.contains(“HDFS”)))

map(_.split(‘\t’)(3))

Figure 1: Lineage graph for the third query in our example.Boxes represent RDDs and arrows represent transformations.

lines = spark.textFile("hdfs://...")errors = lines.filter(_.startsWith("ERROR"))errors.persist()

Line 1 defines an RDD backed by an HDFS file (as acollection of lines of text), while line 2 derives a filteredRDD from it. Line 3 then asks for errors to persist inmemory so that it can be shared across queries. Note thatthe argument to filter is Scala syntax for a closure.

At this point, no work has been performed on the clus-ter. However, the user can now use the RDD in actions,e.g., to count the number of messages:

errors.count()

The user can also perform further transformations onthe RDD and use their results, as in the following lines:

// Count errors mentioning MySQL:errors.filter(_.contains("MySQL")).count()

// Return the time fields of errors mentioning// HDFS as an array (assuming time is field// number 3 in a tab-separated format):errors.filter(_.contains("HDFS"))

.map(_.split(’\t’)(3))

.collect()

After the first action involving errors runs, Spark willstore the partitions of errors in memory, greatly speed-ing up subsequent computations on it. Note that the baseRDD, lines, is not loaded into RAM. This is desirablebecause the error messages might only be a small frac-tion of the data (small enough to fit into memory).

Finally, to illustrate how our model achieves fault tol-erance, we show the lineage graph for the RDDs in ourthird query in Figure 1. In this query, we started witherrors, the result of a filter on lines, and applied a fur-ther filter and map before running a collect. The Sparkscheduler will pipeline the latter two transformations andsend a set of tasks to compute them to the nodes holdingthe cached partitions of errors. In addition, if a partitionof errors is lost, Spark rebuilds it by applying a filter ononly the corresponding partition of lines.

Aspect RDDs Distr. Shared Mem. Reads Coarse- or fine-grained Fine-grained Writes Coarse-grained Fine-grained Consistency Trivial (immutable) Up to app / runtime Fault recovery Fine-grained and low-

overhead using lineage Requires checkpoints and program rollback

Straggler mitigation

Possible using backup tasks

Difficult

Work placement

Automatic based on data locality

Up to app (runtimes aim for transparency)

Behavior if not enough RAM

Similar to existing data flow systems

Poor performance (swapping?)

Table 1: Comparison of RDDs with distributed shared memory.

2.3 Advantages of the RDD Model

To understand the benefits of RDDs as a distributedmemory abstraction, we compare them against dis-tributed shared memory (DSM) in Table 1. In DSM sys-tems, applications read and write to arbitrary locations ina global address space. Note that under this definition, weinclude not only traditional shared memory systems [24],but also other systems where applications make fine-grained writes to shared state, including Piccolo [27],which provides a shared DHT, and distributed databases.DSM is a very general abstraction, but this generalitymakes it harder to implement in an efficient and fault-tolerant manner on commodity clusters.

The main difference between RDDs and DSM is thatRDDs can only be created (“written”) through coarse-grained transformations, while DSM allows reads andwrites to each memory location.3 This restricts RDDsto applications that perform bulk writes, but allows formore efficient fault tolerance. In particular, RDDs do notneed to incur the overhead of checkpointing, as they canbe recovered using lineage.4 Furthermore, only the lostpartitions of an RDD need to be recomputed upon fail-ure, and they can be recomputed in parallel on differentnodes, without having to roll back the whole program.

A second benefit of RDDs is that their immutable na-ture lets a system mitigate slow nodes (stragglers) by run-ning backup copies of slow tasks as in MapReduce [10].Backup tasks would be hard to implement with DSM, asthe two copies of a task would access the same memorylocations and interfere with each other’s updates.

Finally, RDDs provide two other benefits over DSM.First, in bulk operations on RDDs, a runtime can sched-

3Note that reads on RDDs can still be fine-grained. For example, anapplication can treat an RDD as a large read-only lookup table.

4In some applications, it can still help to checkpoint RDDs withlong lineage chains, as we discuss in Section 5.4. However, this can bedone in the background because RDDs are immutable, and there is noneed to take a snapshot of the whole application as in DSM.

lines

errors

filter(_.startsWith(“ERROR”))

HDFS errors

time fields

filter(_.contains(“HDFS”)))

map(_.split(‘\t’)(3))

Figure 1: Lineage graph for the third query in our example.Boxes represent RDDs and arrows represent transformations.

lines = spark.textFile("hdfs://...")errors = lines.filter(_.startsWith("ERROR"))errors.persist()

Line 1 defines an RDD backed by an HDFS file (as acollection of lines of text), while line 2 derives a filteredRDD from it. Line 3 then asks for errors to persist inmemory so that it can be shared across queries. Note thatthe argument to filter is Scala syntax for a closure.

At this point, no work has been performed on the clus-ter. However, the user can now use the RDD in actions,e.g., to count the number of messages:

errors.count()

The user can also perform further transformations onthe RDD and use their results, as in the following lines:

// Count errors mentioning MySQL:errors.filter(_.contains("MySQL")).count()

// Return the time fields of errors mentioning// HDFS as an array (assuming time is field// number 3 in a tab-separated format):errors.filter(_.contains("HDFS"))

.map(_.split(’\t’)(3))

.collect()

After the first action involving errors runs, Spark willstore the partitions of errors in memory, greatly speed-ing up subsequent computations on it. Note that the baseRDD, lines, is not loaded into RAM. This is desirablebecause the error messages might only be a small frac-tion of the data (small enough to fit into memory).

Finally, to illustrate how our model achieves fault tol-erance, we show the lineage graph for the RDDs in ourthird query in Figure 1. In this query, we started witherrors, the result of a filter on lines, and applied a fur-ther filter and map before running a collect. The Sparkscheduler will pipeline the latter two transformations andsend a set of tasks to compute them to the nodes holdingthe cached partitions of errors. In addition, if a partitionof errors is lost, Spark rebuilds it by applying a filter ononly the corresponding partition of lines.

Aspect RDDs Distr. Shared Mem. Reads Coarse- or fine-grained Fine-grained Writes Coarse-grained Fine-grained Consistency Trivial (immutable) Up to app / runtime Fault recovery Fine-grained and low-

overhead using lineage Requires checkpoints and program rollback

Straggler mitigation

Possible using backup tasks

Difficult

Work placement

Automatic based on data locality

Up to app (runtimes aim for transparency)

Behavior if not enough RAM

Similar to existing data flow systems

Poor performance (swapping?)

Table 1: Comparison of RDDs with distributed shared memory.

2.3 Advantages of the RDD Model

To understand the benefits of RDDs as a distributed memory abstraction, we compare them against distributed shared memory (DSM) in Table 1. In DSM systems, applications read and write to arbitrary locations in a global address space. Note that under this definition, we include not only traditional shared memory systems [24], but also other systems where applications make fine-grained writes to shared state, including Piccolo [27], which provides a shared DHT, and distributed databases. DSM is a very general abstraction, but this generality makes it harder to implement in an efficient and fault-tolerant manner on commodity clusters.

The main difference between RDDs and DSM is that RDDs can only be created (“written”) through coarse-grained transformations, while DSM allows reads and writes to each memory location.³ This restricts RDDs to applications that perform bulk writes, but allows for more efficient fault tolerance. In particular, RDDs do not need to incur the overhead of checkpointing, as they can be recovered using lineage.⁴ Furthermore, only the lost partitions of an RDD need to be recomputed upon failure, and they can be recomputed in parallel on different nodes, without having to roll back the whole program.

A second benefit of RDDs is that their immutable nature lets a system mitigate slow nodes (stragglers) by running backup copies of slow tasks as in MapReduce [10]. Backup tasks would be hard to implement with DSM, as the two copies of a task would access the same memory locations and interfere with each other’s updates.

Finally, RDDs provide two other benefits over DSM. First, in bulk operations on RDDs, a runtime can sched-

³ Note that reads on RDDs can still be fine-grained. For example, an application can treat an RDD as a large read-only lookup table.

⁴ In some applications, it can still help to checkpoint RDDs with long lineage chains, as we discuss in Section 5.4. However, this can be done in the background because RDDs are immutable, and there is no need to take a snapshot of the whole application as in DSM.

Figure 1: Lineage graph for the third query in our example. Boxes represent RDDs and arrows represent transformations. (lines → filter(_.startsWith("ERROR")) → errors → filter(_.contains("HDFS")) → HDFS errors → map(_.split('\t')(3)) → time fields)

val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
errors.persist()

Line 1 defines an RDD backed by an HDFS file (as a collection of lines of text), while line 2 derives a filtered RDD from it. Line 3 then asks for errors to persist in memory so that it can be shared across queries. Note that the argument to filter is Scala syntax for a closure.

At this point, no work has been performed on the cluster. However, the user can now use the RDD in actions, e.g., to count the number of messages:

errors.count()

The user can also perform further transformations on the RDD and use their results, as in the following lines:

// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()

// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
      .map(_.split('\t')(3))
      .collect()

After the first action involving errors runs, Spark will store the partitions of errors in memory, greatly speeding up subsequent computations on it. Note that the base RDD, lines, is not loaded into RAM. This is desirable because the error messages might only be a small fraction of the data (small enough to fit into memory).
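The interplay of lazy transformations, persist and actions can be illustrated with a small stand-alone model. The class LazyData below is hypothetical and is not part of Spark; it only mimics the behaviour described above, where transformations record work, persist marks a dataset for reuse, and the first action triggers the computation.

```scala
// Minimal model of a lazily evaluated, optionally persisted dataset.
// LazyData and its members are hypothetical names, not Spark classes.
final class LazyData[T](compute: () => Vector[T]) {
  private var persistRequested = false
  private var cached: Option[Vector[T]] = None
  var evaluations = 0 // how many times this dataset was actually computed

  // Transformation: only records what to do, runs nothing.
  def filter(p: T => Boolean): LazyData[T] =
    new LazyData(() => materialize().filter(p))

  // Marks the dataset for in-memory reuse; still runs nothing.
  def persist(): this.type = { persistRequested = true; this }

  // Action: forces evaluation.
  def count(): Int = materialize().size

  private def materialize(): Vector[T] = cached.getOrElse {
    evaluations += 1
    val data = compute()
    if (persistRequested) cached = Some(data)
    data
  }
}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val lines = new LazyData(() => Vector("ERROR a", "INFO b", "ERROR c"))
    val errors = lines.filter(_.startsWith("ERROR")).persist()
    println(errors.evaluations) // nothing has been computed yet
    errors.count()
    errors.count()
    println(errors.evaluations) // computed once, then served from the cache
  }
}
```

In this model the base dataset is never cached itself; only the persisted, filtered dataset is kept, mirroring the remark that lines is not loaded into RAM.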


Page 12: Spark and Resilient Distributed Datasets

Example (1/2)

Suppose that a web service is experiencing errors and an operator wants to search terabytes of logs in the Hadoop filesystem (HDFS) to find the cause.

Actions can be used to count errors, e.g., errors.count().

Or to count only the errors mentioning MySQL, e.g., errors.filter(_.contains("MySQL")).count().


lines is not loaded into memory (simple static analysis); only errors is.

Lazy evaluation: errors is actually computed and put in memory when the count() action is evaluated.

Page 13: Spark and Resilient Distributed Datasets

Fault tolerance via lineage


The lineage graph enables an RDD to be recomputed in case of failure.

Page 14: Spark and Resilient Distributed Datasets

RDD transformations and actions

Transformations

map(f : T ⇒ U) : RDD[T] ⇒ RDD[U]
filter(f : T ⇒ Bool) : RDD[T] ⇒ RDD[T]
flatMap(f : T ⇒ Seq[U]) : RDD[T] ⇒ RDD[U]
sample(fraction : Float) : RDD[T] ⇒ RDD[T] (Deterministic sampling)
groupByKey() : RDD[(K, V)] ⇒ RDD[(K, Seq[V])]
reduceByKey(f : (V, V) ⇒ V) : RDD[(K, V)] ⇒ RDD[(K, V)]
union() : (RDD[T], RDD[T]) ⇒ RDD[T]
join() : (RDD[(K, V)], RDD[(K, W)]) ⇒ RDD[(K, (V, W))]
cogroup() : (RDD[(K, V)], RDD[(K, W)]) ⇒ RDD[(K, (Seq[V], Seq[W]))]
crossProduct() : (RDD[T], RDD[U]) ⇒ RDD[(T, U)]
mapValues(f : V ⇒ W) : RDD[(K, V)] ⇒ RDD[(K, W)] (Preserves partitioning)
sort(c : Comparator[K]) : RDD[(K, V)] ⇒ RDD[(K, V)]
partitionBy(p : Partitioner[K]) : RDD[(K, V)] ⇒ RDD[(K, V)]

Actions

count() : RDD[T] ⇒ Long
collect() : RDD[T] ⇒ Seq[T]
reduce(f : (T, T) ⇒ T) : RDD[T] ⇒ T
lookup(k : K) : RDD[(K, V)] ⇒ Seq[V] (On hash/range partitioned RDDs)
save(path : String) : Outputs RDD to a storage system, e.g., HDFS

Table 2: Transformations and actions available on RDDs in Spark. Seq[T] denotes a sequence of elements of type T.
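To make two of the pair signatures in Table 2 concrete, here is a sketch of what reduceByKey and join compute, written against ordinary Scala collections rather than RDDs. The object PairOps and the sample data are assumptions for illustration; the real operators are distributed and return RDDs, not local Maps or Seqs.

```scala
// Local-collection analogues of two pair operations from Table 2.
// PairOps is a hypothetical helper, not a Spark API.
object PairOps {
  // reduceByKey(f : (V, V) ⇒ V): merge all values that share a key.
  def reduceByKey[K, V](xs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    xs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  // join(): pair up values from both sides for every matching key.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    for ((k, v) <- left; (k2, w) <- right if k == k2) yield (k, (v, w))

  def main(args: Array[String]): Unit = {
    val visits = Seq("a.com" -> 1, "b.com" -> 1, "a.com" -> 1)
    val owners = Seq("a.com" -> "alice", "b.com" -> "bob")
    println(reduceByKey(visits)(_ + _)) // one summed count per URL
    println(join(visits, owners))       // one row per matching (visit, owner) pair
  }
}
```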

Logistic regression is a common classification algorithm that searches for a hyperplane w that best separates two sets of points (e.g., spam and non-spam emails). The algorithm uses gradient descent: it starts w at a random value, and on each iteration, it sums a function of w over the data to move w in a direction that improves it.

val points = spark.textFile(...).map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
  val gradient = points.map { p =>
    p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
  }.reduce((a,b) => a+b)
  w -= gradient
}

We start by defining a persistent RDD called points as the result of a map transformation on a text file that parses each line of text into a Point object. We then repeatedly run map and reduce on points to compute the gradient at each step by summing a function of the current w. Keeping points in memory across iterations can yield a 20× speedup, as we show in Section 6.1.
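The same gradient step can be tried out on a local collection. The following is a sketch under assumptions: Vec, Point and the helper functions are hypothetical stand-ins for the vector type and the Point class that the program above leaves undefined, and a Seq replaces the persisted RDD.

```scala
import scala.math.exp

// Local, single-machine sketch of the logistic regression loop.
// Vec, Point and the helpers are illustrative assumptions.
object LogisticSketch {
  type Vec = Vector[Double]
  final case class Point(x: Vec, y: Double) // y is the label, +1 or -1

  def dot(a: Vec, b: Vec): Double = a.zip(b).map { case (p, q) => p * q }.sum
  def scale(a: Vec, s: Double): Vec = a.map(_ * s)
  def add(a: Vec, b: Vec): Vec = a.zip(b).map { case (p, q) => p + q }
  def sub(a: Vec, b: Vec): Vec = a.zip(b).map { case (p, q) => p - q }

  def train(points: Seq[Point], w0: Vec, iterations: Int): Vec = {
    var w = w0
    for (_ <- 1 to iterations) {
      // map: per-point gradient term; reduce: sum the terms over all points
      val gradient = points
        .map(p => scale(p.x, (1 / (1 + exp(-p.y * dot(w, p.x))) - 1) * p.y))
        .reduce((a, b) => add(a, b))
      w = sub(w, gradient)
    }
    w
  }

  def main(args: Array[String]): Unit = {
    val points = Seq(Point(Vector(1.0), 1.0), Point(Vector(-1.0), -1.0))
    println(train(points, Vector(0.0), 5))
  }
}
```

Because points is read on every iteration and never modified, it is the natural dataset to keep in memory, which is the role persist() plays in the Spark version.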

3.2.2 PageRank

A more complex pattern of data sharing occurs in PageRank [6]. The algorithm iteratively updates a rank for each document by adding up contributions from documents that link to it. On each iteration, each document sends a contribution of $r/n$ to its neighbors, where $r$ is its rank and $n$ is its number of neighbors. It then updates its rank to $a/N + (1 - a)\sum_i c_i$, where the sum is over the contributions $c_i$ it received and $N$ is the total number of documents. We can write PageRank in Spark as follows:

// Load graph as an RDD of (URL, outlinks) pairs
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
  // Build an RDD of (targetURL, float) pairs
  // with the contributions sent by each page
  val contribs = links.join(ranks).flatMap {
    case (url, (links, rank)) =>
      links.map(dest => (dest, rank/links.size))
  }
  // Sum contributions by URL and get new ranks
  ranks = contribs.reduceByKey((x,y) => x+y)
                  .mapValues(sum => a/N + (1-a)*sum)
}

Figure 3: Lineage graph for datasets in PageRank. (Nodes: input file, links, ranks0, contribs0, ranks1, contribs1, ranks2, contribs2, ...; edges: map, join, reduce + map.)

This program leads to the RDD lineage graph in Figure 3. On each iteration, we create a new ranks dataset based on the contribs and ranks from the previous iteration and the static links dataset.6 One interesting feature of this graph is that it grows longer with the number of iterations.

6. Note that although RDDs are immutable, the variables ranks and contribs in the program point to different RDDs on each iteration.
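The join / flatMap / reduceByKey / mapValues pipeline above can be followed step by step on a small graph. The sketch below is a local approximation, assuming ordinary Scala `Map` and `Seq` values in place of RDDs. The object name `LocalPageRank`, the uniform initial rank of 1/N and the value chosen for `a` in the usage note are assumptions for illustration only.

```scala
object LocalPageRank {
  // links: page -> outlinks; a: the constant in the update a/N + (1 - a) * sum.
  def pageRank(links: Map[String, Seq[String]], a: Double, iterations: Int): Map[String, Double] = {
    val n = links.size.toDouble
    // Assumption: every page starts with rank 1/N.
    var ranks: Map[String, Double] = links.keys.map(url => (url, 1.0 / n)).toMap
    for (_ <- 1 to iterations) {
      // Local stand-in for links.join(ranks).flatMap { ... }:
      // pages without a current rank drop out, as in an inner join.
      val contribs: Seq[(String, Double)] = for {
        (url, outlinks) <- links.toSeq
        rank <- ranks.get(url).toSeq
        dest <- outlinks
      } yield (dest, rank / outlinks.size)
      // Local stand-in for reduceByKey(_ + _).mapValues(sum => a/N + (1-a)*sum).
      ranks = contribs
        .groupBy(_._1)
        .map { case (url, cs) => (url, a / n + (1 - a) * cs.map(_._2).sum) }
    }
    ranks
  }
}
```

For example, `LocalPageRank.pageRank(Map("A" -> Seq("B"), "B" -> Seq("C"), "C" -> Seq("A")), 0.15, 10)` keeps every rank at about 1/3, because each page on the cycle receives exactly the rank its predecessor held.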


RDD transformations : Map

• All pairs are independently processed.

// passing each element through a function.

val nums = sc.parallelize(Array(1, 2, 3))

val squares = nums.map(x => x * x) // {1, 4, 9}

// selecting those elements for which the function returns true.

val even = squares.filter(x => x % 2 == 0) // {4}

// mapping each element to zero or more others.

nums.flatMap(x => Range(0, x, 1)) // {0, 0, 1, 0, 1, 2}


RDD transformations : Reduce

• Pairs with identical key are grouped

• Each group is independently processed


val pets = sc.parallelize(Seq(("cat", 1), ("dog", 1), ("cat", 2)))

pets.reduceByKey((x, y) => x + y)

// {(cat, 3), (dog, 1)}

pets.groupByKey()

// {(cat, (1, 2)), (dog, (1))}
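The relationship between the two operations can be made explicit: reducing by key gives the same result as grouping by key and then reducing each group. Below is a small local sketch, assuming plain Scala collections rather than RDDs. The object `LocalByKey` and its method names are hypothetical stand-ins, not Spark's API.

```scala
object LocalByKey {
  // Local stand-in for groupByKey: collect all values per key, in input order.
  def groupByKey[K, V](pairs: Seq[(K, V)]): Map[K, Seq[V]] =
    pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }

  // Local stand-in for reduceByKey: combine each key's values with f.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    groupByKey(pairs).map { case (k, vs) => (k, vs.reduce(f)) }
}
```

This sketch only models the result. In Spark itself, reduceByKey can combine values within each partition before data is shuffled, which a local collection does not capture.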


RDD transformations : Join

• Equi-join on the key

• Join candidates are independently processed

val visits = sc.parallelize(Seq(("index.html", "1.2.3.4"),

("about.html", "3.4.5.6"),

("index.html", "1.3.3.1")))

val pageNames = sc.parallelize(Seq(("index.html", "Home"),

("about.html", "About")))

visits.join(pageNames)

// ("index.html", ("1.2.3.4", "Home"))

// ("index.html", ("1.3.3.1", "Home"))

// ("about.html", ("3.4.5.6", "About"))
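To make the equi-join semantics concrete, here is a minimal local sketch, assuming plain `Seq` inputs instead of RDDs. `LocalJoin.join` is a hypothetical helper that indexes the right side by key and emits one output pair per matching combination. Keys that appear on only one side produce no output.

```scala
object LocalJoin {
  // Equi-join on the key: one output pair per matching (left, right) combination.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    // Index the right-hand side by key so each left element is matched by lookup.
    val rightByKey: Map[K, Seq[(K, W)]] = right.groupBy(_._1)
    left.flatMap { case (k, v) =>
      rightByKey.getOrElse(k, Seq.empty[(K, W)]).map { case (_, w) => (k, (v, w)) }
    }
  }
}
```

The output order here follows the left input. An RDD join makes no such ordering promise, so results are best compared as sets.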


RDD transformations : CoGroup

• Groups each input on key

• Groups with identical keys are processed together

val visits = sc.parallelize(Seq(("index.html", "1.2.3.4"),

("about.html", "3.4.5.6"),

("index.html", "1.3.3.1")))

val pageNames = sc.parallelize(Seq(("index.html", "Home"),

("about.html", "About")))

visits.cogroup(pageNames)

// ("index.html", (("1.2.3.4", "1.3.3.1"), ("Home")))

// ("about.html", (("3.4.5.6"), ("About")))
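The difference from join is that cogroup keeps the values of each input in separate sequences, and a key found in only one input still appears, paired with an empty sequence for the other. This can be sketched locally, assuming plain Scala collections; `LocalCoGroup.cogroup` is a hypothetical helper, not Spark's implementation.

```scala
object LocalCoGroup {
  // For every key seen in either input, pair up the values from each side.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val l = left.groupBy(_._1)
    val r = right.groupBy(_._1)
    (l.keySet ++ r.keySet).map { k =>
      val vs = l.getOrElse(k, Seq.empty[(K, V)]).map(_._2)
      val ws = r.getOrElse(k, Seq.empty[(K, W)]).map(_._2)
      (k, (vs, ws))
    }.toMap
  }
}
```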


PageRank

• The algorithm works on N linked documents (e.g. the Web) and iteratively updates a rank for each document by adding up contributions from documents that link to it.

• On each iteration, each document sends a contribution c = r/n to its neighbours, where r is the current rank of the document and n is the number of its neighbours.

• The rank of each document is then updated to $a/N + (1 - a)\sum_i c_i$, where the sum is over the contributions $c_i$ it received.


PageRank

Figure 9: Iteration times for logistic regression using 256 MB data on a single machine for different sources of input. (Bar chart; y-axis: iteration time (s); x-axis: in-memory HDFS, in-memory local file, Spark RDD; series: text input, binary input; bar labels as extracted: 15.4, 13.1, 2.9, 8.4, 6.9, 2.9.)

3. Deserialization cost to convert binary records to usable in-memory Java objects.

We investigated each of these factors in turn. To measure (1), we ran no-op Hadoop jobs, and saw that these incurred at least 25s of overhead to complete the minimal requirements of job setup, starting tasks, and cleaning up. Regarding (2), we found that HDFS performed multiple memory copies and a checksum to serve each block.

Finally, to measure (3), we ran microbenchmarks on a single machine to run the logistic regression computation on 256 MB inputs in various formats. In particular, we compared the time to process text and binary inputs from both HDFS (where overheads in the HDFS stack will manifest) and an in-memory local file (where the kernel can very efficiently pass data to the program).

We show the results of these tests in Figure 9. The differences between in-memory HDFS and local file show that reading through HDFS introduced a 2-second overhead, even when data was in memory on the local machine. The differences between the text and binary input indicate the parsing overhead was 7 seconds. Finally, even when reading from an in-memory file, converting the pre-parsed binary data into Java objects took 3 seconds, which is still almost as expensive as the logistic regression itself. By storing RDD elements directly as Java objects in memory, Spark avoids all these overheads.

6.2 PageRank

We compared the performance of Spark with Hadoop for PageRank using a 54 GB Wikipedia dump. We ran 10 iterations of the PageRank algorithm to process a link graph of approximately 4 million articles. Figure 10 demonstrates that in-memory storage alone provided Spark with a 2.4× speedup over Hadoop on 30 nodes. In addition, controlling the partitioning of the RDDs to make it consistent across iterations, as discussed in Section 3.2.2, improved the speedup to 7.4×. The results also scaled nearly linearly to 60 nodes.
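The two speedup figures can be checked against the bar labels of Figure 10. This is a rough check under an assumption about how to read the plot: that 171, 72 and 23 are the 30-machine iteration times in seconds for Hadoop, basic Spark and Spark with controlled partitioning, respectively. The text does not state that mapping.

```latex
\[
\frac{171\ \mathrm{s}}{72\ \mathrm{s}} \approx 2.4,
\qquad
\frac{171\ \mathrm{s}}{23\ \mathrm{s}} \approx 7.4
\]
```

Under that reading, both ratios match the 2.4× and 7.4× speedups quoted for 30 nodes.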

We also evaluated a version of PageRank written using our implementation of Pregel over Spark, which we describe in Section 7.1. The iteration times were similar to the ones in Figure 10, but longer by about 4 seconds because Pregel runs an extra operation on each iteration to let the vertices “vote” whether to finish the job.

Figure 10: Performance of PageRank on Hadoop and Spark. (Bar chart; y-axis: iteration time (s); x-axis: number of machines, 30 and 60; series: Hadoop, Basic Spark, Spark + Controlled Partitioning; bar labels as extracted: 171, 80, 72, 28, 23, 14.)

Figure 11: Iteration times for k-means in presence of a failure. One machine was killed at the start of the 6th iteration, resulting in partial reconstruction of an RDD using lineage. (Bar chart; y-axis: iteration time (s); x-axis: iteration, 1 to 10; series: No Failure, Failure in the 6th Iteration; bar labels as extracted: 119, 57, 56, 58, 58, 81, 57, 59, 57, 59.)

6.3 Fault Recovery

We evaluated the cost of reconstructing RDD partitions using lineage after a node failure in the k-means application. Figure 11 compares the running times for 10 iterations of k-means on a 75-node cluster in a normal operating scenario with one where a node fails at the start of the 6th iteration. Without any failure, each iteration consisted of 400 tasks working on 100 GB of data.

Until the end of the 5th iteration, the iteration times were about 58 seconds. In the 6th iteration, one of the machines was killed, resulting in the loss of the tasks running on that machine and the RDD partitions stored there. Spark re-ran these tasks in parallel on other machines, where they re-read corresponding input data and reconstructed RDDs via lineage, which increased the iteration time to 80s. Once the lost RDD partitions were reconstructed, the iteration time went back down to 58s.

Note that with a checkpoint-based fault recovery mechanism, recovery would likely require rerunning at least several iterations, depending on the frequency of checkpoints. Furthermore, the system would need to replicate the application’s 100 GB working set (the text input data converted into binary) across the network, and would either consume twice the memory of Spark to replicate it in RAM, or would have to wait to write 100 GB to disk. In contrast, the lineage graphs for the RDDs in our examples were all less than 10 KB in size.
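The idea of rebuilding only the lost partitions from lineage, instead of restoring replicated data, can be illustrated with a toy model. The sketch below is an assumption-laden simplification, not Spark's mechanism: `LineageSketch.Derived` is a hypothetical class that keeps the stable input partitions, the function used to derive the dataset (its "lineage"), and a cache of computed partitions.

```scala
object LineageSketch {
  // A toy derived dataset: stable input partitions plus the function (lineage)
  // that derives each output partition from the matching input partition.
  final class Derived[A, B](input: Vector[Seq[A]], f: A => B) {
    private val cache = scala.collection.mutable.Map.empty[Int, Seq[B]]
    // Counts how many partitions have been built or rebuilt so far.
    var recomputations = 0

    def partition(i: Int): Seq[B] = cache.getOrElseUpdate(i, {
      recomputations += 1
      input(i).map(f) // replay the lineage for this partition only
    })

    def collect(): Seq[B] = input.indices.flatMap(i => partition(i))

    // Simulate a node failure that drops one cached partition.
    def lose(i: Int): Unit = { cache.remove(i); () }
  }
}
```

After `lose(1)`, the next `collect()` recomputes a single partition while the other cached partitions are reused, which mirrors why only the 6th iteration slows down in the experiment above.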

6.4 Behavior with Insufficient Memory

So far, we ensured that every machine in the cluster had enough memory to store all the RDDs across iterations.


Conclusion

• MapReduce makes an important abstraction step that greatly helps the rapid development of efficient and robust Big Data data flows.

• But:

  • we still need some ‘hacking’ to ensure good performance

  • problems with iterative analyses

  • MapReduce programming is not easy

• Spark overcomes these limitations at the cost of needing more RAM, and takes a step further towards the ‘The data center is the computer’ scenario.


Thank you!