
Transcript
Page 1: MapReduce

MapReduce

Costin Raiciu

Advanced Topics in Distributed Systems, 2011

Page 2: MapReduce

Motivating App

Web Search

12PB of Web data

Must be able to search it quickly

How can we do this?

Page 3: MapReduce

Web Search Primer

Each document is a collection of words

Different frequencies, counts, meaning

Users supply a few words – the query

Task: find all the documents which contain a specified word

Page 4: MapReduce

Solution: an inverted web index

For each keyword, store a list of the documents that contain it:

Student -> {a, b, c, …}
UPB -> {x, y, z, …}
…

When a query comes:

Lookup all the keywords
Intersect the document lists
Order the results according to their importance
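A minimal single-machine sketch of the index and of the lookup/intersect steps (class and method names are illustrative, not part of the lecture):

```java
import java.util.*;

// Toy inverted index: keyword -> sorted set of document ids.
class InvertedIndex {
    private final Map<String, Set<String>> index = new HashMap<>();

    // Index every word of a document.
    public void add(String docId, String text) {
        for (String word : text.toLowerCase().split("\\s+")) {
            index.computeIfAbsent(word, k -> new TreeSet<>()).add(docId);
        }
    }

    // Query: look up every keyword, then intersect the document lists.
    public Set<String> query(String... keywords) {
        Set<String> result = null;
        for (String kw : keywords) {
            Set<String> docs = index.getOrDefault(kw.toLowerCase(), Collections.emptySet());
            if (result == null) {
                result = new TreeSet<>(docs);
            } else {
                result.retainAll(docs);
            }
        }
        return result == null ? Collections.emptySet() : result;
    }
}
```

Ranking by importance is omitted here; a real engine would order the intersected list by a relevance score.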

Page 5: MapReduce

How do we build an inverted web index?

Read 12PB of web pages

For each page, find its keywords

Slowly build index: 2TB of data

We could run it on a single machine:

100MB/s hard disk read = 1GB read in 10s

120,000,000s just to read on a single machine

~ 4 years!
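The back-of-envelope arithmetic on this slide can be checked directly (illustrative code):

```java
// Back-of-envelope: time to scan 12 PB at 100 MB/s on a single machine.
class ScanEstimate {
    static final long PETABYTE = 1_000_000_000_000_000L; // 10^15 bytes
    static final long MEGABYTE = 1_000_000L;

    // Sequential read time in seconds.
    public static long secondsToRead(long bytes, long bytesPerSecond) {
        return bytes / bytesPerSecond;
    }

    public static double toYears(long seconds) {
        return seconds / (365.0 * 24 * 3600);
    }
}
```

12 PB at 100 MB/s gives 120,000,000 seconds, which is roughly 3.8 years of pure disk reading.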

Page 6: MapReduce

We need parallelism!

Want to run this task in 1 day

We would need at least 1400 machines

What functionality might we need?

Move data around
Run processing
Check liveness
Deal with failures (a certainty!)
Get results

Page 7: MapReduce

Inspiration:

Functional Programming

Page 8: MapReduce

Functional Programming Review

Functional operations do not modify data structures: They always create new ones

Original data still exists in unmodified form

Data flows are implicit in program design

Order of operations does not matter

Page 9: MapReduce

Functional Programming Review

fun foo(l: int list) =

sum(l) + mul(l) + length(l)

The order of sum(), mul(), etc. does not matter – they do not modify l

Page 10: MapReduce

Map

map f list

Creates a new list by applying f to each element of the input list; returns output in order.

[Diagram: f applied independently to each element of the input list]

Page 11: MapReduce

Fold

fold f x0 list

Moves across a list, applying f to each element plus an accumulator. f returns the next accumulator value, which is combined with the next element of the list

[Diagram: f threads an accumulator from the initial value across the list; the final accumulator is returned]
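In Java, the same two operations can be sketched with streams (an illustrative translation of the ML examples, using squaring as f for map and (+) with initial accumulator 0 for fold):

```java
import java.util.*;
import java.util.stream.*;

// map builds a new list by applying f to every element;
// fold (reduce) threads an accumulator through the list.
class MapFold {
    public static List<Integer> mapSquare(List<Integer> xs) {
        return xs.stream().map(x -> x * x).collect(Collectors.toList());
    }

    public static int foldSum(List<Integer> xs) {
        // fold (+) 0 xs: 0 is the initial accumulator value
        return xs.stream().reduce(0, Integer::sum);
    }
}
```

Neither operation modifies its input list, which is exactly why their applications can be reordered or parallelized.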

Page 12: MapReduce

Implicit Parallelism In map

In a purely functional setting, elements of a list being computed by map cannot see the effects of the computations on other elements

If order of application of f to elements in list is commutative, we can reorder or parallelize execution

This is the “secret” that MapReduce exploits

Page 13: MapReduce

MapReduce

Page 14: MapReduce

Main Observation

A large fraction of distributed systems code has to do with:

Monitoring
Fault tolerance
Moving data around

Problems:

Difficult to get right even if you know what you are doing
Every app implements its own mechanisms

Most of this code is app independent!

Page 15: MapReduce

MapReduce

Automatic parallelization & distribution Fault-tolerant Provides status and monitoring tools Clean abstraction for programmers

Page 16: MapReduce

Programming Model

Borrows from functional programming

Users implement an interface of two functions:

map (in_key, in_value) -> (out_key, intermediate_value) list

reduce (out_key, intermediate_value list) -> out_value list
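The two-function interface can be sketched as a toy, single-JVM map/reduce runner (all names here are illustrative, not part of any real API):

```java
import java.util.*;
import java.util.function.BiConsumer;

// Toy single-JVM "MapReduce": run map over every input record, group the
// emitted intermediate pairs by output key (the barrier), then run
// reduce once per key.
class MiniMapReduce {
    interface Mapper<KI, VI, KO, VO> {
        void map(KI key, VI value, BiConsumer<KO, VO> emit);
    }

    interface Reducer<KO, VO, R> {
        R reduce(KO key, List<VO> values);
    }

    public static <KI, VI, KO, VO, R> Map<KO, R> run(
            Map<KI, VI> input, Mapper<KI, VI, KO, VO> mapper, Reducer<KO, VO, R> reducer) {
        Map<KO, List<VO>> groups = new TreeMap<>(); // barrier: aggregate by output key
        input.forEach((k, v) -> mapper.map(k, v,
                (ko, vo) -> groups.computeIfAbsent(ko, x -> new ArrayList<>()).add(vo)));
        Map<KO, R> out = new TreeMap<>();
        groups.forEach((k, vs) -> out.put(k, reducer.reduce(k, vs)));
        return out;
    }
}
```

Word count fits this interface directly: map emits (word, 1) per word, reduce counts the list of ones per word.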

Page 17: MapReduce

map

Records from the data source (lines out of files, rows of a database, etc) are fed into the map function as key*value pairs: e.g., (filename, line).

map() produces one or more intermediate values along with an output key from the input.

Page 18: MapReduce

reduce

After the map phase is over, all the intermediate values for a given output key are combined together into a list

reduce() combines those intermediate values into one or more final values for that same output key

(in practice, usually only one final value per key)

Page 19: MapReduce

[Diagram: input key*value pairs from data stores 1…n are fed to parallel map tasks, each emitting (key 1, values…), (key 2, values…), (key 3, values…); a barrier aggregates intermediate values by output key; one reduce task per key turns (key, intermediate values) into final key values.]

Page 20: MapReduce

Parallelism

map() functions run in parallel, creating different intermediate values from different input data sets

reduce() functions also run in parallel, each working on a different output key

All values are processed independently

Bottleneck: the reduce phase can’t start until the map phase is completely finished.

Page 21: MapReduce

How do we place computation?

Master assigns map and reduce jobs to workers

Does this mapping matter?

Page 22: MapReduce

Data Center Network Architecture

[Diagram: racks of servers connect at 1Gbps to top-of-rack switches; top-of-rack switches connect at 10Gbps to aggregation switches, which connect at 10Gbps to a core switch.]

Page 23: MapReduce

Locality

Master program divides up tasks based on location of data: tries to have map() tasks on same machine as physical file data, or at least same rack

map() task inputs are divided into 64 MB blocks: same size as Google File System chunks

Page 24: MapReduce

Communication

Map output stored to local disk

Shuffle phase:

Reducers need to read data from all mappers
Typically cross-rack and expensive
Need full bisection bandwidth in theory

More about good topologies next time!

Page 25: MapReduce

Fault Tolerance

Master detects worker failures

Re-executes completed & in-progress map() tasks

Why completed also?

Re-executes in-progress reduce() tasks

Page 26: MapReduce

Fault Tolerance (2)

Master notices that particular input key/values cause crashes in map(), and skips those values on re-execution.

Effect: can work around bugs in third-party libraries!

Page 27: MapReduce

Optimizations

No reduce can start until map is complete:

A single slow disk controller can rate-limit the whole process

Master redundantly executes stragglers: “slow-moving” map tasks

Uses the results of the first copy to finish

Why is it safe to redundantly execute map tasks? Wouldn’t this mess up the total computation?

Page 28: MapReduce

Optimizations

“Combiner” functions can run on same machine as a mapper

Causes a mini-reduce phase to occur before the real reduce phase, to save bandwidth

Under what conditions is it sound to use a combiner?
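One sufficient condition: the reduce function is associative and commutative, and its output type matches its input type (sums of integers are the classic case). The sketch below is illustrative only; it shows that pre-combining each mapper's partial output then cannot change the final result:

```java
import java.util.*;

// When the reduce function is associative and commutative and its output
// type matches its input type (integer sum here), collapsing each
// mapper's partial output first yields the same final answer.
class CombinerDemo {
    public static int reduceSum(List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;
        return sum;
    }

    // With a combiner: pre-reduce each mapper's list, then reduce the partial sums.
    public static int withCombiner(List<List<Integer>> perMapperOutput) {
        List<Integer> partialSums = new ArrayList<>();
        for (List<Integer> part : perMapperOutput) partialSums.add(reduceSum(part));
        return reduceSum(partialSums);
    }

    // Without a combiner: ship every value to the reducer and reduce once.
    public static int withoutCombiner(List<List<Integer>> perMapperOutput) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> part : perMapperOutput) all.addAll(part);
        return reduceSum(all);
    }
}
```

A counterexample is averaging: the average of per-mapper averages is generally not the global average, so a naive averaging combiner would be unsound.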

Page 29: MapReduce

More and more mapreduce

Page 30: MapReduce

Apache Hadoop

An Implementation of MapReduce

Page 31: MapReduce

http://hadoop.apache.org/

Open source, Java

Scale: thousands of nodes and petabytes of data

Still pre-1.0 release:

22 Apr 2009: release 0.20.0
17 Sep 2008: release 0.18.1

but already used by many

Page 32: MapReduce

Hadoop

MapReduce and Distributed File System framework for large commodity clusters

Master/Slave relationship:

JobTracker handles all scheduling & data flow between TaskTrackers
TaskTracker handles all worker tasks on a node
Individual worker task runs a map or reduce operation

Integrates with HDFS for data locality

Page 33: MapReduce

Hadoop Supported File Systems

HDFS: Hadoop's own file system

Amazon S3 file system:

Targeted at clusters hosted on the Amazon Elastic Compute Cloud server-on-demand infrastructure
Not rack-aware

CloudStore:

previously Kosmos Distributed File System
like HDFS, this is rack-aware

FTP: filesystem stored on remote FTP servers

Read-only HTTP and HTTPS file systems

Page 34: MapReduce

"Rack awareness"

An optimization which takes into account the geographic clustering of servers

Network traffic between servers in different geographic clusters is minimized

Page 35: MapReduce

Hadoop scheduler

Runs a few map and reduce tasks in parallel on the same machine:

To overlap IO and computation

Whenever there is an empty slot the scheduler chooses:

A failed task, if it exists
An unassigned task, if it exists
A speculative task (also running on another node)

Page 36: MapReduce

wordCount

A Simple Hadoop Example

http://wiki.apache.org/hadoop/WordCount

Page 37: MapReduce

Word Count Example

Read text files and count how often words occur.

The input is text files

The output is a text file; each line: word, tab, count

Map: produce pairs of (word, count)

Reduce: for each word, sum up the counts.

Page 38: MapReduce

WordCount Overview

3 import ...

12 public class WordCount {

13

14 public static class Map extends MapReduceBase implements Mapper ... {

17

18 public void map ...

26 }

27

28 public static class Reduce extends MapReduceBase implements Reducer ... {

29

30 public void reduce ...

37 }

38

39 public static void main(String[] args) throws Exception {

40 JobConf conf = new JobConf(WordCount.class);

41 ...

53 FileInputFormat.setInputPaths(conf, new Path(args[0]));

54 FileOutputFormat.setOutputPath(conf, new Path(args[1]));

55

56 JobClient.runJob(conf);

57 }

58

59 }

Page 39: MapReduce

wordCount Mapper

14 public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15   private final static IntWritable one = new IntWritable(1);
16   private Text word = new Text();
17
18   public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19     String line = value.toString();
20     StringTokenizer tokenizer = new StringTokenizer(line);
21     while (tokenizer.hasMoreTokens()) {
22       word.set(tokenizer.nextToken());
23       output.collect(word, one);
24     }
25   }
26 }

Page 40: MapReduce

wordCount Reducer

28 public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29
30   public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
31     int sum = 0;
32     while (values.hasNext()) {
33       sum += values.next().get();
34     }
35     output.collect(key, new IntWritable(sum));
36   }
37 }

Page 41: MapReduce

wordCount JobConf

40 JobConf conf = new JobConf(WordCount.class);
41 conf.setJobName("wordcount");
42
43 conf.setOutputKeyClass(Text.class);
44 conf.setOutputValueClass(IntWritable.class);
45
46 conf.setMapperClass(Map.class);
47 conf.setCombinerClass(Reduce.class);
48 conf.setReducerClass(Reduce.class);
49
50 conf.setInputFormat(TextInputFormat.class);
51 conf.setOutputFormat(TextOutputFormat.class);

Page 42: MapReduce

WordCount main

39 public static void main(String[] args) throws Exception {

40 JobConf conf = new JobConf(WordCount.class);

41 conf.setJobName("wordcount");

42

43 conf.setOutputKeyClass(Text.class);

44 conf.setOutputValueClass(IntWritable.class);

45

46 conf.setMapperClass(Map.class);

47 conf.setCombinerClass(Reduce.class);

48 conf.setReducerClass(Reduce.class);

49

50 conf.setInputFormat(TextInputFormat.class);

51 conf.setOutputFormat(TextOutputFormat.class);

52

53 FileInputFormat.setInputPaths(conf, new Path(args[0]));

54 FileOutputFormat.setOutputPath(conf, new Path(args[1]));

55

56 JobClient.runJob(conf);

57 }

Page 43: MapReduce

Invocation of wordcount

1. /usr/local/bin/hadoop dfs -mkdir <hdfs-dir>
2. /usr/local/bin/hadoop dfs -copyFromLocal <local-dir> <hdfs-dir>
3. /usr/local/bin/hadoop jar hadoop-*-examples.jar wordcount <in-dir> <out-dir>

Page 44: MapReduce

Hadoop At Work

Page 45: MapReduce

Experimental setup

12 servers connected to a gigabit switch

Same hardware; single hard disk per server

Filesystem: HDFS with replication 2, 128MB block size

3 Map and 2 Reduce tasks per machine

Data: crawl of the .uk domain (2009), 50GB (unreplicated)

Page 46: MapReduce

Monitoring Task Progress

Hadoop estimates task status:

Map: % of input data read from HDFS

Reduce:
33% - progress in copy (shuffle) phase
33% - sorting keys
33% - writing output in HDFS

Hadoop computes an average progress score for each category

Page 47: MapReduce

5 Sep, 2011: release 0.20.204.0 available

Page 48: MapReduce
Page 49: MapReduce
Page 50: MapReduce

Back to Hadoop Overview

Page 51: MapReduce

Speculative Execution

Rerun a task if it is slow:

Threshold for speculative execution: 20% less than its category’s average

Assumptions:

All machines are homogeneous
Tasks progress at a constant rate
There is no cost in launching speculative tasks
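The threshold rule can be stated precisely. A minimal sketch, assuming "20% less" means 0.2 absolute progress-score points below the category average (names are illustrative):

```java
import java.util.*;

// Hadoop-style speculation test: a task is a speculation candidate when
// its progress score is more than 0.2 below its category's average.
class SpeculationCheck {
    public static double average(List<Double> scores) {
        double sum = 0;
        for (double s : scores) sum += s;
        return sum / scores.size();
    }

    public static boolean isSpeculationCandidate(double progress, List<Double> categoryScores) {
        return progress < average(categoryScores) - 0.2;
    }
}
```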

Page 52: MapReduce

Thought Experiment

What happens if one machine is slow?

What happens if there is network congestion on one link in the reduce phase?

Page 53: MapReduce

LATE scheduler [Zaharia, OSDI 2008]

Calculate progress rate: ProgressRate = ProgressScore / T

Time to finish: (1 - ProgressScore) / ProgressRate

Only rerun on fast nodes

Put a cap on the number of speculative tasks
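The two formulas above can be checked with a tiny calculator (illustrative names):

```java
// LATE heuristics: a task's progress rate, and the time it still needs,
// estimated from its progress score and its running time T (seconds).
class LateEstimator {
    public static double progressRate(double progressScore, double elapsedSeconds) {
        return progressScore / elapsedSeconds;
    }

    public static double timeToFinish(double progressScore, double elapsedSeconds) {
        return (1 - progressScore) / progressRate(progressScore, elapsedSeconds);
    }
}
```

For example, a task 25% done after 50s has rate 0.005/s and an estimated 150s left; LATE speculates on the tasks with the longest estimated time to finish, not merely the slowest so far.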

Page 54: MapReduce

Some results on EC2

Page 55: MapReduce

Some more results…

Page 56: MapReduce

MapReduce Related Work

Shared memory architectures: do not scale up

MPI: general purpose, difficult to scale up to more than a few tens of hosts

Dryad/DryadLINQ: computation is a Directed Acyclic Graph

More general computation model
Still not Turing Complete

Active area of research:

HotCloud 2010, NSDI 2011: Spark / Ciel

Page 57: MapReduce

Conclusions

MapReduce is a very useful abstraction: it greatly simplifies large-scale computations

Does it replace traditional databases? What is missing?

Fun to use: focus on problem, let library deal w/ messy details

Page 58: MapReduce

Project reminder

Check the EC2 public data repository web page: http://aws.amazon.com/datasets/

Start browsing the datasets

Many are externally available

If not, email me and I’ll mount the dataset for you

Select your group

Start playing with Hadoop

