mapreduce - seoul national university

Post on 19-Nov-2021








MapReduce: Simplified Data Processing on Large Clusters

Jeffrey Dean and Sanjay Ghemawat (OSDI '04)

Seong Hoon Seo, Hyunji Choi

December 1st, 2020

Distributed Systems, 2020 Fall



● Introduction and Motivation

● Programming Model

● Execution Flow

● Implementation

● Details and Refinements

● Performance

● Experience

● Conclusion


Introduction and Motivation


● Computation at Google: Derived Data = F(large raw data)

○ Input: crawled documents, web request logs

○ Output: inverted indices, set of most frequent queries

● Example: Inverted Index

Figure: inverted index example (source: Lucidworks)


Introduction and Motivation


● Characteristics of Computation

○ Conceptually straightforward

○ Distributed computation is necessary to finish on large inputs in reasonable time

○ Implementation is complex in a distributed environment

● Challenges of Distributed Computation

○ Parallelization

○ Fault-tolerance

○ Data distribution

○ Load balancing


Introduction and Motivation


Solution: MapReduce Programming Model

● Interface

○ Enables automatic parallelization and distribution

● Implementation

○ Resolves the challenges of distributed computation and achieves high performance on large clusters of commodity PCs


Programming Model


● Input and Output: Set of key/value pairs (i.e., (k, v))

● Map: (k1, v1) → list of (k2, v2)

● Reduce: (k2, list of (v2)) → (k2, list of (v2))

*k1, k2, v1, v2 are types (e.g., Int, String)

● Implementation Details

○ How intermediate values associated with a given key are grouped (handled by the library, hidden from the user)
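To make the types concrete, here is a minimal single-process sketch of what the library does around user code. `run_mapreduce` is a hypothetical name, and in-memory lists stand in for the distributed input and output files; a real implementation spreads the same three steps (map, group by key, reduce) across many machines.

```python
from collections import defaultdict
from typing import Callable, Iterable, TypeVar

K1 = TypeVar("K1")
V1 = TypeVar("V1")
K2 = TypeVar("K2")
V2 = TypeVar("V2")
Out = TypeVar("Out")


def run_mapreduce(
    inputs: Iterable[tuple[K1, V1]],
    map_fn: Callable[[K1, V1], Iterable[tuple[K2, V2]]],
    reduce_fn: Callable[[K2, list[V2]], Out],
) -> list[tuple[K2, Out]]:
    """Single-process sketch: map every input pair, group by key, reduce each group."""
    groups: dict = defaultdict(list)
    for k1, v1 in inputs:
        for k2, v2 in map_fn(k1, v1):
            groups[k2].append(v2)  # grouping: done by the library, not by user code
    # Keys are reduced in increasing order (keys must be comparable).
    return [(k2, reduce_fn(k2, groups[k2])) for k2 in sorted(groups)]


counts = run_mapreduce(
    [("doc1", "a b a")],
    lambda name, text: ((w, 1) for w in text.split()),
    lambda word, ones: sum(ones),
)
```

The user only supplies the two lambdas; everything inside `run_mapreduce` is the part the library hides.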


Programming Model



● Example 1: Word Count

○ Map: (document name, contents) → list of (word, 1)

○ Reduce: (word, list of (1)) → (word, count)
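Example 1 written as a pair of plain Python functions. This is a sketch only: the function names and the tiny in-memory driver at the bottom are illustrative assumptions, not an API from the paper.

```python
from collections import defaultdict
from typing import Iterator


def word_count_map(doc_name: str, contents: str) -> Iterator[tuple[str, int]]:
    """Map: (document name, contents) -> list of (word, 1). doc_name is unused."""
    for word in contents.split():
        yield word, 1


def word_count_reduce(word: str, counts: list[int]) -> tuple[str, int]:
    """Reduce: (word, list of 1s) -> (word, count)."""
    return word, sum(counts)


# Tiny local run: group the map output by word, then reduce each group.
docs = {"d1": "the cat sat", "d2": "the dog sat"}
groups: dict[str, list[int]] = defaultdict(list)
for name, text in docs.items():
    for word, one in word_count_map(name, text):
        groups[word].append(one)
result = [word_count_reduce(w, groups[w]) for w in sorted(groups)]
```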


Programming Model



● Example 2: Inverted Index

○ Map: (document, words) → list of (word, document ID)

○ Reduce: (word, list of (document IDs)) → (word, sorted list of (document IDs))
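Example 2 as a sketch in the same style, assuming integer document IDs and whitespace tokenization (both hypothetical choices). The map side deduplicates words within a document so each (word, document ID) pair is emitted once; the reduce side sorts the IDs.

```python
from collections import defaultdict
from typing import Iterator


def index_map(doc_id: int, words: str) -> Iterator[tuple[str, int]]:
    """Map: (document, words) -> list of (word, document ID)."""
    for word in set(words.split()):  # emit each word at most once per document
        yield word, doc_id


def index_reduce(word: str, doc_ids: list[int]) -> tuple[str, list[int]]:
    """Reduce: (word, list of document IDs) -> (word, sorted list of document IDs)."""
    return word, sorted(doc_ids)


docs = {2: "blue cat", 1: "red cat", 3: "red dog"}
groups: dict[str, list[int]] = defaultdict(list)
for doc_id, text in docs.items():
    for word, d in index_map(doc_id, text):
        groups[word].append(d)
index = dict(index_reduce(w, groups[w]) for w in groups)
```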


Execution Flow


Figure: execution overview with M = 5 map tasks and R = 2 reduce tasks


Execution Flow


● Step 1: Input Split

○ M pieces, typically 16-64 MB per piece (configurable)

○ Each piece corresponds to a “map task”
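A sketch of Step 1 under a simplifying assumption: the input is treated as one byte range and cut into fixed-size pieces. `make_splits` is a hypothetical helper; a real input reader must also align pieces to record boundaries (e.g., line ends), which this sketch ignores.

```python
MB = 2**20


def make_splits(total_bytes: int, split_bytes: int = 64 * MB) -> list[tuple[int, int]]:
    """Divide an input of total_bytes into (offset, length) pieces of at most split_bytes."""
    if split_bytes <= 0:
        raise ValueError("split_bytes must be positive")
    return [
        (offset, min(split_bytes, total_bytes - offset))
        for offset in range(0, total_bytes, split_bytes)
    ]


splits = make_splits(200 * MB, 64 * MB)  # M = len(splits) map tasks
```

With 64 MB pieces, a 200 MB input yields four map tasks, the last one covering the remaining 8 MB.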


Execution Flow


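● Step 2: Master and Worker Generation

○ Copies of the program start on a cluster of machines

○ Single master node; the remaining copies are workers

○ The master picks idle workers and assigns each one a map task or a reduce task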


Execution Flow


● Two types of tasks

○ M pieces → M map tasks

○ Intermediate key space partitioned into R pieces → R reduce tasks

■ e.g., hash(key) mod R
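The two task types and the default partitioning rule, sketched in Python. `partition` and `make_tasks` are hypothetical names. One design point the sketch makes explicit: every worker must compute the same region for a key, so it uses `zlib.crc32` rather than Python's built-in `hash()`, which is randomized per process for strings unless `PYTHONHASHSEED` is fixed.

```python
import zlib


def partition(key: str, R: int) -> int:
    """hash(key) mod R, using crc32 so every worker computes the same region."""
    return zlib.crc32(key.encode("utf-8")) % R


def make_tasks(M: int, R: int) -> tuple[list[str], list[str]]:
    """M map tasks (one per input piece) and R reduce tasks (one per key partition)."""
    return [f"map-{i}" for i in range(M)], [f"reduce-{j}" for j in range(R)]


map_tasks, reduce_tasks = make_tasks(M=5, R=2)
# Reduce task j receives every intermediate key k with partition(k, R) == j.
regions = {word: partition(word, 2) for word in ["a", "red", "dog", "blue", "cat"]}
```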


Execution Flow


● Step 3: Map Phase

○ Parse key/value pairs from the input split and pass each pair to the user-defined Map function

○ Intermediate key/value pairs produced by Map are buffered in memory


Execution Flow


● Step 4: Periodic Store

○ Buffered pairs are written to local disk

○ The pairs are partitioned into R regions by the partitioning function

○ Locations of the buffered pairs on local disk are passed back to the master
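Steps 3 and 4 for a single map task, as a sketch. Assumptions: `run_map_task` is a hypothetical name, each region is stored as a JSON file in a local directory, and the buffer is written once at the end rather than periodically. The returned paths play the role of the locations reported to the master.

```python
import json
import os
import tempfile
import zlib
from typing import Callable, Iterable


def run_map_task(
    task_id: int,
    records: Iterable[tuple[str, str]],
    map_fn: Callable[[str, str], Iterable[tuple[str, int]]],
    R: int,
    local_dir: str,
) -> list[str]:
    """Run Map over one input split and store its output as R region files.

    Returns the region file paths, which the worker would report to the master.
    """
    regions: list[list[tuple[str, int]]] = [[] for _ in range(R)]
    for k1, v1 in records:
        for k2, v2 in map_fn(k1, v1):
            # Buffer in memory, already split by the partitioning function.
            regions[zlib.crc32(k2.encode("utf-8")) % R].append((k2, v2))
    paths = []
    for r, pairs in enumerate(regions):
        path = os.path.join(local_dir, f"map-{task_id}-region-{r}.json")
        with open(path, "w") as f:  # write the buffer to local disk (once, at the end)
            json.dump(pairs, f)
        paths.append(path)
    return paths


local_dir = tempfile.mkdtemp()
locations = run_map_task(
    0, [("doc1", "a b a")], lambda name, text: ((w, 1) for w in text.split()), 2, local_dir
)
```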


Execution Flow


● Step 5: Reduce Phase - Read

○ Master notifies reduce workers of the locations of the buffered pairs

○ Reduce workers use remote procedure calls (RPCs) to read the data from the map workers' local disks


Execution Flow


● Step 6: Reduce Phase - Process

○ Sort and group the fetched data by intermediate key

○ Call the Reduce function once for each unique key and its list of values

○ Append the result to the final output file for this reduce partition
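Steps 5 and 6 for one reduce task, sketched with an in-memory sort. `run_reduce_task` is a hypothetical name, and concatenating lists stands in for the RPC reads; the paper notes that an external sort is used when the intermediate data is too large to fit in memory.

```python
from itertools import groupby
from operator import itemgetter
from typing import Callable, Iterable


def run_reduce_task(
    fetched_regions: Iterable[list[tuple[str, int]]],
    reduce_fn: Callable[[str, list[int]], int],
) -> list[tuple[str, int]]:
    """Sort and group one partition's pairs by key, then call Reduce once per key."""
    # Concatenating the lists stands in for reading each map worker's region via RPC.
    pairs = [pair for region in fetched_regions for pair in region]
    pairs.sort(key=itemgetter(0))  # sorting makes pairs with equal keys adjacent
    output = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        output.append((key, reduce_fn(key, [v for _, v in group])))  # append to output
    return output


# One partition's regions, as fetched from three map workers.
regions = [
    [("A", 1), ("And", 1), ("A", 1)],
    [("Blue", 1), ("And", 1), ("A", 1), ("Blue", 1)],
    [("And", 1), ("A", 1)],
]
out = run_reduce_task(regions, lambda key, values: sum(values))
```

`itertools.groupby` only merges adjacent equal keys, which is why the sort must come first.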

Implementation

A. Master Data Structures

● State of each map and reduce task (idle / in-progress / completed)

○ Assigned worker node identity (for non-idle tasks)

● Locations and sizes of the R intermediate file regions for each completed map task
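The master's bookkeeping as Python dataclasses, a sketch only: the names `State`, `Task`, `Master`, and `complete_map`, and the use of strings for worker identities and file locations, are hypothetical.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class State(Enum):
    IDLE = "idle"
    IN_PROGRESS = "in-progress"
    COMPLETED = "completed"


@dataclass
class Task:
    kind: str                     # "map" or "reduce"
    index: int
    state: State = State.IDLE
    worker: Optional[str] = None  # assigned worker, set for non-idle tasks
    # For completed map tasks: (location, size in bytes) of each of the R regions.
    regions: list[tuple[str, int]] = field(default_factory=list)


@dataclass
class Master:
    map_tasks: list[Task]
    reduce_tasks: list[Task]

    @classmethod
    def create(cls, M: int, R: int) -> "Master":
        return cls([Task("map", i) for i in range(M)], [Task("reduce", j) for j in range(R)])

    def complete_map(self, index: int, regions: list[tuple[str, int]]) -> None:
        task = self.map_tasks[index]
        task.state = State.COMPLETED
        task.regions = regions  # these locations are what reduce workers are told about


master = Master.create(M=3, R=2)
master.map_tasks[0].state, master.map_tasks[0].worker = State.IN_PROGRESS, "worker-7"
master.complete_map(0, [("worker-7:/tmp/m0-r0", 120), ("worker-7:/tmp/m0-r1", 80)])
```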

B. Task Granularity

● Factors to Consider

○ Scheduling decisions made by the master: O(M + R)

○ State kept in master memory: O(M * R)

○ Users often constrain R, since each reduce task produces a separate output file
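A back-of-the-envelope check of the two cost terms. The values M = 200,000 and R = 5,000 on 2,000 workers, and roughly one byte of master state per (map task, reduce task) pair, are figures reported in the original paper; `master_costs` is a hypothetical helper.

```python
def master_costs(M: int, R: int, bytes_per_pair: float = 1.0) -> tuple[int, float]:
    """Return (scheduling decisions ~ M + R, master memory in bytes ~ M * R * bytes_per_pair)."""
    return M + R, M * R * bytes_per_pair


# Figures the original paper reports using: M = 200,000, R = 5,000, 2,000 workers.
decisions, state_bytes = master_costs(M=200_000, R=5_000)
maps_per_worker = 200_000 // 2_000
```

This gives about 205,000 scheduling decisions and about 10^9 bytes (1 GB) of master state, with roughly 100 map tasks per worker, which shows why M and R cannot grow without bound.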




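C. Fault Tolerance

1. Worker Failure

● Detection: the master pings every worker periodically

● Recovery: reset the failed worker's affected tasks to idle so they can be rescheduled on other workers

2. Master Failure

● Current approach: abort, and let the client retry the entire MapReduce operation

● Alternative: the master writes periodic checkpoints of its data structures, and a new master restarts from the last checkpoint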

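● Reset required after a worker failure?

○ In-progress map task: O (reset)

○ In-progress reduce task: O (reset)

○ Completed map task: O (reset), since its intermediate pairs are stored on the failed machine's local disk, which is no longer accessible

○ Completed reduce task: X (no reset), since reduce output is stored in a global file system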
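The reset rule from the table, as a sketch of what the master might run when a worker stops answering pings. The `Task` record with string states and the function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Task:
    kind: str                     # "map" or "reduce"
    state: str                    # "idle", "in-progress", or "completed"
    worker: Optional[str] = None


def needs_reset(task: Task) -> bool:
    """Everything except completed reduce tasks goes back to idle."""
    if task.state == "in-progress":
        return True
    # Completed map output lives on the failed machine's local disk; completed
    # reduce output is already in the global file system.
    return task.state == "completed" and task.kind == "map"


def handle_worker_failure(tasks: list[Task], failed_worker: str) -> None:
    """Called when the master's periodic ping to failed_worker gets no reply."""
    for task in tasks:
        if task.worker == failed_worker and needs_reset(task):
            task.state, task.worker = "idle", None  # eligible for rescheduling


tasks = [
    Task("map", "completed", "w1"),
    Task("map", "in-progress", "w1"),
    Task("reduce", "completed", "w1"),
    Task("reduce", "in-progress", "w2"),
]
handle_worker_failure(tasks, "w1")
```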


Implementation Details - Locality


● Locality Optimization

○ GFS divides each file into 64 MB blocks and stores several copies (typically 3) on different machines.

○ The master attempts to schedule each map task on a machine that holds a replica of the corresponding input data or, failing that, near one (e.g., on the same network switch).

○ This conserves network bandwidth: most input data is read locally.
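The locality preference as a sketch: a worker holding a replica first, then one on the same switch as a replica, then any idle worker. `pick_worker` and the host-to-switch map are hypothetical; the real scheduler's inputs are not described in detail on the slide.

```python
from typing import Optional


def pick_worker(
    replica_hosts: set[str],
    idle_workers: list[str],
    switch_of: dict[str, str],
) -> Optional[str]:
    """Choose a worker for a map task, preferring data locality."""
    # 1. A worker that stores a replica of the input block: read from local disk.
    for worker in idle_workers:
        if worker in replica_hosts:
            return worker
    # 2. A worker on the same network switch as some replica.
    replica_switches = {switch_of[h] for h in replica_hosts if h in switch_of}
    for worker in idle_workers:
        if switch_of.get(worker) in replica_switches:
            return worker
    # 3. Otherwise any idle worker; the input crosses the wider network.
    return idle_workers[0] if idle_workers else None


switch_of = {"h1": "s1", "h2": "s1", "h3": "s2", "h4": "s3"}
```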


Implementation Details - Backup Tasks


● Problem: “Straggler” workers

○ Workers that take an unusually long time to complete one of the last few tasks

● Solution: schedule “backup” executions of the remaining in-progress tasks

○ When a MapReduce operation is close to completion

● Gain: significant on large operations

○ Sort takes 44% longer with backup tasks disabled
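A sketch of the backup-task trigger. The slide says only "close to completion"; the 95% `threshold` below is a hypothetical knob, not a value from the paper. A task counts as completed as soon as either its primary or its backup execution finishes.

```python
def backup_candidates(states: dict[str, str], threshold: float = 0.95) -> list[str]:
    """Return in-progress tasks to duplicate once the operation is nearly complete.

    threshold is a hypothetical knob: the fraction of tasks that must be completed.
    """
    done = sum(1 for s in states.values() if s == "completed")
    if done < threshold * len(states):
        return []
    return sorted(t for t, s in states.items() if s == "in-progress")


def on_task_finished(states: dict[str, str], task: str) -> None:
    """Whichever copy (primary or backup) finishes first marks the task completed."""
    states[task] = "completed"


states = {f"t{i}": "completed" for i in range(19)}
states["t19"] = "in-progress"
backups = backup_candidates(states)  # nearly done: duplicate the straggler t19
on_task_finished(states, "t19")      # primary or backup finished first
```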


Refinements - M splits to R outputs


Figure: M input splits mapped to R outputs (M = 5, R = 2)


Refinements - M splits to R outputs


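Figure: word count with M = 3 input splits and R = 2 reduce tasks (stages: Input, Split, Map, Reduce, Output)

● Input: “A red dog and a blue cat and a blue dog and a red cat”

● Split (M = 3)

○ “A red dog and a”

○ “Blue cat and a blue”

○ “Dog and a red cat”

● Map

○ Split 1: (A, 1) (Red, 1) (Dog, 1) (And, 1) (A, 1)

○ Split 2: (Blue, 1) (Cat, 1) (And, 1) (A, 1) (Blue, 1)

○ Split 3: (Dog, 1) (And, 1) (A, 1) (Red, 1) (Cat, 1)

● Reduce output (R = 2)

○ Output 1: (A, 4) (And, 3) (Blue, 2)

○ Output 2: (Cat, 2) (Dog, 2) (Red, 2)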


Refinements - M splits to R outputs


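Figure: word count stages with a combiner (M = 3, R = 2)

● Map: 15 k-v pairs

○ Map 1: (A, 1) (Red, 1) (Dog, 1) (And, 1) (A, 1)

○ Map 2: (Blue, 1) (Cat, 1) (And, 1) (A, 1) (Blue, 1)

○ Map 3: (Dog, 1) (And, 1) (A, 1) (Red, 1) (Cat, 1)

● Partition: region 1 holds A, And, Blue; region 2 holds Cat, Dog, Red

○ Map 1: region 1 (A, 1) (And, 1) (A, 1); region 2 (Red, 1) (Dog, 1)

○ Map 2: region 1 (Blue, 1) (And, 1) (A, 1) (Blue, 1); region 2 (Cat, 1)

○ Map 3: region 1 (And, 1) (A, 1); region 2 (Dog, 1) (Red, 1) (Cat, 1)

● Combiner: 13 k-v pairs

○ Map 1: region 1 (A, 2) (And, 1); region 2 (Red, 1) (Dog, 1)

○ Map 2: region 1 (Blue, 2) (And, 1) (A, 1); region 2 (Cat, 1)

○ Map 3: region 1 (And, 1) (A, 1); region 2 (Dog, 1) (Red, 1) (Cat, 1)

● Shuffle

○ Reduce 1: (A, 2) (And, 1) (Blue, 2) (And, 1) (A, 1) (And, 1) (A, 1)

○ Reduce 2: (Red, 1) (Dog, 1) (Cat, 1) (Dog, 1) (Red, 1) (Cat, 1)

● Sort

○ Reduce 1: (A, 2) (A, 1) (A, 1) (And, 1) (And, 1) (And, 1) (Blue, 2)

○ Reduce 2: (Cat, 1) (Cat, 1) (Dog, 1) (Dog, 1) (Red, 1) (Red, 1)

● Reduce output

○ Output 1: (A, 4) (And, 3) (Blue, 2)

○ Output 2: (Cat, 2) (Dog, 2) (Red, 2)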
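The word-count example can be replayed in a few lines to check the pair counts. This is a sketch: the partition assignment (A, And, Blue to region 1) is read off the example and hard-coded as a hypothetical rule, whereas the default would be hash(key) mod R, and shuffle plus reduce are collapsed into one summing step.

```python
from collections import Counter

splits = ["A red dog and a", "Blue cat and a blue", "Dog and a red cat"]  # M = 3
REGION_1 = {"A", "And", "Blue"}  # every other word goes to region 2 (R = 2)


def map_words(text: str) -> list[tuple[str, int]]:
    # capitalize() folds "a" and "A" into one key, matching the example.
    return [(word.capitalize(), 1) for word in text.split()]


def combine(pairs: list[tuple[str, int]]) -> list[tuple[str, int]]:
    # Combiner: the same summing logic as Reduce, run on the map worker.
    totals: Counter = Counter()
    for word, n in pairs:
        totals[word] += n
    return list(totals.items())


map_out = [map_words(s) for s in splits]
partitioned = [
    ([p for p in out if p[0] in REGION_1], [p for p in out if p[0] not in REGION_1])
    for out in map_out
]
combined = [[combine(region) for region in task] for task in partitioned]

pairs_before = sum(len(out) for out in map_out)  # pairs sent without a combiner
pairs_after = sum(len(region) for task in combined for region in task)

# Shuffle + Reduce, collapsed into one step: sum per word across all map tasks.
final: Counter = Counter()
for task in combined:
    for region in task:
        for word, n in region:
            final[word] += n
```

The combiner cuts the pairs crossing the network from 15 to 13 without changing the final counts.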


Refinements - M splits to R outputs


● Partitioning Function

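○ Default: hash(key) mod R

○ Custom: e.g., hash(Hostname(urlkey)) mod R, so that all URLs from the same host end up in the same output file

● Ordering Guarantees

○ Within a given partition, intermediate key/value pairs are processed in increasing key order.

● Combiner Function

○ Partial merging of map output on the map task workers, typically using the same code as the Reducer.

○ Reduces network overhead.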
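The custom partitioning function from the slide, sketched in Python. `host_partition` and `stable_hash` are hypothetical names, and `urllib.parse.urlparse(...).hostname` stands in for Hostname(urlkey). As with the default rule, the hash must be stable across worker processes, so `zlib.crc32` is used instead of the built-in `hash()`.

```python
import zlib
from urllib.parse import urlparse


def stable_hash(text: str) -> int:
    # crc32 gives the same value in every worker process; Python's built-in
    # hash() of a str is salted per process, so workers could disagree.
    return zlib.crc32(text.encode("utf-8"))


def host_partition(url_key: str, R: int) -> int:
    """Custom partitioning: hash(Hostname(urlkey)) mod R."""
    host = urlparse(url_key).hostname or ""
    return stable_hash(host) % R


R = 4
a = host_partition("http://www.example.com/index.html", R)
b = host_partition("http://www.example.com/a/b.html", R)
```

Both URLs share a host, so they land in the same reduce task and therefore the same output file.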


Refinements - Interaction with Master



Skipping Bad Records


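Figure: a worker processes Record 34, Record 35, and Record 36. When Record 35 crashes the worker, its signal handler reports the record to the master; this happens on two attempts. The master's failure counts (Record 34: 0, Record 35: 2, Record 36: 0) show that Record 35 has failed more than once, so the master sends a skip signal and Record 35 is skipped on the next re-execution.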
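Master-side bookkeeping for skipping bad records, as a sketch. `BadRecordTracker` and its methods are hypothetical; the rule it encodes (skip a record once it has caused more than one failure) follows the paper's description, while the transport from the worker's signal handler to the master is left out.

```python
from collections import defaultdict


class BadRecordTracker:
    """Counts crash reports per (task, record) and decides which records to skip."""

    def __init__(self) -> None:
        self.failures: dict[tuple[str, int], int] = defaultdict(int)

    def report_crash(self, task_id: str, record_seq: int) -> None:
        # Sent by the worker's signal handler with the record's sequence number.
        self.failures[(task_id, record_seq)] += 1

    def records_to_skip(self, task_id: str) -> set[int]:
        # More than one failure on the same record: skip it on the next re-execution.
        return {seq for (t, seq), n in self.failures.items() if t == task_id and n > 1}


tracker = BadRecordTracker()
tracker.report_crash("map-7", 35)  # first attempt crashes on Record 35
tracker.report_crash("map-7", 35)  # re-execution crashes on it again
skip = tracker.records_to_skip("map-7")
```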


Status Information


Refinements



● Input/Output Types

○ e.g., “text” mode input: <offset in the file, contents of the line>

○ Users can add new input/output types by implementing a simple reader/writer interface.

● Side-effects

○ Map/Reduce operators can produce auxiliary files as additional outputs.

● Local Execution

○ An alternative implementation sequentially executes all of the work on the local machine.

○ This makes it easy to use any debugging or testing tools.
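A sketch of a “text” mode reader that produces <offset, contents of line> pairs from a byte stream. `text_reader` is a hypothetical name, and UTF-8 decoding is an assumption. Because it runs on any local file object, it also fits the local-execution workflow.

```python
import io
from typing import BinaryIO, Iterator


def text_reader(f: BinaryIO) -> Iterator[tuple[int, str]]:
    """Text-mode input: yield (byte offset of the line, contents of the line)."""
    offset = 0
    for raw in f:
        yield offset, raw.decode("utf-8").rstrip("\n")
        offset += len(raw)  # advance by the raw byte length, newline included


records = list(text_reader(io.BytesIO(b"first line\nsecond\n")))
```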

Performance

● Cluster Configuration

○ 1800 machines.

○ Each: two 2 GHz Intel Xeon processors with Hyper-Threading, 2.5-3 GB of available memory.

○ 100-200 Gbps of aggregate network bandwidth.

● Benchmarks

○ Grep: search 10^10 100-byte records for a rare pattern (92K matching records).

○ Sort: sort 10^10 100-byte records, about 1 TB (modeled after the TeraSort benchmark).
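The two benchmarks' map functions, sketched after the paper's descriptions: Grep emits a line only if it matches the pattern, and Sort emits a 10-byte sort key together with the original line; both use an identity Reduce. Function names and the empty-string value for Grep are hypothetical, and slicing 10 characters equals 10 bytes only for ASCII text.

```python
import re
from typing import Iterator


def grep_map(offset: int, line: str, pattern: re.Pattern) -> Iterator[tuple[str, str]]:
    """Grep: emit the line only if it matches; the value is unused."""
    if pattern.search(line):
        yield line, ""


def sort_map(offset: int, line: str) -> Iterator[tuple[str, str]]:
    """Sort: emit (10-character sort key, original line)."""
    yield line[:10], line


pat = re.compile("xyz")
lines = ["aaaxyzbbb", "nothing here", "0123456789 payload"]
grep_out = [kv for i, l in enumerate(lines) for kv in grep_map(i, l, pat)]
# Sorting by key stands in for the shuffle/sort phase; Reduce is the identity.
sort_out = sorted(kv for i, l in enumerate(lines) for kv in sort_map(i, l))
```

Grep's intermediate output is tiny, while Sort's is as large as its input, which matches the input-rate difference discussed for the Sort results.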


Performance - Grep


Figure: Grep input data rate over time. Annotations: 1764 workers assigned; read done; startup overhead*.

* Copying the program to all workers, and the locality optimization.


Performance - Sort


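Figure: Sort data rates over time. Panels: input (text lines), shuffle ((key, text line) pairs), output (sorted text lines).

● Input rate is lower than for Grep

○ Sort writes every text line as intermediate output, whereas Grep emits only the matching lines, so Sort's intermediate files are much larger.

● Input rate > shuffle rate > output rate

○ The input rate benefits from the locality optimization.

○ The output rate is lowest because GFS writes two replicas of the sorted output for reliability.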


Performance - Effect of Backup Tasks


Figure: Sort with backup tasks disabled. Annotations: 5 stragglers; 44% increase in time.


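Performance - Machine Failures


Figure: Sort with machine failures. Annotations: completed map tasks from the failed workers are re-executed, so their input is re-read; only a 5% increase in total time.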


Experience at Google


● Broadly applicable, including:

○ Large-scale machine learning problems.

○ Extraction of properties of web pages.

● Production indexing system rewritten with MapReduce

○ Code is simpler: details of fault tolerance and parallelization are hidden in the library.

○ Conceptually unrelated computations are kept separate.

○ Easy to operate and scale.

Conclusion

● MapReduce programming model

○ Is easy to use.

○ A large variety of problems are easily expressible as MapReduce computations.

○ Scales to large clusters of machines.
