
Writing a MapReduce Program

1


Writing a MapReduce Program

Agenda
How to use the Hadoop API to write a MapReduce program in Java
How to use the Streaming API to write Mappers and Reducers in other languages

2


Writing a MapReduce Program

Examining our Sample MapReduce program
The Driver Code
The Mapper
The Reducer
The Streaming API
Hands-On Exercise: Write a MapReduce program
Conclusion

3


A Sample MapReduce Program: Introduction

In the previous chapter, you ran a sample MapReduce program – WordCount – which counted the number of occurrences of each unique word in a set of files

In this chapter, we will examine the code for that WordCount program

4


Components of a MapReduce Program

MapReduce programs generally consist of three portions
– The Mapper
– The Reducer
– The driver code
We will look at each element in turn
Note: Your MapReduce program may also contain other elements
– Combiner (often the same code as the Reducer; see the sketch below)
– Custom partitioner
– Etc.
– We will investigate these other elements later in the course
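As a preview, wiring in a Combiner is usually a one-line addition to the driver. The sketch below assumes the WordCountReducer shown later in this chapter is reused as the Combiner, which is safe for WordCount because summing counts is associative and commutative:

// In the driver, after setting the Mapper and Reducer classes
// (sketch only; reuses the Reducer class as the Combiner)
job.setCombinerClass(WordCountReducer.class);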

5


Writing a MapReduce Program

Examining our Sample MapReduce program

The Driver Code

The Mapper

The Reducer

The Streaming API

Hands-On Exercise: Write a MapReduce program

Conclusion

6


The Driver: Complete Code

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDriver {

public static void main(String[] args) throws IOException,

InterruptedException, ClassNotFoundException {

if (args.length != 2) {

System.out.println("usage: [input] [output]");

System.exit(-1);

}

Configuration conf = new Configuration();

Job job = new Job(conf, "wordcount");

job.setJarByClass(WordCountDriver.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(WordCountMapper.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.setReducerClass(WordCountReducer.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.waitForCompletion(true);

}

}

7


The Driver: Import Statements

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


You will typically import these classes into every MapReduce job you write. We will omit the import statements in future slides for brevity.

8



The Driver: Main Code

10

public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {

if (args.length != 2) {
System.out.println("usage: [input] [output]");
System.exit(-1);
}

You usually configure your MapReduce job in the main method of your driver code. Here, we first check to ensure that the user has specified the HDFS directories to use for input and output on the command line.


Configure the Job

11

Configuration conf = new Configuration();

Job job = new Job(conf, "wordcount");

job.setJarByClass(WordCountDriver.class);

A Job object forms the specification of the job. It gives you control over how the job is run. When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster).
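For example, once the classes are compiled and packaged, the job could be submitted from the command line roughly like this (the JAR name and the input/output directories are placeholders, not part of the original listing):

hadoop jar wordcount.jar WordCountDriver input_dir output_dir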


Creating a New Job Object

The Job object, created from a Configuration, allows you to set configuration options for your MapReduce job
– The classes to be used for your Mapper and Reducer
– The input and output directories
– Many other options
Any options not explicitly set in your driver code will be read from your Hadoop configuration files
– Usually located in /etc/hadoop/conf
Any options not specified in your configuration files will receive Hadoop’s default values

12


Naming The Job

13

Job job = new Job(conf, "wordcount");

job.setJarByClass(WordCountDriver.class);

First, we give our job a meaningful name. We can also pass a class to the Job’s setJarByClass() method, which Hadoop will use to locate the relevant JAR file.


Specifying Input and Output Directories

14

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

Next, we specify the input directory from which data will be read, and the output directory to which our final output will be written


Determining Which Files To Read

By default, FileInputFormat.setInputPaths() will read all files from a specified directory and send them to Mappers
– Exceptions: items whose names begin with a period (.) or underscore (_)
– Globs can be specified to restrict input
– For example, /2010/*/01/*
Alternatively, FileInputFormat.addInputPath() can be called multiple times, specifying a single file or directory each time
More advanced filtering can be performed by implementing a PathFilter (see later)
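A brief sketch of these options as they would appear in the driver code (the paths below are placeholders):

// Option 1: restrict the input with a glob
FileInputFormat.setInputPaths(job, new Path("/2010/*/01/*"));

// Option 2: add files or directories one at a time
FileInputFormat.addInputPath(job, new Path("/data/logs-a"));
FileInputFormat.addInputPath(job, new Path("/data/logs-b/part-00000"));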

15


Getting Data to the Mapper

The data passed to the Mapper is specified by an InputFormat
– Specified in the driver code
– Defines the location of the input data – a file or directory, for example
– Determines how to split the input data into input splits
– Each Mapper deals with a single input split
– The InputFormat is a factory for RecordReader objects to extract (key, value) records from the input source

16


Getting Data to the Mapper (cont’d)

17


Some Standard InputFormats

FileInputFormat
– The base class used for all file-based InputFormats
TextInputFormat
– The default
– Treats each \n-terminated line of a file as a value
– Key is the byte offset within the file of that line
KeyValueTextInputFormat
– Maps \n-terminated lines as ‘key SEP value’
– By default, the separator is a tab
SequenceFileInputFormat
– Binary file of (key, value) pairs with some additional metadata
SequenceFileAsTextInputFormat
– Similar, but maps (key.toString(), value.toString())
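For example, switching the job to KeyValueTextInputFormat (available in the org.apache.hadoop.mapreduce.lib.input package in recent Hadoop releases) changes the types the Mapper receives. This is a sketch, assuming the input files contain tab-separated key/value lines:

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

// In the driver:
job.setInputFormatClass(KeyValueTextInputFormat.class);
// The Mapper's input types would then be <Text, Text, ...>
// rather than <LongWritable, Text, ...>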

18


Specifying Final Output With OutputFormat

FileOutputFormat.setOutputPath() specifies the directory to which the Reducers will write their final output
The driver can also specify the format of the output data
– Default is a plain text file
– Could be explicitly written as job.setOutputFormatClass(TextOutputFormat.class);

We will discuss OutputFormats in more depth in a later chapter.
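As one illustration (a sketch, not part of the WordCount example), writing the final output as a binary SequenceFile rather than plain text would look like this in the driver:

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

job.setOutputFormatClass(SequenceFileOutputFormat.class);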

19


Specify The Classes for Mapper and Reducer

20

job.setMapperClass(WordCountMapper.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.setReducerClass(WordCountReducer.class);

Give the Job object information about which classes are to be instantiated as the Mapper and Reducer. You also specify the classes for the intermediate and final keys and values; setOutputKeyClass() and setOutputValueClass() set the final output types, which WordCount also uses for its intermediate data.
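If the intermediate (map output) types differed from the final output types, the driver would also declare them explicitly. A sketch, with hypothetical types that WordCount itself does not need:

// Only needed when the map output types differ from the job's final output types
// (requires an import of org.apache.hadoop.io.LongWritable)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);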


Running The Job

21

job.waitForCompletion(true);

Finally, we submit the job to the cluster and wait for it to finish.
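waitForCompletion() returns a boolean indicating whether the job succeeded, so a common refinement (a sketch, not part of the original listing) is to turn that result into the driver's exit code:

boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);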


Reprise: Driver Code

public class WordCountDriver {

public static void main(String[] args) throws IOException,

InterruptedException, ClassNotFoundException {

if (args.length != 2) {

System.out.println("usage: [input] [output]");

System.exit(-1);

}

Configuration conf = new Configuration();

Job job = new Job(conf, "wordcount");

job.setJarByClass(WordCountDriver.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(WordCountMapper.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.setReducerClass(WordCountReducer.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.waitForCompletion(true);

}

}


Writing a MapReduce Program

Examining our Sample MapReduce program
The Driver Code
The Mapper
The Reducer
The Streaming API
Hands-On Exercise: Write a MapReduce program
Conclusion

23


The Mapper: Complete Code

24

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private Text word = new Text();
private final static IntWritable one = new IntWritable(1);

protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

// Break line into words for processing
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}


The Mapper: import Statements

25

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

You will typically import java.io.IOException, and the org.apache.hadoop classes shown, in every Mapper you write. We have also imported java.util.StringTokenizer as we will need this for our particular Mapper. We will omit the import statements in future slides for brevity.


The Mapper: Main Code

26

public class WordCountMapper extends

Mapper<LongWritable, Text, Text, IntWritable> {

private Text word = new Text();

private final static IntWritable one = new IntWritable(1);

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

// Break line into words for processing

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}


The Mapper: Main Code (cont’d)

27

public class WordCountMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {

The Mapper class is a generic type with four formal type parameters. The first two parameters define the input key and value types, the second two define the output key and value types. In our example, the input key is the byte offset of the line within the file (a LongWritable) and the input value is the line itself (a Text); the output key is a word (Text) and the output value is a count (IntWritable).


Mapper Parameters

28

The Mapper’s parameters define the input and output key/value types
Keys are always WritableComparable
Values are always Writable


What is Writable?

Hadoop defines its own ‘box classes’ for strings, integers and so on
– IntWritable for ints
– LongWritable for longs
– FloatWritable for floats
– DoubleWritable for doubles
– Text for strings
– Etc.
The Writable interface makes serialization quick and easy for Hadoop
Any value’s type must implement the Writable interface

29


What is WritableComparable?

A WritableComparable is a Writable which is also Comparable
– Two WritableComparables can be compared against each other to determine their ‘order’
– Keys must be WritableComparables because they are passed to the Reducer in sorted order
– We will talk more about WritableComparable later
Note that despite their names, all Hadoop box classes implement both Writable and WritableComparable
– For example, IntWritable is actually a WritableComparable
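To make the two interfaces concrete, here is a minimal sketch of a custom key type; the class and field names are invented for illustration and are not part of the course code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// A hypothetical key holding a single year value
public class YearKey implements WritableComparable<YearKey> {

  private int year;

  public void set(int year) { this.year = year; }

  public void write(DataOutput out) throws IOException {
    out.writeInt(year);          // serialize the field
  }

  public void readFields(DataInput in) throws IOException {
    year = in.readInt();         // deserialize in the same order
  }

  public int compareTo(YearKey other) {
    return Integer.compare(year, other.year);   // defines the keys' sort order
  }
}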

30


Creating Objects: Efficiency

31

private Text word = new Text();

private final static IntWritable one = new IntWritable(1);

We create two objects outside of the map function we are about to write. One is a Text object, the other an IntWritable. We do this here for efficiency, as we will show.


Using Objects Efficiently in MapReduce

A typical way to write Java code might look something like this:

while (more input exists) {
myIntermediate = new intermediate(input);
myIntermediate.doSomethingUseful();
export outputs;
}

Problem: this creates a new object each time around the loop
Your map function will probably be called many thousands or millions of times for each Map task
– Resulting in millions of objects being created
This is very inefficient
– Running the garbage collector takes time


Using Objects Efficiently in MapReduce (cont’d)

A more efficient way to code:

myIntermediate = new intermediate(junk);
while (more input exists) {
myIntermediate.setupState(input);
myIntermediate.doSomethingUseful();
export outputs;
}

Only one object is created
– It is populated with different data each time around the loop
– Note that for this to work, the intermediate class must be mutable
– All relevant Hadoop classes are mutable


Using Objects Efficiently in MapReduce (cont’d)

Reusing objects allows for much better cache usage
– Provides a significant performance benefit (up to 2x)
All keys and values given to you by Hadoop use this model
Caution! You must take this into account when you write your code
– For example, if you create a list of all the objects passed to your reduce method, you must be sure to do a deep copy (see the sketch below)
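A minimal sketch of that deep copy inside a reduce method (illustrative only; it assumes IntWritable values as in WordCount, with java.util.List and java.util.ArrayList imported):

List<IntWritable> saved = new ArrayList<IntWritable>();
for (IntWritable value : values) {
  // WRONG: saved.add(value); Hadoop reuses the same object, so every
  // element of the list would end up holding the last value seen
  saved.add(new IntWritable(value.get()));   // RIGHT: copy the contents
}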

34


The map Method

35

protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

The map method’s signature looks like this. It will be passed a key and a value, along with a Context object to which the output is written.


The map Method (cont’d)

One instance of your Mapper is instantiated per task attempt

– This exists in a separate process from any other Mapper

– Usually on a different physical machine

– No data sharing between Mappers is allowed!

36


The map Method: Processing The Line

37

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}

Within the map method, we split each line into separate words using Java’s StringTokenizer class.


Outputting Intermediate Data

38

while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}

To emit a (key, value) pair, we call Context.write(). The key will be the word itself; the value will be the number 1. Recall that the output key must be of type WritableComparable, and the value must be a Writable. We have created a Text object called word, so we can simply put a new value into that object, and an IntWritable object called one.


Reprise: The Map Method

39

public class WordCountMapper extends

Mapper<LongWritable, Text, Text, IntWritable> {

private Text word = new Text();

private final static IntWritable one = new IntWritable(1);

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

// Break line into words for processing

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}


Writing a MapReduce Program

40

Examining our Sample MapReduce program
The Driver Code
The Mapper
The Reducer
The Streaming API
Hands-On Exercise: Write a MapReduce program
Conclusion


The Reducer: Complete Code

41

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

public class WordCountReducer extends

Reducer<Text, IntWritable, Text, IntWritable> {

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable value : values) {

sum += value.get();

}

context.write(key, new IntWritable(sum));

}

}


The Reducer: Import Statements

42

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;


As with the Mapper, you will typically import java.io.IOException, and the org.apache.hadoop classes shown, in every Reducer you write. We will omit the import statements in future slides for brevity.


The Reducer: Main Code

43


public class WordCountReducer extends

Reducer<Text, IntWritable, Text, IntWritable> {

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable value : values) {

sum += value.get();

}

context.write(key, new IntWritable(sum));

}

}


The Reducer: Main Code (cont’d)

44

public class WordCountReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {

Your Reducer class should extend Reducer. The Reducer class expects four type parameters, which define the types of the input and output key/value pairs. The first two parameters define the intermediate key and value types, the second two define the final output key and value types. The keys are WritableComparables, the values are Writables.


The reduce Method

45

protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {

The input types of the reduce function must match the output types of the map function: Text and IntWritable


Processing The Values

46

int sum = 0;

for (IntWritable value : values) {
sum += value.get();
}

We use the IntWritable get() method on each value as we step through all the elements in the Iterable. In our example, we are merely adding all the values together.


Writing The Final Output

47

context.write(key, new IntWritable(sum));

Finally, we wrap the total in a new IntWritable and write the output (key, value) pair using the write() method of our Context object.


Reprise: The Reduce Method

48

public class WordCountReducer extends

Reducer<Text, IntWritable, Text, IntWritable> {

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable value : values) {

sum += value.get();

}

context.write(key, new IntWritable(sum));

}

}


Writing a MapReduce Program

49

Examining our Sample MapReduce program
The Driver Code
The Mapper
The Reducer
The Streaming API
Hands-On Exercise: Write a MapReduce program
Conclusion


The Streaming API: Motivation

Many organizations have developers skilled in languages other than Java
– Perl
– Ruby
– Python
– Etc.
The Streaming API allows developers to use any language they wish to write Mappers and Reducers
– As long as the language can read from standard input and write to standard output

50


The Streaming API: Advantages

Advantages of the Streaming API:

– No need for non-Java coders to learn Java

– Fast development time

– Ability to use existing code libraries

51


How Streaming Works

To implement streaming, write separate Mapper and Reducer programs in the language of your choice
– They will receive input via stdin
– They should write their output to stdout
Input format is key (tab) value
Output format should be written as key (tab) value
Separators other than tab can be specified

52


Streaming: Example Mapper

Example Mapper: map (k, v) to (v, k)

53

#!/usr/bin/env python2.5

import sys

while True:
    line = sys.stdin.readline()
    if len(line) == 0:
        break
    (k, v) = line.strip().split("\t")
    print v + "\t" + k


Streaming Reducers: Caution

54

Recall that in Java, all the values associated with a key are passed to the Reducer as an Iterable
Using Hadoop Streaming, the Reducer receives its input as (key, value) pairs
– One per line of standard input
Your code will have to keep track of the key so that it can detect when values from a new key start appearing (see the sketch below)
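Streaming Reducers are normally written in a scripting language, but the bookkeeping itself is language-independent. Purely as an illustration, and to stay in Java like the rest of this chapter's examples, a reducer that sums values per key from sorted, tab-separated standard input might look like this (a sketch, not part of the course code):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class StreamingSumReducer {
  public static void main(String[] args) throws IOException {
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String currentKey = null;
    long sum = 0;
    String line;
    while ((line = in.readLine()) != null) {
      String[] parts = line.split("\t", 2);
      String key = parts[0];
      long value = Long.parseLong(parts[1].trim());
      if (currentKey != null && !key.equals(currentKey)) {
        // The key has changed: emit the previous key's total and reset
        System.out.println(currentKey + "\t" + sum);
        sum = 0;
      }
      currentKey = key;
      sum += value;
    }
    if (currentKey != null) {
      System.out.println(currentKey + "\t" + sum);   // emit the final key's total
    }
  }
}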


Launching a Streaming Job

To launch a Streaming job, use e.g.:

hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
  -input myInputDirs \
  -output myOutputDir \
  -mapper myMapScript.pl \
  -reducer myReduceScript.pl

Many other command-line options are available
Note that system commands can be used as a Streaming mapper or reducer
– awk, grep, sed, wc, etc. can be used

55


Writing a MapReduce Program

56

Examining our Sample MapReduce program
The Driver Code
The Mapper
The Reducer
The Streaming API
Hands-On Exercise: Write a MapReduce program
Conclusion


Hands-On Exercise: Write A MapReduce Program

In this Hands-On Exercise, you will write a MapReduce program using either Java or Hadoop’s Streaming interface
Please refer to the PDF of exercise instructions which you downloaded along with the course notes. The PDF can also be found on the Desktop of the training Virtual Machine

57


Writing a MapReduce Program

58

Examining our Sample MapReduce program
The Driver Code
The Mapper
The Reducer
The Streaming API
Hands-On Exercise: Write a MapReduce program
Conclusion


Conclusion

In this chapter you have learned
How to use the Hadoop API to write a MapReduce program in Java
How to use the Streaming API to write Mappers and Reducers in other languages

59