Hadoop - Introduction to MapReduce Programming - Meeting 12/04/2014

Uploaded by soujavajug, 27-Jan-2015

TRANSCRIPT

Page 1: Hadoop - Introduction to MapReduce Programming - Meeting 12/04/2014

Java MapReduce Programming on Apache Hadoop
Aaron T. Myers, aka ATM

with thanks to Sandy Ryza

Page 2:

Introductions

● Software Engineer/Tech Lead for HDFS at Cloudera

● Committer/PMC Member on the Apache Hadoop project

● My work focuses primarily on HDFS and Hadoop security

Page 3:

What is MapReduce?

● A distributed programming paradigm

Page 4:

What is a distributed programming paradigm?

Help!

Page 5:

What is a distributed programming paradigm?

Page 6:

Distributed Systems are Hard

● Monitoring
● RPC protocols, serialization
● Fault tolerance
● Deployment
● Scheduling/Resource Management

Page 7:

Writing Data Parallel Programs Should Not Be

Page 8:

MapReduce to the Rescue

● You specify map(...) and reduce(...) functions
○ map = (list(k, v) -> list(k, v))
○ reduce = (k, list(v) -> k, v)

● The framework does the rest
○ Split up the data
○ Run several mappers over the splits
○ Shuffle the data around for the reducers
○ Run several reducers
○ Store the final results
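The split/map/shuffle/reduce flow above can be sketched in plain Java. This is a self-contained simulation of the paradigm for word count; the class and method names are illustrative and are not the Hadoop API:

```java
import java.util.*;

// A plain-Java simulation of the MapReduce phases for word count.
// No Hadoop here -- the structure mirrors the paradigm, not the real API.
public class MapReduceSketch {

    // map: a line of text -> list of (word, 1) pairs
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            out.add(new AbstractMap.SimpleEntry<>(word, 1));
        }
        return out;
    }

    // shuffle: group all emitted values by key
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapOutput) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return grouped;
    }

    // reduce: (word, list of counts) -> total count
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;
        return sum;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        for (String line : new String[] {"apple apple banana", "a happy airplane"}) {
            mapOutput.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        shuffle(mapOutput).forEach((word, values) -> result.put(word, reduce(values)));
        System.out.println(result);  // {a=1, airplane=1, apple=2, banana=1, happy=1}
    }
}
```

In real Hadoop, map() instances run in parallel across data splits and the shuffle moves data between machines; the data flow, however, is exactly this.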

Page 9:

Map

[Diagram: Input Data → split into Map Inputs → map() runs on each split → Map Outputs → Shuffle]

Input lines:
  apple apple banana
  a happy airplane
  airplane on the runway
  runway apple runway
  rumple on the apple

Map outputs one (word, 1) pair per word:
  apple - 1, apple - 1, banana - 1, a - 1, happy - 1, airplane - 1, on - 1, the - 1, runway - 1
  runway - 1, runway - 1, apple - 1, rumple - 1, on - 1, the - 1, apple - 1

Page 10:

Reduce

[Diagram: Shuffle → values grouped by key → reduce() runs per key → Output]

Shuffle output (values grouped by key):
  a - 1
  airplane - 1
  apple - 1, 1, 1, 1
  banana - 1
  on - 1, 1
  runway - 1, 1, 1
  rumple - 1
  the - 1, 1

Reduce output:
  a - 1
  airplane - 1
  apple - 4
  banana - 1
  on - 2
  runway - 3
  rumple - 1
  the - 2

Page 11:

What is (Core) Hadoop?

● An open source platform for storing, processing, and analyzing enormous amounts of data

● Consists of…
○ A distributed file system (HDFS)
○ An implementation of the Map/Reduce paradigm (Hadoop MapReduce)

● Written in Java!

Page 12:

What is Hadoop?

Traditional Operating System

Storage: File System

Execution/Scheduling: Processes

Page 13:

What is Hadoop?

Hadoop (a distributed operating system)

Storage: Hadoop Distributed File System (HDFS)

Execution/Scheduling: MapReduce

Page 14:

HDFS (briefly)

● Distributed file system that runs on all nodes in the cluster
○ Co-located with Hadoop MapReduce daemons

● Looks like a pretty normal Unix file system
○ hadoop fs -ls /user/atm/
○ hadoop fs -cp /user/atm/data.txt /user/atm/data2.txt
○ hadoop fs -rm /user/atm/data.txt
○ …

● Don’t use the normal Java File API
○ Instead use the org.apache.hadoop.fs.FileSystem API

Page 15:

Writing MapReduce programs in Java

● The primary interface to MapReduce in Hadoop is a Java API

● WordCount!

Page 16:

Word Count Map Function

  public class WordCountMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }

Page 17:

Word Count Reduce Function

  public static class WordCountReducer extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

Page 18:

Word Count Driver

Page 19:

InputFormats

● TextInputFormat
○ Each line becomes <LongWritable, Text> = <byte offset in file, whole line>

● KeyValueTextInputFormat
○ Splits lines on delimiter into Text key and Text value

● SequenceFileInputFormat
○ Reads key/value pairs from SequenceFile, a Hadoop format

● DBInputFormat
○ Uses JDBC to connect to a database

● Many more, or write your own!
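The record semantics of the first two formats can be illustrated in plain Java. This is a simulation with illustrative method names; real InputFormats read records out of HDFS splits:

```java
import java.util.*;

// Illustrates the key/value pairs the two most common InputFormats produce.
// Plain-Java simulation only; not the Hadoop InputFormat API.
public class InputFormatSketch {

    // TextInputFormat: each line -> (byte offset of line start, whole line).
    // Assumes '\n' separators and no trailing newline, for simplicity.
    static LinkedHashMap<Long, String> textInputFormat(String file) {
        LinkedHashMap<Long, String> records = new LinkedHashMap<>();
        long offset = 0;
        for (String line : file.split("\n")) {
            records.put(offset, line);
            offset += line.length() + 1;  // +1 for the '\n' separator
        }
        return records;
    }

    // KeyValueTextInputFormat: split each line on the first tab -> (key, value)
    static String[] keyValueTextInputFormat(String line) {
        int tab = line.indexOf('\t');
        return tab < 0
            ? new String[] {line, ""}
            : new String[] {line.substring(0, tab), line.substring(tab + 1)};
    }

    public static void main(String[] args) {
        System.out.println(textInputFormat("apple apple banana\na happy airplane"));
        // {0=apple apple banana, 19=a happy airplane}
        System.out.println(Arrays.toString(keyValueTextInputFormat("user123\tclicked")));
        // [user123, clicked]
    }
}
```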

Page 20:

Serialization

● Writables
○ Native to Hadoop
○ Implement serialization for higher level structures yourself

● Avro
○ Extensible
○ Cross-language
○ Handles serialization of higher level structures for you

● And others…
○ Parquet, Thrift, etc.

Page 21:

Writables

  public class MyNumberAndStringWritable implements Writable {

    private int number;
    private String str;

    public void write(DataOutput out) throws IOException {
      out.writeInt(number);
      out.writeUTF(str);
    }

    public void readFields(DataInput in) throws IOException {
      number = in.readInt();
      str = in.readUTF();
    }
  }
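The write()/readFields() pattern above can be exercised with plain java.io streams. The following round-trip is self-contained: it mirrors the Writable contract without depending on Hadoop, and the class is illustrative rather than Hadoop's actual Writable interface:

```java
import java.io.*;

// Round-trips the write()/readFields() pattern using plain java.io streams.
// Mirrors Hadoop's Writable contract but has no Hadoop dependency.
public class WritableRoundTrip {

    static class MyNumberAndString {
        int number;
        String str;

        void write(DataOutput out) throws IOException {
            out.writeInt(number);
            out.writeUTF(str);
        }

        void readFields(DataInput in) throws IOException {
            number = in.readInt();
            str = in.readUTF();
        }
    }

    // Serialize to the raw bytes that would travel through the shuffle.
    static byte[] serialize(MyNumberAndString w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        w.write(new DataOutputStream(bytes));
        return bytes.toByteArray();
    }

    // Reconstruct an object from those bytes.
    static MyNumberAndString deserialize(byte[] data) throws IOException {
        MyNumberAndString w = new MyNumberAndString();
        w.readFields(new DataInputStream(new ByteArrayInputStream(data)));
        return w;
    }

    public static void main(String[] args) throws IOException {
        MyNumberAndString in = new MyNumberAndString();
        in.number = 42;
        in.str = "apple";
        MyNumberAndString out = deserialize(serialize(in));
        System.out.println(out.number + " " + out.str);  // 42 apple
    }
}
```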

Page 22:

Avro

  protocol MyMapReduceObjects {
    record MyNumberAndString {
      string str;
      int number;
    }
  }

Page 23:

Testing MapReduce Programs

● First, write unit tests (duh) with MRUnit

● LocalJobRunner
○ Runs the job in a single process

● Single-node cluster (Cloudera VM!)
○ Multiple processes on the same machine

● On the real cluster

Page 24:

MRUnit

  @Test
  public void testMapper() throws IOException {
    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver =
        new MapDriver<LongWritable, Text, Text, IntWritable>(new WordCountMapper());
    String line = "apple banana banana carrot";
    mapDriver.withInput(new LongWritable(0), new Text(line));
    mapDriver.withOutput(new Text("apple"), new IntWritable(1));
    mapDriver.withOutput(new Text("banana"), new IntWritable(1));
    mapDriver.withOutput(new Text("banana"), new IntWritable(1));
    mapDriver.withOutput(new Text("carrot"), new IntWritable(1));
    mapDriver.runTest();
  }

Page 25:

MRUnit

  @Test
  public void testReducer() throws IOException {
    ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver =
        new ReduceDriver<Text, IntWritable, Text, IntWritable>(new WordCountReducer());
    reduceDriver.withInput(new Text("apple"),
        Arrays.asList(new IntWritable(1), new IntWritable(2)));
    reduceDriver.withOutput(new Text("apple"), new IntWritable(3));
    reduceDriver.runTest();
  }

Page 26:

Counters

  Map-Reduce Framework
    Map input records=183
    Map output records=183
    Map output bytes=533563
    Map output materialized bytes=534190
    Input split bytes=144
    Combine input records=0
    Combine output records=0
    Reduce input groups=183
    Reduce shuffle bytes=0
    Reduce input records=183
    Reduce output records=183
    Spilled Records=366
    Shuffled Maps=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=7
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
  File System Counters
    FILE: Number of bytes read=1844866
    FILE: Number of bytes written=1927344
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
  File Input Format Counters
    Bytes Read=655137
  File Output Format Counters
    Bytes Written=537484

Page 27:

Counters

  if (record.isUgly()) {
    context.getCounter("Ugly Record Counters",
        "Ugly Records").increment(1);
  }
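Conceptually, a counter is just a named long that the framework sums across all tasks. A minimal plain-Java sketch of that behavior (class and method names are illustrative, not the Hadoop API):

```java
import java.util.*;

// Counters are named long tallies that the framework aggregates across tasks.
// Minimal plain-Java simulation; names here are illustrative.
public class CounterSketch {
    private final Map<String, Long> counters = new TreeMap<>();

    // Mirrors context.getCounter(group, name).increment(n)
    void increment(String group, String name, long n) {
        counters.merge(group + ":" + name, n, Long::sum);
    }

    long value(String group, String name) {
        return counters.getOrDefault(group + ":" + name, 0L);
    }

    public static void main(String[] args) {
        CounterSketch counters = new CounterSketch();
        // Pretend two map tasks each tallied some ugly records:
        counters.increment("Ugly Record Counters", "Ugly Records", 1000);
        counters.increment("Ugly Record Counters", "Ugly Records", 24);
        System.out.println(counters.value("Ugly Record Counters", "Ugly Records"));  // 1024
    }
}
```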

Page 28:

Counters

  (Same counter output as the previous run, now with the custom counter group appended:)

  Ugly Record Counters
    Ugly Records=1024

Page 29:

Distributed Cache

We need some data and libraries on all the nodes.

Page 30:

Distributed Cache

[Diagram: a file in HDFS is copied by the Distributed Cache to a Local Copy on each node, where the Map or Reduce Tasks read it]

Page 31:

Distributed Cache

In our driver:

  DistributedCache.addCacheFile(
      new URI("/some/path/to/ourfile.txt"), conf);

In our mapper or reducer:

  @Override
  public void setup(Context context) throws IOException,
      InterruptedException {
    Configuration conf = context.getConfiguration();
    localFiles = DistributedCache.getLocalCacheFiles(conf);
  }
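The flow in the diagram — push a file to HDFS once, let the framework copy it onto each node, and have tasks read the local copy — can be simulated with ordinary file copies. All class names and paths below are illustrative:

```java
import java.io.IOException;
import java.nio.file.*;

// Simulates the Distributed Cache flow with local temp directories:
// "HDFS" holds the shared file, the framework localizes it next to each
// task, and the task reads its local copy. Paths here are illustrative.
public class DistributedCacheSketch {

    // Framework side: copy the shared file into the task's local directory.
    static Path localize(Path sharedFile, Path taskLocalDir) throws IOException {
        Path localCopy = taskLocalDir.resolve(sharedFile.getFileName());
        return Files.copy(sharedFile, localCopy, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        // "Driver" side: put the shared file in the fake HDFS.
        Path hdfs = Files.createTempDirectory("fake-hdfs");
        Path shared = Files.write(hdfs.resolve("ourfile.txt"), "lookup data".getBytes());

        // Framework side: each task gets its own local copy.
        Path task1Dir = Files.createTempDirectory("task1");
        Path localCopy = localize(shared, task1Dir);

        // Task side (like setup()): read the local copy, not HDFS.
        System.out.println(new String(Files.readAllBytes(localCopy)));  // lookup data
    }
}
```

The point of the pattern is that every task reads a node-local file rather than hammering HDFS with one read per task.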

Page 32:

Java Technologies Built on MapReduce

Page 33:

Crunch

● Library on top of MapReduce that makes it easy to write pipelines of jobs in Java

● Contains capabilities like joins and aggregation functions to save programmers from writing these for each job

Page 34:

Crunch

  public class WordCount {
    public static void main(String[] args) throws Exception {
      Pipeline pipeline = new MRPipeline(WordCount.class);
      PCollection<String> lines = pipeline.readTextFile(args[0]);
      PCollection<String> words = lines.parallelDo("my splitter",
          new DoFn<String, String>() {
        public void process(String line, Emitter<String> emitter) {
          for (String word : line.split("\\s+")) {
            emitter.emit(word);
          }
        }
      }, Writables.strings());
      PTable<String, Long> counts = Aggregate.count(words);
      pipeline.writeTextFile(counts, args[1]);
      pipeline.run();
    }
  }

Page 35:

Mahout

● Machine Learning on Hadoop
○ Collaborative Filtering
○ User and Item based recommenders
○ K-Means, Fuzzy K-Means clustering
○ Dirichlet process clustering
○ Latent Dirichlet Allocation
○ Singular value decomposition
○ Parallel Frequent Pattern mining
○ Complementary Naive Bayes classifier
○ Random forest decision tree based classifier

Page 36:

Non-Java technologies that use MapReduce

● Hive
○ SQL -> M/R translator, metadata manager

● Pig
○ Scripting DSL -> M/R translator

● DistCp
○ HDFS tool to bulk copy data from one HDFS cluster to another

Page 37:

Thanks!

● Questions?