Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Page 1: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma


Map-Reduce Programming & Best Practices

Apache Hadoop India Summit 2011

Basant Verma

Yahoo! India R&D

February 16, 2011

Page 2: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma


Hadoop Components

• HDFS (Hadoop Distributed File System)
  – Modeled on GFS
  – A reliable, high-bandwidth file system that can store TBs and PBs of data.

• Map-Reduce
  – Uses the Map/Reduce metaphor from the Lisp language
  – A distributed processing framework that processes the data stored on HDFS as key-value pairs.

(Diagram: clients submitting work to the processing framework layered on top of the DFS.)

Page 3: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Word Count DataFlow

Page 4: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma


Word Count

$ cat ~/wikipedia.txt | \
    sed -e 's/ /\n/g' | grep . | \
    sort | \
    uniq -c > \
    ~/frequencies.txt

Page 5: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma


MR for Word-Count

mapper (filename, file-contents):
    for each word in file-contents:
        emit (word, 1)

reducer (word, values[]):
    sum = 0
    for each value in values:
        sum = sum + value
    emit (word, sum)
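A minimal Java sketch of the same word count, using Hadoop's org.apache.hadoop.mapreduce API (the "new" API available from 0.20 onward). The class name and input/output paths are illustrative, not from the talk.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: emit (word, 1) for every token in the input line.
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reducer: sum all the 1s emitted for a word.
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");   // Job.getInstance(conf, ...) on newer releases
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Later sketches in this transcript assume a driver like this one, i.e. a `conf` and `job` configured as above.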

Page 6: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma


MR Dataflow

Page 7: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma


MapReduce Pipeline

Page 8: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma


Pipeline Details

Page 9: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Available tweaks and optimizations!

• Input to Maps
• Map only jobs
• Combiner
• Compression
• Speculation
• Fault Tolerance
• Buffer Size
• Parallelism (threads)
• Partitioner
• Reporter
• DistributedCache
• Task child environment settings

Page 10: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma


Input to Map

• Maps should process significant amount of data to minimize the effect of overhead.– Process multiple-files per map for jobs with very large

number of small input files.– Process large chunk of data for large scale processing

• Use as fewer maps to process data in parallel, as few as possible without having bad failure recovery cases.– Unless the application's maps are heavily CPU bound, there is

almost no reason to ever require more than 60,000-70,000 maps for a single application.
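A driver-side sketch of the "multiple files per map" point, assuming a `job` as in the word-count sketch and Hadoop's CombineTextInputFormat (present in newer releases; on older versions a custom CombineFileInputFormat subclass plays the same role). The 256 MB split size is an illustrative value, not a recommendation from the talk.

// Pack many small files into fewer, larger splits so each map task
// processes a meaningful amount of data.
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // ~256 MB per map (illustrative)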

Page 11: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Map only jobs

• Run a map-only job once for generating data

• Run multiple jobs with different reduce implementations on that data

• Map-only jobs write their output directly to HDFS (see the sketch below)
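A one-line driver sketch, assuming a `job` as in the word-count sketch: a map-only job is configured simply by setting the number of reduces to zero.

// With zero reduces there is no sort/shuffle; map output is written
// straight to HDFS by the configured OutputFormat.
job.setNumReduceTasks(0);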

Page 12: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Combiner

• Provides map-side aggregation of data
  – Not every record emitted by the Mapper needs to be shipped to the reducers.

• The reduce code can often be reused as the combiner. Example: word count!

• Helps reduce network traffic for the shuffle and results in less disk space usage.

• However, it is important to ensure that combiners really work, i.e. that the Combiner provides sufficient aggregation (see the sketch below).
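A driver-side sketch, reusing the reducer from the word-count sketch as the combiner; this works because summing counts is associative and commutative.

job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

If the aggregation is not associative and commutative, the combiner needs its own implementation, or should be omitted.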

Page 13: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Compression

• Map and Reduce outputs can be compressed

• Compressing intermediate data will help reduce the amount of disk usage and network I/O.

• Compression helps reduce the total data size on the DFS.
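A sketch of the relevant knobs, using Hadoop 1.x-era property names (newer releases use the mapreduce.* equivalents) and assuming a `conf` and `job` as in the word-count sketch. GzipCodec is just one choice; LZO/Snappy availability depends on the cluster.

// Compress intermediate map output (reduces shuffle traffic and local disk spills).
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec",
              org.apache.hadoop.io.compress.GzipCodec.class,
              org.apache.hadoop.io.compress.CompressionCodec.class);

// Compress the final job output on the DFS.
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setCompressOutput(job, true);
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputCompressorClass(
    job, org.apache.hadoop.io.compress.GzipCodec.class);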

Page 14: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma


Shuffle

• Shuffle-phase performance depends on the crossbar between the map tasks and the reduce tasks, which must be minimized.
  – Compression of intermediate output
  – Use of a Combiner

Page 15: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Reduces

• Configure an appropriate number of reduces
  – Too few hurt the nodes
  – Too many hurt the cross-bar

• All reduces should complete in a single wave (see the sketch below).

• Each reduce should process at least 1-2 GB of data, and at most 5-10 GB, in most scenarios.
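A driver-side sketch of single-wave sizing, assuming a `job` as in the word-count sketch. `reduceSlotsInCluster` is a hypothetical value the application would look up or configure; the 0.95 factor is only an illustrative headroom choice.

// Stay slightly below the cluster's reduce-slot capacity so the whole
// reduce phase still fits in one wave even if a few tasks are retried.
int reduceSlotsInCluster = 100;  // hypothetical cluster capacity
job.setNumReduceTasks((int) (reduceSlotsInCluster * 0.95));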

Page 16: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Partitioner

• Distribute data evenly across reduces
  – Uneven distribution will hurt the whole job runtime.

• Default is the hash partitioner
  – hash(key) % num-reducers

• Why is a custom partitioner needed?
  – Sort
  – WordCount
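A minimal sketch of a custom Partitioner. This is a hypothetical example (not the one used by Sort or WordCount): it buckets keys by their first character so related keys land in the same reduce.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    if (key.getLength() == 0) {
      return 0;
    }
    // Bucket by the first character instead of the default hash(key) % num-reducers.
    return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
  }
}

The driver then registers it with job.setPartitionerClass(FirstCharPartitioner.class). Whatever scheme is used, it should keep the partitions roughly equal in size.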

Page 17: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Output

• Output to a few large files, with each file spanning multiple HDFS blocks and appropriately compressed.

• The number of output artifacts is linearly proportional to the number of configured reduces
  – Compress outputs

• Use appropriate file formats for output (see the sketch below)
  – E.g. a compressed text file is not a great idea if the codec is not splittable.

• Consider using Hadoop Archives (HAR) to reduce namespace usage.

• Think of the consumers of your data-set.
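A driver-side sketch of the file-format point, assuming a `job` as in the word-count sketch: a block-compressed SequenceFile stays splittable, unlike gzipped text.

// Write the final output as a block-compressed SequenceFile.
job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.setCompressOutput(job, true);
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.setOutputCompressionType(
    job, org.apache.hadoop.io.SequenceFile.CompressionType.BLOCK);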

Page 18: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Speculation

• Slow-running tasks can be speculated.
• Slowness is determined by the expected time the task will take to complete.
• Speculation kicks in only when there are no pending tasks.
• The total number of tasks that can be speculated for a job is capped to reduce wastage.
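The per-job switches, shown here with Hadoop 1.x-era property names and assuming a `conf` as in the word-count sketch (newer releases use mapreduce.map.speculative and mapreduce.reduce.speculative):

// Enable (or disable) speculative execution per job.
conf.setBoolean("mapred.map.tasks.speculative.execution", true);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", true);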

Page 19: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Fault Tolerance

• Data is stored as blocks on separate nodes.
• Nodes are composed of cheap commodity hardware.
• Tasks are independent of each other.
• New tasks can be scheduled on new nodes.
• The JobTracker retries a failed task 4 times (default) before giving up.
• A job can be configured to tolerate task failures up to N% of the total tasks (see the sketch below).
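A sketch of the corresponding knobs, using Hadoop 1.x-era property names and assuming a `conf` as in the word-count sketch; the 5% value is illustrative.

conf.setInt("mapred.map.max.attempts", 4);           // retries per map task (4 is the default)
conf.setInt("mapred.max.map.failures.percent", 5);   // tolerate up to 5% failed map tasks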

Page 20: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Reporter

• Used to report progress to the parent process.

• Commonly used when the tasks
  – Connect to a remote application such as a web service or a database
  – Do some disk-intensive computation
  – Get blocked on some event

• One can also spawn a thread and make it report progress periodically (see the sketch below).
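A sketch of reporting progress from a long-running map, written new-API style (where the Context plays the Reporter role) inside a Mapper like the word-count one. fetchFromRemoteService is a hypothetical helper standing in for a slow external call.

@Override
protected void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
  for (String record : fetchFromRemoteService(value.toString())) {  // hypothetical slow call
    context.write(new Text(record), new IntWritable(1));
    context.progress();  // tell the framework the task is still alive
  }
}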

Page 21: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Distributed Cache

• Efficient distribution of read-only files for applications
  – Localized automatically once the task is scheduled on the slave node
  – Cleaned up once no task running on the slave needs the cache files

• Designed for a small number of mid-size files (see the sketch below).

• Artifacts in the distributed cache should not require more I/O than the actual input to the application tasks.
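A driver-side sketch using the old-API helper class org.apache.hadoop.filecache.DistributedCache and assuming a `conf` as in the word-count sketch; the HDFS path and symlink name are hypothetical.

// Ship a small lookup file to every task; with a symlink it is readable
// from the task's working directory as the local file "lookup".
DistributedCache.addCacheFile(new java.net.URI("/user/me/lookup.txt#lookup"), conf);
DistributedCache.createSymlink(conf);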

Page 22: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Few tips for better performance

• Increase the memory/buffer allocated to the tasks (io.sort.mb) (see the sketch below)
• Increase the number of tasks that can be run in parallel
• Increase the number of threads that serve the map outputs
• Disable unnecessary logging
• Find the optimal value of the DFS block size
• Share the cluster between the DFS and MR for data locality
• Turn on speculation
• Run reducers in one wave as they can be really costly
• Make proper use of DistributedCache
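A sketch of the buffer-related knobs using Hadoop 1.x-era property names and assuming a `conf` as in the word-count sketch; the values are illustrative and should be tuned against the job's actual spill counters.

// A larger map-side sort buffer means fewer spills, at the cost of task memory.
conf.setInt("io.sort.mb", 256);                 // default is 100 MB
conf.setFloat("io.sort.spill.percent", 0.80f);  // start spilling when the buffer is 80% full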

Page 23: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Anti-Patterns

• Processing thousands of small files (sized less than 1 HDFS block, typically 128MB) with one map processing a single small file.

• Processing very large data-sets with a small HDFS block size (i.e. 128MB), resulting in tens of thousands of maps.

• Applications with a large number (thousands) of maps with a very small runtime (e.g. 5s).

• Straight-forward aggregations without the use of the Combiner.

• Applications with greater than 60,000-70,000 maps.

• Applications processing large data-sets with very few reduces (such as 1).
  – Applications using a single reduce for total-order among the output records.
  – Pig scripts processing large data-sets without using the PARALLEL keyword.

Page 24: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Anti-Patterns (Cont…)

• Applications processing data with very large number of reduces, such that each reduce processes less than 1-2GB of data.

• Applications writing out multiple, small, output files from each reduce.

• Applications using the DistributedCache to distribute a large number of artifacts and/or very large artifacts (hundreds of MBs each).

• Applications using more than 25 counters per task.

• Applications performing metadata operations (e.g. listStatus) on the file-system from the map/reduce tasks.

• Applications doing screen-scraping of the JobTracker web-UI for the status of queues/jobs or, worse, the job-history of completed jobs.

• Workflows comprising hundreds of small jobs processing small amounts of data with a very high job-submission rate.

Page 25: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Debugging

• Side-effect files: write to external files from the M/R code.

• Web UI: the web UI shows stdout/stderr.

• Isolation Runner: run the task on the tracker where the task failed. Switch to the workspace of the task and run IsolationRunner.

• Debug scripts: upload the script to the DFS, create a symlink and pass this script in the conf file. One common use is to filter out exceptions from the logs/stderr/stdout.

• LocalJobRunner runs a MapReduce job in-process on the local node. It can be used for faster debugging and proofs-of-concept (see the sketch below).
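A sketch of switching a job onto the LocalJobRunner, using Hadoop 1.x-era property names (newer releases can set mapreduce.framework.name to "local" instead); paths are whatever the local test uses.

// Run the job in-process against the local file system for quick debugging.
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "local");
conf.set("fs.default.name", "file:///");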

Page 26: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Task child environment settings

• The child task inherits the environment of the parent TaskTracker. The user can specify additional options to the child JVM via the mapred.child.java.opts property.

• An example showing multiple arguments and substitutions:
  – enables JVM GC logging
  – starts a passwordless JVM JMX agent so that it can connect with jconsole to get thread dumps
  – sets the maximum heap size of the child JVM to 512MB
  – adds an additional path to the java.library.path of the child JVM

<property>
  <name>mapred.child.java.opts</name>
  <value>
    -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false
    -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

Page 27: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma

Checklist..

1. Are your partitions uniform?

2. Can you combine records at the map side?

3. Are maps reading off a DFS block worth of data?

4. Are you running a single reduce wave (unless the data size per reducer is too big)?

5. Have you tried compressing intermediate data & final data?

6. Are your buffer sizes large enough to minimize spills but small enough to stay clear of swapping?

7. Do you see unexplained “long tails” ? (can be mitigated via speculative execution)

8. Are you keeping your cores busy? (via slot configuration)

9. Is at least one system resource being loaded?

Page 28: Apache Hadoop India Summit 2011 talk "Hadoop Map-Reduce Programming & Best Practices" by Basant Verma
