Report: Hadoop MapReduce



INDEX

A. Abstract
B. History
1. What is Apache Hadoop?
2. Hadoop MapReduce
3. Hadoop Distributed File System
4. Logical View of MapReduce
5. Inputs & Outputs
6. Example: WordCount v1.0
    6.1 Walk-through
7. More Examples
8. MapReduce – User Interfaces
    8.1 Payload
        8.1.1 Mapper
            8.1.1.1 How Many Maps?
        8.1.2 Reducer
            8.1.2.1 Shuffle
            8.1.2.2 Sort
            8.1.2.3 Reduce
            8.1.2.4 How Many Reduces?
            8.1.2.5 Reducer NONE
        8.1.3 Partitioner
        8.1.4 Reporter
        8.1.5 OutputCollector
9. Implementation
    9.1 Execution Overview
    9.2 Master Data Structures
    9.3 Fault Tolerance
    9.4 Locality
    9.5 Task Granularity
    9.6 Backup Tasks
10. Disadvantages/Reasons for introducing new technologies
Conclusion
References


ABSTRACT

Hadoop MapReduce is a software framework for easily writing applications which process vast

amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of

commodity hardware in a reliable, fault-tolerant manner. A MapReduce job usually splits the

input data-set into independent chunks which are processed by the map tasks in a completely

parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce

tasks. Typically both the input and the output of the job are stored in a file-system. The

framework takes care of scheduling tasks, monitoring them, and re-executing the failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce

framework and the Hadoop Distributed File System (HDFS) are running on the same set of

nodes. This configuration allows the framework to effectively schedule tasks on the nodes where

data is already present, resulting in very high aggregate bandwidth across the cluster. The

MapReduce framework consists of a single master JobTracker and one slave TaskTracker per cluster node. The master is responsible for scheduling the jobs' component tasks on the slaves,

monitoring them and re-executing the failed tasks. The slaves execute the tasks as directed by the

master.


INTRODUCTION

HISTORY

The history of MapReduce is tied to the Google File System (GFS), which was implemented primarily to handle Google's rapidly growing data processing needs. HDFS, on the other hand, was implemented with the intention of running Hadoop MapReduce applications. The two systems were created for different clients with different needs.

File Structure:

GFS files are divided into 64 MB chunks. Chunks are identified by a 64-bit handle and are replicated (three times by default). They are further divided into 64 KB blocks, and each block has a checksum. HDFS uses 128 MB blocks. A DataNode holds each block replica as two files: one for the data and one for the checksum and generation stamp.


I/O:

The key difference here is that GFS has the concept of leases while HDFS does not; in HDFS the client decides where to write. Since a file's block locations are exposed, applications like MapReduce can schedule tasks close to the data.

Data Balancing:

GFS places new replicas on the chunkservers with the lowest disk space utilization, and the master rebalances replicas periodically. In HDFS this is handled by a separate tool called the Balancer. HDFS also avoids placing new blocks on heavily utilized DataNodes during writes, which keeps individual DataNodes from choking up.

Hadoop is definitely inspired by Google MapReduce/GFS and aims to provide those capabilities

as an open-source project.


1. What is Apache Hadoop?

• Large-scale, open-source software framework
• Yahoo! has been the largest contributor to date
• Dedicated to scalable, distributed, data-intensive computing
• Handles thousands of nodes and petabytes of data
• Supports applications under a free license

2. Hadoop MapReduce

• MapReduce is a programming model and software framework first developed by Google (Google's MapReduce paper was published in 2004)
• Intended to facilitate and simplify the processing of vast amounts of data in parallel on large clusters of commodity hardware in a reliable, fault-tolerant manner
    o Petabytes of data
    o Thousands of nodes
• Computational processing occurs on both:
    o Unstructured data: file system
    o Structured data: database

3. Hadoop Distributed File System (HDFS)

• Inspired by the Google File System
• Scalable, distributed, portable file system written in Java for the Hadoop framework
    o Primary distributed storage used by Hadoop applications
• HDFS can be part of a Hadoop cluster or can be a stand-alone, general-purpose distributed file system
• An HDFS cluster primarily consists of:
    o a NameNode that manages the file system metadata
    o DataNodes that store the actual data
• Stores very large files in blocks across machines in a large cluster
    o Reliability and fault tolerance ensured by replicating data across multiple hosts
• Has data awareness between nodes
• Designed to be deployed on low-cost hardware


The number of applications running on Hadoop clusters is increasing day by day, because organizations have found a simple and efficient model that works well in a distributed environment. Google's MapReduce programming model is designed for processing large data sets in a massively parallel manner. The model is built to work efficiently on thousands of machines and massive data sets using commodity hardware. Together, HDFS and MapReduce form a scalable, fault-tolerant model that hides all the complexities of Big Data analytics.

Since Hadoop is becoming increasingly popular, understanding technical details becomes

essential. Minimally, applications specify the input/output locations and supply map and reduce

functions via implementations of appropriate interfaces and/or abstract-classes. These, and other

job parameters, comprise the job configuration. The Hadoop job client then submits the job

(jar/executable, etc.) and configuration to the JobTracker, which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling and monitoring the tasks, and providing status and diagnostic information to the job-client. Although the Hadoop framework is

implemented in Java, MapReduce applications need not be written in Java.


• HadoopStreaming is a utility which allows users to create and run jobs with any executable (e.g. shell utilities) as the mapper and/or the reducer.
• HadoopPipes is a SWIG-compatible C++ API to implement MapReduce applications (non-JNI based).

MapReduce is a framework for processing parallelizable problems across huge datasets using a

large number of computers (nodes), collectively referred to as a cluster (if all nodes are on the

same local network and use similar hardware) or a grid (if the nodes are shared across

geographically and administratively distributed systems, and use more heterogeneous hardware).

Processing can occur on data stored either in a file system (unstructured) or in a database


(structured). MapReduce can take advantage of locality of data, processing it on or near the

storage assets in order to reduce the distance over which it must be transmitted.

"Map" step: Each worker node applies the "map()" function to the local data, and writes

the output to a temporary storage. A master node orchestrates that for redundant copies of

input data, only one is processed.

"Shuffle" step: Worker nodes redistribute data based on the output keys (produced by

the "map()" function), such that all data belonging to one key is located on the same

worker node.

"Reduce" step: Worker nodes now process each group of output data, per key, in

parallel.

MapReduce allows for distributed processing of the map and reduction operations. Provided that

each mapping operation is independent of the others, all maps can be performed in parallel –

though in practice this is limited by the number of independent data sources and/or the number of

CPUs near each source. Similarly, a set of 'reducers' can perform the reduction phase, provided

that all outputs of the map operation that share the same key are presented to the same reducer at

the same time, or that the reduction function is associative. While this process can often appear

inefficient compared to algorithms that are more sequential, MapReduce can be applied to

significantly larger datasets than "commodity" servers can handle – a large server farm can use

MapReduce to sort a petabyte of data in only a few hours. The parallelism also offers some

possibility of recovering from partial failure of servers or storage during the operation: if one

mapper or reducer fails, the work can be rescheduled – assuming the input data is still available.

Another way to look at MapReduce is as a 5-step parallel and distributed computation:

1. Prepare the Map() input – The "MapReduce system" designates Map processors,

assigns the input key value K1 that each processor would work on, and provides that

processor with all the input data associated with that key value.

2. Run the user-provided Map() code – Map() is run exactly once for each K1 key value,

generating output organized by key values K2.

3. "Shuffle" the Map output to the Reduce processors – the MapReduce system

designates Reduce processors, assigns the K2 key value each processor should work on,

and provides that processor with all the Map-generated data associated with that key

value.

4. Run the user-provided Reduce() code – Reduce() is run exactly once for each K2 key

value produced by the Map step.

5. Produce the final output – The MapReduce system collects all the Reduce output, and

sorts it by K2 to produce the final outcome.

These five steps can be logically thought of as running in sequence – each step starts only after

the previous step is completed – although in practice they can be interleaved as long as the final

result is not affected.


In many situations, the input data might already be distributed ("shared") among many different

servers, in which case step 1 could sometimes be greatly simplified by assigning Map servers

that would process the locally present input data. Similarly, step 3 could sometimes be sped up

by assigning Reduce processors that are as close as possible to the Map-generated data they need

to process.


4. Logical view

The Map and Reduce functions of MapReduce are both defined with respect to data structured in

(key, value) pairs. Map takes one pair of data with a type in one data domain, and returns a list of

pairs in a different domain:

Map(k1,v1) → list(k2,v2)

The Map function is applied in parallel to every pair in the input dataset. This produces a list of

pairs for each call. After that, the MapReduce framework collects all pairs with the same key

from all lists and groups them together, creating one group for each key.

The Reduce function is then applied in parallel to each group, which in turn produces a collection

of values in the same domain:

Reduce(k2, list (v2)) → list(v3)

Each Reduce call typically produces either one value v3 or an empty return, though one call is

allowed to return more than one value. The returns of all calls are collected as the desired result

list.

Thus the MapReduce framework transforms a list of (key, value) pairs into a list of values. This

behavior is different from the typical functional programming map and reduce combination,

which accepts a list of arbitrary values and returns one single value that combines all the values

returned by map.
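To make this type flow concrete, here is how the abstract domains line up with the WordCount example developed in Section 6 of this report (a comment-style sketch; the Hadoop types in parentheses are the ones used in that example's source code):

// k1 = byte offset of a line (LongWritable), v1 = the line of text (Text)       -- input pairs
// k2 = a word (Text),                        v2 = a partial count (IntWritable) -- intermediate pairs
// v3 = the total count for a word (IntWritable)                                 -- final values
//
//   Map:    (k1, v1)          ->  list of (k2, v2)
//   Reduce: (k2, list of v2)  ->  list of v3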


5. Inputs and Outputs

The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework

views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs

as the output of the job, conceivably of different types. The key and value classes have to be

serializable by the framework and hence need to implement the Writable interface. Additionally,

the key classes have to implement the WritableComparable interface to facilitate sorting by the

framework.

Input and Output types of a MapReduce job:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
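As a minimal illustration of the serialization requirement above, the sketch below shows what a user-defined key type might look like. The class name and its fields are hypothetical and chosen only for illustration; the three methods are the ones the WritableComparable contract requires.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical composite key (userId, timestamp); illustrative only.
public class EventKey implements WritableComparable<EventKey> {
    private long userId;
    private long timestamp;

    public EventKey() { }                                      // no-arg constructor needed for deserialization

    public void write(DataOutput out) throws IOException {     // serialize the fields
        out.writeLong(userId);
        out.writeLong(timestamp);
    }

    public void readFields(DataInput in) throws IOException {  // deserialize the fields
        userId = in.readLong();
        timestamp = in.readLong();
    }

    public int compareTo(EventKey other) {                     // lets the framework sort keys
        int c = Long.compare(userId, other.userId);
        return (c != 0) ? c : Long.compare(timestamp, other.timestamp);
    }
}

Built-in types such as LongWritable, Text and IntWritable, which the WordCount example below relies on, already implement these interfaces, so simple jobs rarely need a custom key class.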


6. Example: WordCount v1.0

Let’s walk through an example MapReduce application to get a flavor for how they work.

WordCount is a simple application that counts the number of occurrences of each word in a

given input set. This works with a local-standalone, pseudo-distributed or fully-distributed

Hadoop installation (Single Node Setup).

The prototypical MapReduce example counts the appearance of each word in a set of documents:

function map(String name, String document):
    // name: document name
    // document: document contents
    for each word w in document:
        emit (w, 1)

function reduce(String word, Iterator partialCounts):
    // word: a word
    // partialCounts: a list of aggregated partial counts
    sum = 0
    for each pc in partialCounts:
        sum += ParseInt(pc)
    emit (word, sum)

Here, each document is split into words, and each word is counted by the map function, using the

word as the result key. The framework puts together all the pairs with the same key and feeds

them to the same call to reduce. Thus, this function just needs to sum all of its input values to

find the total appearances of that word.

Source Code - WordCount.java

1. package org.myorg;

2.

3. import java.io.IOException;

4. import java.util.*;

5.

6. import org.apache.hadoop.fs.Path;

7. import org.apache.hadoop.conf.*;


8. import org.apache.hadoop.io.*;

9. import org.apache.hadoop.mapred.*;

10. import org.apache.hadoop.util.*;

11.

12. public class WordCount {

13.

14. public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,

Text, IntWritable> {

15. private final static IntWritable one = new IntWritable(1);

16. private Text word = new Text();

17.

18. public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,

Reporter reporter) throws IOException {

19. String line = value.toString();

20. StringTokenizer tokenizer = new StringTokenizer(line);

21. while (tokenizer.hasMoreTokens()) {

22. word.set(tokenizer.nextToken());

23. output.collect(word, one);

24. }

25. }

26. }

27.

28. public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable,

Text, IntWritable> {

29. public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,

IntWritable> output, Reporter reporter) throws IOException {

30. int sum = 0;


31. while (values.hasNext()) {

32. sum += values.next().get();

33. }

34. output.collect(key, new IntWritable(sum));

35. }

36. }

37.

38. public static void main(String[] args) throws Exception {

39. JobConf conf = new JobConf(WordCount.class);

40. conf.setJobName("wordcount");

41.

42. conf.setOutputKeyClass(Text.class);

43. conf.setOutputValueClass(IntWritable.class);

44.

45. conf.setMapperClass(Map.class);

46. conf.setCombinerClass(Reduce.class);

47. conf.setReducerClass(Reduce.class);

48.

49. conf.setInputFormat(TextInputFormat.class);

50. conf.setOutputFormat(TextOutputFormat.class);

51.

52. FileInputFormat.setInputPaths(conf, new Path(args[0]));

53. FileOutputFormat.setOutputPath(conf, new Path(args[1]));

54.

55. JobClient.runJob(conf);


57. }

58. }

59.

6.1 Walk-through

The WordCount application is quite straightforward. The Mapper implementation (lines 14-26), via the map method (lines 18-25), processes one line at a time, as provided by the specified TextInputFormat (line 49). It then splits the line into tokens separated by whitespace, via the StringTokenizer, and emits a key-value pair of <<word>, 1>.

For a sample input of two files containing the lines "Hello World Bye World" and "Hello Hadoop Goodbye Hadoop", the first map emits:

< Hello, 1>

< World, 1>

< Bye, 1>

< World, 1>


The second map emits:

< Hello, 1>

< Hadoop, 1>

< Goodbye, 1>

< Hadoop, 1>

We'll learn more about the number of maps spawned for a given job, and how to control them in a fine-grained manner, a bit later (see Section 8.1.1.1). WordCount also specifies a combiner (line 46). Hence, the output of each map is passed through the local combiner (which is the same as the Reducer, as per the job configuration) for local aggregation, after being sorted on the keys.

The output of the first map:

< Bye, 1>

< Hello, 1>

< World, 2>

The output of the second map:

< Goodbye, 1>

< Hadoop, 2>

< Hello, 1>

The Reducer implementation (lines 28-36), via the reduce method (lines 29-35) just sums up the

values, which are the occurrence counts for each key (i.e. words in this example).

Thus the output of the job is:

< Bye, 1>

< Goodbye, 1>

< Hadoop, 2>

< Hello, 2>

< World, 2>


The main method specifies various facets of the job, such as the input/output paths (passed via the command line), key/value types, input/output formats etc., in the JobConf. It then calls JobClient.runJob (line 55) to submit the job and monitor its progress.

7. More Examples

Here are a few simple examples of interesting programs that can be easily expressed as

MapReduce computations.

Distributed Grep: The map function emits a line if it matches a supplied pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output (a sketch of this example follows the list below).

Count of URL Access Frequency: The map function processes logs of web page requests and

outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a

<URL, total count> pair.

Reverse Web-Link Graph: The map function outputs <target, source> pairs for each link to a

target URL found in a page named source. The reduce function concatenates the list of all source

URLs associated with a given target URL and emits the pair: <target, list(source)>
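As a sketch of the Distributed Grep example, a mapper written against the same old-style org.apache.hadoop.mapred API as the WordCount code in Section 6 might look roughly as follows; the configuration property name "grep.pattern" is an assumption made for illustration, not something defined by Hadoop.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical distributed-grep mapper; emits only the input lines that match a pattern.
public class GrepMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, NullWritable> {

    private String pattern;

    public void configure(JobConf job) {
        pattern = job.get("grep.pattern", "");       // pattern supplied through the job configuration
    }

    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, NullWritable> output, Reporter reporter)
            throws IOException {
        if (line.toString().contains(pattern)) {     // emit matching lines as keys, with no value
            output.collect(line, NullWritable.get());
        }
    }
}

The reduce step can then simply be the identity (for example org.apache.hadoop.mapred.lib.IdentityReducer), matching the description above.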

8. MapReduce - User Interfaces

This section provides a reasonable amount of detail on every user-facing aspect of the

MapReduce framework. This should help users implement, configure and tune their jobs in a

fine-grained manner. However, please note that the javadoc for each class/interface remains the

most comprehensive documentation available; this is only meant to be a tutorial. Let us first take

the Mapper and Reducer interfaces. Applications typically implement them to provide the map

and reduce methods.

8.1 Payload

Applications typically implement the Mapper and Reducer interfaces to provide the map and

reduce methods. These form the core of the job.


8.1.1 Mapper

Mapper maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks that transform input records into intermediate records. The

transformed intermediate records do not need to be of the same type as the input records. A given

input pair may map to zero or many output pairs.

The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the

InputFormat for the job.

Overall, Mapper implementations are passed the JobConf for the job via the JobConfigurable.configure(JobConf) method and can override it to initialize themselves. The

framework then calls map(WritableComparable, Writable, OutputCollector, Reporter) for each


key/value pair in the InputSplit for that task. Applications can then override the

Closeable.close() method to perform any required cleanup.

Output pairs do not need to be of the same types as input pairs. A given input pair may map to

zero or many output pairs. Output pairs are collected with calls to

OutputCollector.collect(WritableComparable, Writable).

Applications can use the Reporter to report progress, set application-level status messages and

update Counters, or just indicate that they are alive.

All intermediate values associated with a given output key are subsequently grouped by the

framework, and passed to the Reducer(s) to determine the final output. Users can control the

grouping by specifying a Comparator via JobConf.setOutputKeyComparatorClass(Class).

The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions

is the same as the number of reduce tasks for the job. Users can control which keys (and hence

records) go to which Reducer by implementing a custom Partitioner.

Users can optionally specify a combiner, via JobConf.setCombinerClass(Class), to perform local

aggregation of the intermediate outputs, which helps to cut down the amount of data transferred

from the Mapper to the Reducer.

The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value)

format. Applications can control if, and how, the intermediate outputs are to be compressed and

the CompressionCodec to be used via the JobConf.
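For instance, compression of the intermediate map outputs can be switched on through the JobConf; a minimal sketch, reusing the conf object from the WordCount driver and assuming the gzip codec is acceptable for the job:

// Enable compression of intermediate map outputs (codec choice is illustrative)
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(org.apache.hadoop.io.compress.GzipCodec.class);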

8.1.1.1 How Many Maps?

The number of maps is usually driven by the total size of the inputs, that is, the total

number of blocks of the input files.

The right level of parallelism for maps seems to be around 10-100 maps per node, although it has been set as high as 300 maps for very CPU-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.

Thus, if you expect 10 TB of input data and have a block size of 128 MB, you'll end up with about 82,000 maps, unless setNumMapTasks(int) (which only provides a hint to the framework) is used to set it even higher.
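Continuing the arithmetic above (10 TB divided by 128 MB blocks gives roughly 82,000 splits), a job that wanted even more maps could pass a hint through the JobConf; the target value below is chosen purely for illustration:

// setNumMapTasks only provides a hint; the framework may still use the block-based number.
conf.setNumMapTasks(100000);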

8.1.2 Reducer

Reducer reduces a set of intermediate values which share a key to a smaller set of values.

The number of reduces for the job is set by the user via JobConf.setNumReduceTasks(int).


Overall, Reducer implementations are passed the JobConf for the job via the JobConfigurable.configure(JobConf) method and can override it to initialize themselves. The framework then calls the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method for each <key, (list of values)> pair in the grouped inputs. Applications can then override the Closeable.close() method to perform any required cleanup.

Reducer has 3 primary phases: shuffle, sort and reduce.

8.1.2.1 Shuffle

Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the

relevant partition of the output of all the mappers, via HTTP.

8.1.2.2 Sort

The framework groups Reducer inputs by keys (since different mappers may have output the

same key) in this stage.

The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are

merged.

Secondary Sort

If equivalence rules for grouping the intermediate keys are required to be different from those for

grouping keys before reduction, then one may specify a Comparator via

JobConf.setOutputValueGroupingComparator(Class).

Since JobConf.setOutputKeyComparatorClass(Class) can be used to control how intermediate keys are sorted, these two settings can be used in conjunction to simulate a secondary sort on values.
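A brief sketch of how the two settings combine for a secondary sort; the comparator class names are hypothetical placeholders for user-written comparators, not classes shipped with Hadoop:

// Sort by the full composite key (natural key plus the secondary field)...
conf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
// ...but group values into reduce() calls by the natural key only.
conf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);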

8.1.2.3 Reduce

In this phase the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method is called for each <key, (list of values)> pair in the grouped inputs.

The output of the reduce task is typically written to the File System via

OutputCollector.collect(WritableComparable, Writable).

Applications can use the Reporter to report progress, set application-level status messages and

update Counters, or just indicate that they are alive.

The output of the Reducer is not sorted.

8.1.2.4 How Many Reduces?


The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum). For example, a 10-node cluster with 2 reduce slots per TaskTracker gives about 19 reduces with the 0.95 factor. With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces, doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load

balancing and lowers the cost of failures.

The scaling factors above are slightly less than whole numbers to reserve a few reduce slots in

the framework for speculative-tasks and failed tasks.

8.1.2.5 Reducer NONE

It is legal to set the number of reduce-tasks to zero if no reduction is desired.

In this case the outputs of the map-tasks go directly to the File System, into the output path set by

setOutputPath(Path). The framework does not sort the map-outputs before writing them out to

the File System.
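In the job configuration this is a single call on the JobConf, as in the WordCount driver:

conf.setNumReduceTasks(0);   // map-only job: map outputs are written directly to the output path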

8.1.3 Partitioner

Partitioner partitions the key space.

Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a

subset of the key) is used to derive the partition, typically by a hash function. The total number

of partitions is the same as the number of reduce tasks for the job. Hence this controls which of

the reduce tasks the intermediate key (and hence the record) is sent to for reduction.

HashPartitioner is the default partitioner.
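A custom Partitioner for this old-style API only has to map a key to a partition number in the range [0, numPartitions). The sketch below routes words by their first character; the class itself is hypothetical and merely illustrates the interface.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Hypothetical partitioner: all words starting with the same character go to the same reducer.
public class FirstCharPartitioner implements Partitioner<Text, IntWritable> {

    public void configure(JobConf job) { }    // no configuration needed for this example

    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String word = key.toString();
        int firstChar = word.isEmpty() ? 0 : word.charAt(0);   // derive the partition from a subset of the key
        return firstChar % numPartitions;
    }
}

It would be registered on the job with conf.setPartitionerClass(FirstCharPartitioner.class); when nothing is registered, the default HashPartitioner hashes the whole key modulo the number of reduce tasks.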

8.1.4 Reporter

Reporter is a facility for MapReduce applications to report progress, set application-level status

messages and update Counters.

Mapper and Reducer implementations can use the Reporter to report progress or just indicate that

they are alive. In scenarios where the application takes a significant amount of time to process

individual key/value pairs, this is crucial since the framework might assume that the task has

timed-out and kill that task. Another way to avoid this is to set the configuration parameter

mapred.task.timeout to a high-enough value (or even set it to zero for no time-outs).

Applications can also update Counters using the Reporter.
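Inside a long-running map or reduce method, these calls look roughly as follows; reporter is the parameter already present in the WordCount signatures above, while the counter group/name and the records variable are illustrative:

// Called every few thousand records inside map() or reduce():
reporter.progress();                                      // tell the framework the task is still alive
reporter.setStatus("processed " + records + " records");  // application-level status message
reporter.incrCounter("MyApp", "RECORDS_PROCESSED", 1);    // update an application-defined counter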


8.1.5 OutputCollector

OutputCollector is a generalization of the facility provided by the MapReduce framework to

collect data output by the Mapper or the Reducer (either the intermediate outputs or the output of

the job).

Hadoop MapReduce comes bundled with a library of generally useful mappers, reducers, and

partitioners.

9. Implementation

Many different implementations of the MapReduce interface are possible. The right choice

depends on the environment. For example, one implementation may be suitable for a small

shared-memory machine, another for a large NUMA multi-processor, and yet another for an

even larger collection of networked machines.

This section describes an implementation targeted to the computing environment in wide use at

Google: large clusters of commodity PCs connected together with switched Ethernet. In our

environment:

1. Machines are typically dual-processor x86 machines running Linux, with 2-4 GB of

memory per machine.

2. Commodity networking hardware is used – typically either 100 megabits/second or 1

gigabit/second at the machine level, but averaging considerably less in overall bisection

bandwidth.

3. A cluster consists of hundreds or thousands of machines, and therefore machine failures

are common.

4. Storage is provided by inexpensive IDE disks attached directly to individual machines. A

distributed file system developed in-house is used to manage the data stored on these

disks. The file system uses replication to provide availability and reliability on top of

unreliable hardware.

5. Users submit jobs to a scheduling system. Each job consists of a set of tasks, and is

mapped by the scheduler to a set of available machines within a cluster.

9.1 Execution Overview

The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.


Figure 1 shows the overall flow of a MapReduce operation in our implementation. When the user program calls the MapReduce function, the following sequence of actions occurs (the numbered labels in Figure 1 correspond to the numbers in the list below):

1. The MapReduce library in the user program first splits the input files into M pieces of

typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional

parameter). It then starts up many copies of the program on a cluster of machines.

2. One of the copies of the program is special – the master. The rest are workers that are assigned

work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle

workers and assigns each one a map task or a reduce task.

3. A worker who is assigned a map task reads the contents of the corresponding input split. It

parses key/value pairs out of the input data and passes each pair to the user-defined Map

function. The intermediate key/value pairs produced by the Map function are buffered in

memory.


4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the

partitioning function. The locations of these buffered pairs on the local disk are passed back to

the master, who is responsible for forwarding these locations to the reduce workers.

5. When a reduce worker is notified by the master about these locations, it uses remote procedure

calls to read the buffered data from the local disks of the map workers. When a reduce worker

has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the

same key are grouped together. The sorting is needed because typically many different keys map

to the same reduce task. If the amount of intermediate data is too large to fit in memory, an

external sort is used.

6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate

key encountered, it passes the key and the corresponding set of intermediate values to the user's

Reduce function. The output of the Reduce function is appended to a final output file for this

reduce partition.

7. When all map tasks and reduce tasks have been completed, the master wakes up the user

program. At this point, the MapReduce call in the user program returns back to the user code.

After successful completion, the output of the mapreduce execution is available in the R output

files (one per reduce task, with file names as specified by the user). Typically, users do not need

to combine these R output files into one file – they often pass these files as input to another

MapReduce call, or use them from another distributed application that is able to deal with input

that is partitioned into multiple files.


9.2 Master Data Structures

The master keeps several data structures. For each map task and reduce task, it stores the state

(idle, in-progress, or completed), and the identity of the worker machine (for non-idle tasks).

The master is the conduit through which the location of intermediate file regions is propagated

from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the

locations and sizes of the R intermediate file regions produced by the map task. Updates to this

location and size information are received as map tasks are completed. The information is

pushed incrementally to workers that have in-progress reduce tasks.


9.3 Fault Tolerance

Since the MapReduce library is designed to help process very large amounts of data using

hundreds or thousands of machines, the library must tolerate machine failures gracefully.

9.3.1 Worker Failure

The master pings every worker periodically. If no response is received from a worker in a certain

amount of time, the master marks the worker as failed. Any map tasks completed by the worker

are reset back to their initial idle state, and therefore become eligible for scheduling on other

workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle

and becomes eligible for rescheduling.

Completed map tasks are re-executed on a failure because their output is stored on the local

disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to

be re-executed since their output is stored in a global file system.

When a map task is executed first by worker A and then later executed by worker B (because A

failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that

has not already read the data from worker A will read the data from worker B.


MapReduce is resilient to large-scale worker failures. For example, during one MapReduce

operation, network maintenance on a running cluster was causing groups of 80 machines at a

time to become unreachable for several minutes. The MapReduce master simply re-executed the

work done by the unreachable worker machines, and continued to make forward progress,

eventually completing the MapReduce operation.

9.3.2 Master Failure

It is easy to make the master write periodic checkpoints of the master data structures described

above. If the master task dies, a new copy can be started from the last checkpointed state.

However, given that there is only a single master, its failure is unlikely; therefore our current

implementation aborts the MapReduce computation if the master fails. Clients can check for this

condition and retry the MapReduce operation if they desire.

9.3.3 Semantics in the Presence of Failures

When the user-supplied map and reduce operators are deterministic functions of their input

values, our distributed implementation produces the same output as would have been produced

by a non-faulting sequential execution of the entire program.

We rely on atomic commits of map and reduce task outputs to achieve this property. Each in-

progress task writes its output to private temporary files. A reduce task produces one such file,

and a map task produces R such files (one per reduce task). When a map task completes, the

worker sends a message to the master and includes the names of the R temporary files in the

message. If the master receives a completion message for an already completed map task, it

ignores the message. Otherwise, it records the names of R files in a master data structure.

When a reduce task completes, the reduce worker atomically renames its temporary output file to

the final output file. If the same reduce task is executed on multiple machines, multiple rename

calls will be executed for the same final output file. We rely on the atomic rename operation

provided by the underlying file system to guarantee that the final file system state contains just

the data produced by one execution of the reduce task.

The vast majority of our map and reduce operators are deterministic, and the fact that our

semantics are equivalent to a sequential execution in this case makes it very easy for

programmers to reason about their program's behavior. When the map and/or reduce operators

are nondeterministic, we provide weaker but still reasonable semantics. In the presence of non-

deterministic operators, the output of a particular reduce task R1 is equivalent to the output for

R1 produced by a sequential execution of the non-deterministic program. However, the output

for a different reduce task R2 may correspond to the output for R2 produced by a different

sequential execution of the non-deterministic program.


Consider map task M and reduce tasks R1 and R2. Let e(Ri) be the execution of Ri that

committed (there is exactly one such execution). The weaker semantics arise because e(R1) may

have read the output produced by one execution of M and e(R2) may have read the output

produced by a different execution of M.

9.4 Locality

Network bandwidth is a relatively scarce resource in our computing environment. We conserve

network bandwidth by taking advantage of the fact that the input data (managed by GFS [5]) is stored on the local disks of the machines that make up our cluster. GFS divides each file into 64

MB blocks, and stores several copies of each block (typically 3 copies) on different machines.

The MapReduce master takes the location information of the input files into account and

attempts to schedule a map task on a machine that contains a replica of the corresponding input

data. Failing that, it attempts to schedule a map task near a replica of that task's input data (e.g.,

on a worker machine that is on the same network switch as the machine containing the data).

When running large MapReduce operations on a significant fraction of the workers in a cluster,

most input data is read locally and consumes no network bandwidth.

9.5 Task Granularity

We subdivide the map phase into M pieces and the reduce phase into R pieces, as described

above. Ideally, M and R should be much larger than the number of worker machines. Having

each worker perform many different tasks improves dynamic load balancing, and also speeds up

recovery when a worker fails: the many map tasks it has completed can be spread out across all

the other worker machines.

There are practical bounds on how large M and R can be in our implementation, since the master

must make O(M + R) scheduling decisions and keeps O(M ∗ R) state in memory as described

above. (The constant factors for memory usage are small however: the O(M ∗ R) piece of the

state consists of approximately one byte of data per map task/reduce task pair.)

Furthermore, R is often constrained by users because the output of each reduce task ends up in a

separate output file. In practice, we tend to choose M so that each individual task is roughly 16

MB to 64 MB of input data (so that the locality optimization described above is most effective),

and we make R a small multiple of the number of worker machines we expect to use. We often

perform MapReduce computations with M = 200,000 and R = 5,000, using 2,000 worker machines. (At roughly one byte per map/reduce task pair, this corresponds to about 1 GB of scheduling state on the master.)

9.6 Backup Tasks

One of the common causes that lengthens the total time taken for a MapReduce operation is a

“straggler”: a machine that takes an unusually long time to complete one of the last few map or

reduce tasks in the computation. Stragglers can arise for a whole host of reasons. For example, a


machine with a bad disk may experience frequent correctable errors that slow its read

performance from 30 MB/s to 1 MB/s. The cluster scheduling system may have scheduled other

tasks on the machine, causing it to execute the MapReduce code more slowly due to competition

for CPU, memory, local disk, or network bandwidth. A recent problem we experienced was a

bug in machine initialization code that caused processor caches to be disabled: computations on

affected machines slowed down by over a factor of one hundred.

We have a general mechanism to alleviate the problem of stragglers. When a MapReduce

operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The task is marked as completed whenever either the primary or the backup

execution completes. We have tuned this mechanism so that it typically increases the

computational resources used by the operation by no more than a few percent. We have found

that this significantly reduces the time to complete large MapReduce operations. As an example,

the sort program described in the original MapReduce paper takes 44% longer to complete when the backup task

mechanism is disabled.

10. Disadvantages/Reasons for introducing new technologies

The following are the reasons that led to improvements in data processing technologies after MapReduce:

• Real-time processing is not supported.
• It is not always easy to implement each and every problem as an MR program.
• When the intermediate processes need to talk to each other (jobs run in isolation).
• When the processing requires a lot of data to be shuffled over the network.
• When we need to handle streaming data; MR is best suited to batch-process huge amounts of data which is already present.
• When we can get the desired result with a standalone system; it is obviously less painful to configure and manage a standalone system as compared to a distributed system.
• When we have OLTP needs; MR is not suitable for a large number of short on-line transactions.


Conclusion

Hadoop MapReduce, together with HDFS, provides a simple programming model for processing very large data sets in parallel on clusters of commodity hardware, while the framework itself takes care of scheduling, data locality, and fault tolerance. As discussed above, it is not well suited to real-time, streaming, or OLTP workloads, which is what motivated newer data processing technologies, but it remains an effective foundation for large-scale batch analytics.


References

1. Apache Hadoop, https://www.hadoop.apache.org
2. MapReduce, Wikipedia, https://en.wikipedia.org/wiki/MapReduce
3. C. A. Hansen, "Optimizing Hadoop for the cluster", Institute for Computer Science, University of Tromsø, Norway. [Online]
4. J. Dean and S. Ghemawat, "MapReduce: Simplified data processing on large clusters", Communications of the ACM, 51 (Jan 2008), pp. 107–113.
5. S. Ghemawat, H. Gobioff, and S.-T. Leung, "The Google file system", 19th ACM Symposium on Operating Systems Principles, Proceedings, ACM Press (2003), pp. 29–43.