Hadoop MapReduce Types: 2/2 Spring 2015, X. Zhang Fordham Univ.


Page 1: Hadoop MapReduce Types: 2/2 - Fordham University (storm.cis.fordham.edu/zhang/cs5950/slides/MapReduceType2.pdf)

Hadoop MapReduce Types: 2/2

Spring 2015, X. Zhang

Fordham Univ.

Page 2

Outline

• MapReduce: how to control number of tasks

  • InputFormat class decides # of splits => number of map tasks

  • reduce tasks #: configured by client

• ChainMapper: modular design

• Input processing:

  • XML file processing

  • whole file as a record

• Binary output & sorting

Page 3

MapReduce Programming

[Diagram: MapReduce data flow. Input: a set of [key,value] pairs, read from a file (text, binary) or database via an InputFormat and divided into splits; map tasks emit intermediate [key,value] pairs; the shuffle (partition: hash key to reducer task) groups them into [k,(v1,v2,…)], [k’,(v1,v2,…)], …; reduce produces the output, a set of [key,value] pairs, written via an OutputFormat.]

Page 4

Usage of InputFormat

• An InputFormat is responsible for creating input splits and dividing them into records:

public abstract class InputFormat<K, V> {

  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;

  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException;
}

• The default input format is TextInputFormat, which produces one (key, value) pair for each line in the text file (key: byte offset of the line in the file; value: the content of the line)

• We will later look at a customized InputFormat class …


Page 5

InputSplit

An input split is a chunk of the input that is processed by a single map task.

Each split is divided into records, and map processes each record—a key-value pair—in turn.

public abstract class InputSplit {

  public abstract long getLength() throws IOException, InterruptedException;

  public abstract String[] getLocations() throws IOException, InterruptedException;
}

An InputSplit has a length in bytes and a set of storage locations (hostname strings).


Page 6

Starting map tasks

1. The client running the job calculates the splits for the job by calling getSplits().

2. Client sends splits to jobtracker/ResourceManager, which uses their storage locations to schedule map tasks to process them

3. Each map task passes its split to the createRecordReader() method on the InputFormat to obtain a RecordReader for that split.

• A RecordReader is little more than an iterator over records.

• The map task uses the RecordReader to generate key-value pairs and passes them to the map function.

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}

Default implementation of the Mapper class’s run() function.

Page 7

Split and Record

• Sometimes a record spans two blocks/input splits:

  • map task #1 is located on the same node as the first block/split of the file

  • it needs to perform a remote read to obtain record 5 (which spans two blocks)


Page 8

InputFormat Hierarchy


Page 9

FileInputFormat

FileInputFormat is the parent class for all InputFormats that read from files.

Input to a job: a collection of paths, specified via:

void addInputPath(…)
void addInputPaths(…)
void setInputPaths(…)
void setInputPaths(…)

Page 10

Split Size

FileInputFormat splits only large files, where “large” means larger than an HDFS block.

By default: minimumSize < blockSize < maximumSize

Formula for split size:

max(minimumSize, min(maximumSize, blockSize))
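As a hedged sketch, the split-size rule can be expressed in a few lines of plain Java, mirroring the computation FileInputFormat performs internally (the class and method names here are made up for illustration):

```java
public class SplitSizeDemo {
    // Hadoop's split-size rule: max(minimumSize, min(maximumSize, blockSize)).
    // With the defaults (minimumSize = 1, maximumSize = Long.MAX_VALUE),
    // this reduces to the HDFS block size.
    static long computeSplitSize(long minSize, long maxSize, long blockSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;   // a 128 MB HDFS block
        // Defaults: split size equals the block size.
        System.out.println(computeSplitSize(1L, Long.MAX_VALUE, blockSize));
        // Lowering the maximum split size (as in the later demo's
        // mapred.max.split.size=20000) forces smaller splits, hence more map tasks.
        System.out.println(computeSplitSize(1L, 20000L, blockSize));
        // Raising the minimum split size forces splits larger than a block.
        System.out.println(computeSplitSize(256L * 1024 * 1024, Long.MAX_VALUE, blockSize));
    }
}
```

This is why the default never splits a file smaller than a block: the block size wins both the min and the max.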

Page 11

Demo: how to?

• Change the maximum split size so that a file smaller than the block size is split.

• ~/hadoop_sampleCodes/citibike/ShellScript/RunDefaultJob.sh

• -Dmapred.max.split.size=20000

• Try: ./RunDefaultJob.sh >& dd

• grep 2014-04.csv dd | wc -l => 655 map tasks

2015-03-30 15:38:40,425 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(245)) - Total input paths to process : 1
2015-03-30 15:38:40,496 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(371)) - number of splits:655
2015-03-30 15:25:49,678 INFO [Thread-2] mapred.Merger (Merger.java:merge(568)) - Merging 655 sorted segments


Page 12

Demo: How to configure number of reducers?


• Does not work in local mode

• See the LocalJobRunner.java code: it runs the map tasks sequentially, and then starts one reduce task

• in command line: -Dmapred.reduce.tasks=2

• in code: job.setNumReduceTasks(2);

• ~/hadoop_sampleCodes/citibike/ShellScript/RunDefaultJob.sh

• -Dmapred.max.split.size=20000

• RunDefaultJob_pseudo.sh
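The in-code route above can be sketched in a minimal driver fragment (a sketch only; the job name and surrounding setup are illustrative, not from the slide):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountDemo {
    public static void main(String[] args) throws Exception {
        // Equivalent to passing -Dmapred.reduce.tasks=2 on the command line
        // (the property is named mapreduce.job.reduces in newer Hadoop versions).
        Job job = Job.getInstance(new Configuration(), "citibike demo");
        job.setNumReduceTasks(2);
        // ... set mapper/reducer/input/output classes as usual,
        // then submit with job.waitForCompletion(true).
    }
}
```

On a real cluster this yields two partitions and two output files (part-r-00000, part-r-00001); as noted above, LocalJobRunner ignores it and runs a single reduce task.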

Page 13

Shuffling

[Diagram: the same MapReduce data-flow figure as on page 3 (InputFormat => splits => map => intermediate [key,value] pairs => shuffle, with partition: hash key to reducer task => reduce => OutputFormat), repeated here to focus on the shuffle stage.]

Page 14

Which reduce task?

• For each intermediate key-value pair (K2, V2): which reduce task does it go to?

  • Partition the whole domain of K2 into multiple partitions

  • each reduce task processes one partition

• The partition function operates on the intermediate key and value types (K2 and V2), and returns a partition index.

  • In practice, the partition is determined solely by the key (the value is ignored).

• The default partitioner is HashPartitioner:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
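The getPartition arithmetic can be exercised on its own (a standalone sketch with an illustrative method name): the `& Integer.MAX_VALUE` clears the sign bit, so a negative hashCode still yields a non-negative partition index, whereas a plain `%` in Java could return a negative number.

```java
public class PartitionDemo {
    // Same arithmetic as HashPartitioner.getPartition, applied to any key.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 2;
        // Each key maps deterministically to one reducer, so all values
        // for a given key meet at the same reduce task.
        for (String key : new String[] {"1901", "1902", "station"}) {
            System.out.println(key + " -> partition " + partitionFor(key, reducers));
        }
    }
}
```

Determinism is the point: the same key always lands in the same partition for a fixed number of reducers.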


Page 15

Outline

• MapReduce: how to control number of tasks

  • InputFormat class decides # of splits => number of map tasks

  • reduce tasks #: configured by client

• ChainMapper: modular design

• Input processing:

  • XML file processing

  • whole file as a record

• Binary output & sorting

Page 16

ChainMapper class

• ChainMapper class: use multiple Mapper classes within a single map task.

mapper1 => mapper2 => mapper3 …=>lastmapper

• output of the first becomes the input of the second, and so on until the last Mapper

• output of the last Mapper will be written to the task's output.

• Benefits:

  • Modularity (simple and reusable specialized Mappers)

  • Composability (Mappers can be combined to perform composite operations)

  • Reduced disk I/O, compared to multiple “chained” MapReduce jobs


Page 17

A word count job

Job job = Job.getInstance();

Configuration splitMapConfig = new Configuration(false);
ChainMapper.addMapper(job, SplitMapper.class, LongWritable.class, Text.class,
    Text.class, IntWritable.class, splitMapConfig);

Configuration lowerCaseMapConfig = new Configuration(false);
ChainMapper.addMapper(job, LowerCaseMapper.class, Text.class, IntWritable.class,
    Text.class, IntWritable.class, lowerCaseMapConfig);

job.setJarByClass(ChainMapperDriver.class);
job.setCombinerClass(ChainMapReducer.class);
job.setReducerClass(ChainMapReducer.class);

Page 18

Forming a chain of mappers

public static <K1,V1,K2,V2> void addMapper(JobConf job,
    Class<? extends Mapper<K1,V1,K2,V2>> klass,
    Class<? extends K1> inputKeyClass,
    Class<? extends V1> inputValueClass,
    Class<? extends K2> outputKeyClass,
    Class<? extends V2> outputValueClass,
    boolean byValue,
    JobConf mapperConf)

Adds a Mapper class to the chain job's JobConf.

byValue - indicates if key/values should be passed by value to the next Mapper in the chain (if any) or by reference. If a Mapper leverages the assumed semantics that the key and values are not modified by the collector, 'by value' must be used. If the Mapper does not expect this semantics, as an optimization to avoid serialization and deserialization, 'by reference' can be used.

IMPORTANT: There is no need to specify the output key/value classes for the ChainMapper; this is done by addMapper for the last mapper in the chain.


Page 19

Outline

• MapReduce: how to control number of tasks

  • InputFormat class decides # of splits => number of map tasks

  • reduce tasks #: configured by client

• ChainMapper: modular design

• Input processing:

  • XML file processing

  • whole file as a record

• Binary output & sorting

Page 20

SequenceFile

• SequenceFile: provides a persistent data structure for binary key-value pairs.

• Keys and values stored in a SequenceFile do not necessarily need to be Writable; any types that can be serialized and deserialized by a Serialization may be used.

  • i.e., converts an object/value to/from a byte stream

• In contrast, the default TextOutputFormat:

  • writes keys and values by calling the toString() method on them

  • i.e., converts an object/value to/from a stream of text (e.g., ASCII)


Page 21

MapFile

• A file-based map from keys to values. A map is a directory containing two files:

  • the data file, containing all keys and values in the map

  • the index file, containing a fraction of the keys; the fraction is determined by MapFile.Writer.getIndexInterval()

• The index file is read entirely into memory, so key implementations should try to keep themselves small.

• This allows quick lookup of records.

• Exercise: run the MapReduce job that uses MapFileOutputFormat class, examine the output directory…
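The lookup idea can be simulated with plain Java collections (a hypothetical sketch, not the real MapFile API): keep every indexInterval-th key in an in-memory index, find the closest preceding indexed key, then scan forward through the sorted data from that position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapFileLookupDemo {
    // "Data file": all sorted key-value records, in order.
    final List<String[]> data = new ArrayList<>();
    // "Index file": every indexInterval-th key -> its position in the data.
    final TreeMap<String, Integer> index = new TreeMap<>();

    MapFileLookupDemo(int indexInterval, String[][] sortedRecords) {
        for (int i = 0; i < sortedRecords.length; i++) {
            data.add(sortedRecords[i]);
            if (i % indexInterval == 0) index.put(sortedRecords[i][0], i);
        }
    }

    // Find the closest preceding indexed key, then scan forward.
    String get(String key) {
        Map.Entry<String, Integer> entry = index.floorEntry(key);
        if (entry == null) return null;
        for (int i = entry.getValue(); i < data.size(); i++) {
            int cmp = data.get(i)[0].compareTo(key);
            if (cmp == 0) return data.get(i)[1];
            if (cmp > 0) break;   // data is sorted: the key cannot appear later
        }
        return null;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"a", "1"}, {"c", "2"}, {"e", "3"}, {"g", "4"}, {"i", "5"}
        };
        MapFileLookupDemo m = new MapFileLookupDemo(2, records);
        System.out.println(m.get("e"));  // found via the index, then a short scan
        System.out.println(m.get("b"));  // absent key -> null
    }
}
```

A larger index interval means a smaller in-memory index but a longer forward scan per lookup; the real MapFile makes the same trade-off on disk.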


Page 22

Sorting in MapReduce Framework


Page 23

All about sorting

• Partial sort

  • “comes free”, see previous slides

  • output from map => partitioned, sorted, and merged (sorted) => reduce

  • each partition is sorted, i.e., each reduce task's output is sorted

• To sort globally (total sort):

  • use one partition, i.e., one reduce task

  • or, use a customized partitioner class


Page 24

All about sorting

• Total sort: outputs of reduce tasks can be concatenated together to get a sorted output

• Idea:

  • use the TotalOrderPartitioner class:

  • if k1 < k2, then partition(k1) <= partition(k2)

• How to make sure the partitioning is balanced, to minimize running time?

  • use InputSampler to sample the input to get an estimated distribution of keys
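The key property can be checked with a toy simulation (plain Java, with a hypothetical range partitioner): if the partitioner is order-preserving (k1 < k2 implies partition(k1) <= partition(k2)), then sorting each partition and concatenating the partition outputs in order yields a globally sorted result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TotalSortDemo {
    // Order-preserving range partitioner with hypothetical cut points.
    // In a real job, InputSampler would estimate these from the key distribution.
    static int partition(int key, int[] cutPoints) {
        for (int i = 0; i < cutPoints.length; i++) {
            if (key < cutPoints[i]) return i;
        }
        return cutPoints.length;
    }

    static List<Integer> totalSort(List<Integer> keys, int[] cutPoints) {
        int numPartitions = cutPoints.length + 1;
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (int k : keys) parts.get(partition(k, cutPoints)).add(k);
        // Each "reduce task" sorts only its own partition...
        List<Integer> out = new ArrayList<>();
        for (List<Integer> p : parts) {
            Collections.sort(p);
            out.addAll(p);   // ...and the outputs are concatenated in partition order.
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> keys = List.of(42, 7, 99, 13, 58, 3, 77, 21);
        // prints [3, 7, 13, 21, 42, 58, 77, 99]
        System.out.println(totalSort(keys, new int[] {20, 60}));
    }
}
```

Badly chosen cut points still give a correct total order; sampling only matters for balancing the work across reducers.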

Page 25

Secondary sort

• Goal: sort output by year, and then within each year, sort by temperature:

1901 120
1901 123
…
1902 89
1902 111
…

• You could do this by writing a reducer class:

  • intermediate key-value pairs are grouped: reduce(k, <v1,v2,…>), but v1, v2, … are not sorted…

• Or we can again take advantage of the MapReduce framework (how it already partitions, sorts, and groups data for us)

Page 26

How?

• Goal: sort output by year, and then within the year, sort by temperature

• Plan:

  • Use year and temperature as the key (a composite key)

  • Partition and group based on year only

    • so that records of the same year are sent to the same reduce task, and grouped together in a list

  • Sort based on the composite key (year and temperature)

    • so that the records (within the same group) are ordered by temperature…

Page 27

Details

• PartitionerClass: the FirstPartitioner class uses only the first part (e.g., year) of the composite key


Page 28

Details

• setGroupingComparatorClass: defines the comparator that controls which keys are grouped together for a single call to the Reducer.reduce function

• Use a GroupComparator which compares just the first part of the key (e.g., year)


Page 29

Details

• setSortComparatorClass: defines the comparator that controls how the keys are sorted before they are passed to the Reducer

• We use a KeyComparator class: sort by the first part (e.g., year), and then by the second part of the composite key (e.g., temperature)

SortComparator example:

1900 34
1900 34
1900 34
…
1901 35
1901 36
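The three pieces can be simulated with plain Java (hypothetical names; the real versions extend Hadoop's Partitioner and WritableComparator): sort on the full (year, temperature) composite key, but group on the year alone, so each reduce group sees its temperatures in sorted order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SecondarySortDemo {
    // Composite key: (year, temperature).
    static final class Key {
        final int year, temp;
        Key(int year, int temp) { this.year = year; this.temp = temp; }
        int year() { return year; }
        int temp() { return temp; }
    }

    // KeyComparator analogue: sort by year, then by temperature.
    static final Comparator<Key> KEY_COMPARATOR =
        Comparator.comparingInt(Key::year).thenComparingInt(Key::temp);

    // GroupComparator analogue: two keys belong to the same reduce group
    // iff their first parts (the years) match.
    static boolean sameGroup(Key a, Key b) {
        return a.year() == b.year();
    }

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>(List.of(
            new Key(1901, 123), new Key(1902, 89),
            new Key(1901, 120), new Key(1902, 111)));
        keys.sort(KEY_COMPARATOR);           // the framework's sort phase
        // Print group boundaries the way the reducer would see them.
        Key prev = null;
        for (Key k : keys) {
            if (prev == null || !sameGroup(prev, k)) {
                System.out.println("-- reduce group for year " + k.year());
            }
            System.out.println(k.year() + " " + k.temp());
            prev = k;
        }
    }
}
```

Within each "reduce group for year" block the temperatures come out ascending, which is exactly the guarantee the composite-key trick buys.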

Page 30

Outline

• MapReduce: how to control number of tasks

  • InputFormat class decides # of splits => number of map tasks

  • reduce tasks #: configured by client

• ChainMapper: modular design

• Input processing:

  • XML file processing

  • whole file as a record

• Binary output & sorting

• Join

Page 31

Join


Combine two datasets together using a key.

Here, use StationID…

Page 32

An example of reduce-side joins

• Multiple inputs, of different formats

  • e.g., one has station records, the other weather data

• Mapper class: tag each record with a composite key, e.g., station_id-0 for a station record, station_id-1 for a weather record

• Secondary sort:

  • use the first part of the composite key to partition and group
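A plain-Java sketch of the tagging idea (hypothetical record layout and sample data; a real job would use two mapper classes and Hadoop Writables): each mapper emits (stationId, tag) as the composite key, and the secondary-sort machinery then guarantees the station record (tag 0) reaches the reducer before the weather records (tag 1) for the same station.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ReduceSideJoinDemo {
    // Composite key + payload. Tag 0 = station record, tag 1 = weather record.
    static final class Tagged {
        final String stationId; final int tag; final String payload;
        Tagged(String stationId, int tag, String payload) {
            this.stationId = stationId; this.tag = tag; this.payload = payload;
        }
        String stationId() { return stationId; }
        int tag() { return tag; }
        String payload() { return payload; }
    }

    // Sort on the full composite key (stationId, then tag), as the
    // framework's sort phase would: within each station group, the
    // station record now precedes its weather records.
    static List<Tagged> sortForReduce(List<Tagged> mapOutput) {
        List<Tagged> out = new ArrayList<>(mapOutput);
        out.sort(Comparator.comparing(Tagged::stationId)
                           .thenComparingInt(Tagged::tag));
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> mapOutput = List.of(
            new Tagged("011990", 1, "1950 0"),
            new Tagged("011990", 0, "SIHCCAJAVRI"),   // station name record
            new Tagged("012650", 1, "1949 111"),
            new Tagged("012650", 0, "TYNSET"));

        // Reducer view: the first record of each group names the station;
        // the remaining records are weather records joined against it.
        String station = null;
        for (Tagged t : sortForReduce(mapOutput)) {
            if (t.tag() == 0) {
                station = t.payload();
            } else {
                System.out.println(station + "\t" + t.payload());
            }
        }
    }
}
```

Grouping on the station ID alone (as on this slide) is what lets a single reduce call see the station record and all its weather records together.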

Page 33

Commonly Used Mapper/Reducer


Page 34

Debugging

• MapReduce job and task logs


Page 35

User-level logs
