TRANSCRIPT
Yahoo!’s Experience Running Pig on Tez at Scale
By Jon Eagles and Rohini Palaniswamy
Agenda
1. Introduction
2. Our Migration Story
3. Scale and Stability
4. Performance and Utilization
5. Problems and What's Next
Overview
MAPREDUCE:
- Mapper and Reducer phases (map tasks feeding reduce tasks)
- Shuffle between mapper and reducer tasks
- JobControl to run a group of jobs with dependencies
TEZ:
- Directed Acyclic Graph (DAG) with vertices
- Shuffle/One-One/Broadcast edges between vertex tasks
- Whole plan runs in a single DAG
Why Tez?
- Mapreduce is not cool anymore.
- Performance: run faster and also USE SAME OR LESS RESOURCES. For example, 5x the speed while using less memory and CPU. Increasing memory can slow down jobs if there is no capacity:
  Runtime of job = (time taken by tasks) * (number of waves of task launches)
  For example, a queue with 1000GB can run 1000 1GB-container tasks in parallel. With 1002 tasks it takes two waves and the runtime doubles; with 3GB containers it takes three waves and the runtime triples. (A sketch of this arithmetic follows.)
- Utilization: run more and more jobs in parallel concurrently. Resource contention degrades the performance of jobs.
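A minimal sketch of the waves arithmetic above (queue size, container size and task counts are the slide's example numbers; the helper itself is illustrative, and exact ceiling arithmetic puts the 3GB case at four waves, which the slide rounds to roughly triple):

public class Waves {
    // Runtime scales with the number of scheduling waves needed to push
    // all tasks through the queue's parallel capacity.
    static int waves(int queueGB, int containerGB, int tasks) {
        int slots = queueGB / containerGB;  // tasks that can run in parallel
        return (tasks + slots - 1) / slots; // ceil(tasks / slots)
    }

    public static void main(String[] args) {
        System.out.println(waves(1000, 1, 1000)); // 1 wave
        System.out.println(waves(1000, 1, 1002)); // 2 waves: runtime doubles
        System.out.println(waves(1000, 3, 1000)); // 4 waves: 3GB containers leave only 333 slots
    }
}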
Benchmarking vs. Reality: Peak Traffic
- Periods of peak utilization followed by low utilization are a common pattern: peak hours, catch-up processing during delays, reprocessing, and developing, testing and ad-hoc queries.
- Peak hours: every hour, the 5min, 10min, 15min, 30min and hourly feeds collide. At the start of the day (00:00 UTC and 00:00 PST), the 5min, 10min, 15min, 30min, hourly and daily feeds collide.
Our Migration Story
Migration Status (*data is for a single day)
Oct 1 2015:
- Number of Pig script runs: MAPREDUCE 198785, TEZ 5278
- Number of applications: MAPREDUCE 487699, TEZ 5418
Jun 17 2016:
- Number of Pig scripts: MAPREDUCE 58129, TEZ 148852 (progress: 72%)
- Number of applications: MAPREDUCE 147431, TEZ 150027 (progress: 70%)
Unwritten Rule
Users should be able to run scripts as-is:
- No settings changes or additional settings required
- No modifications to the scripts or UDFs required
- No special tuning required
Score: 80%
Rollout
- Multiple version support:
  - Command line: pig (or pig --useversion current), pig --useversion 0.11, pig --useversion 0.14
  - Oozie sharelib tags: pig_current, pig_11, pig_14
- Staged rollout switching from Pig 0.11 to Pig 0.14 as current: Staging (2 clusters), Research (3 clusters), Production (10 clusters)
- 17 internal patch releases for Pig and Tez and 5 YARN releases. Yahoo! Pig 0.14 = Apache Pig 0.16 minus 40+ patches; Yahoo! Tez 0.7 = Apache Tez 0.7.1 + ~15 patches
Feature parity with MapReduce
- Added translation of all important mapreduce settings to equivalent Tez settings: speculative execution, Java opts, container sizes, max attempts, sort buffers, shuffle tuning, etc. Map settings were mapped to root vertices; reduce settings were mapped to intermediate and leaf vertices. (A sketch of this translation follows the list.)
- Added equivalent Tez features for less commonly used mapreduce features:
  - mapreduce.job.running.{map|reduce}.limit: number of tasks that can run at a time
  - mapreduce.{map|reduce}.failures.maxpercent: percentage of failed tasks to ignore
  - mapreduce.task.timeout: timeout based on progress reporting by the application
  - mapreduce.job.classloader: separate classloader for user code (not implemented yet)
  - -files, -archives: no plans to add support
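A minimal sketch of the kind of translation involved, assuming a Hadoop Configuration and a handful of standard Tez property names (the selection is illustrative; this is not Pig's actual translation code, which covers many more settings and splits them per vertex type):

import org.apache.hadoop.conf.Configuration;

public class MRToTezSketch {
    // Copy a user's MapReduce setting to its Tez counterpart, unless the
    // user already set the Tez property explicitly.
    static void copy(Configuration conf, String mrKey, String tezKey) {
        String v = conf.get(mrKey);
        if (v != null) conf.setIfUnset(tezKey, v);
    }

    static void translate(Configuration conf) {
        // Container sizes (reduce settings go to intermediate/leaf vertices).
        copy(conf, "mapreduce.reduce.memory.mb", "tez.task.resource.memory.mb");
        // Java opts for task JVMs.
        copy(conf, "mapreduce.reduce.java.opts", "tez.task.launch.cmd-opts");
        // Sort buffer size.
        copy(conf, "mapreduce.task.io.sort.mb", "tez.runtime.io.sort.mb");
        // Max attempts per task.
        copy(conf, "mapreduce.reduce.maxattempts", "tez.am.task.max.failed.attempts");
        // Speculative execution.
        copy(conf, "mapreduce.reduce.speculative", "tez.am.speculation.enabled");
    }
}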
What required change?
Misconfigurations:
- Bundling older versions of pig and hadoop jars with the maven assembly plugin: NoClassDefFoundError, NoSuchMethodException.
- Incorrect memory configurations: containers killed for heap size exceeding the container size, e.g.
  mapreduce.reduce.java.opts = -Xmx4096M
  mapreduce.reduce.memory.mb = 2048
- Bzip for shuffle compression instead of lzo (support added later).
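The fix for the memory case is to keep the heap inside the container. An illustrative correction (the ~80% heap-to-container ratio is a common rule of thumb, not a value from the talk):

mapreduce.reduce.memory.mb = 2048
mapreduce.reduce.java.opts = -Xmx1638M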
Bad Programming in UDFs
Static variables instead of instance variables give wrong results with container reuse:

groupedData = group A by $0;
aggregate = FOREACH groupedData generate group, MYCOUNT(A.f1);
-- MAPREDUCE ✔ TEZ ✖
aggregate = FOREACH groupedData generate group, MYCOUNT(A.f1), MYCOUNT(A.f2);
-- MAPREDUCE ✖ TEZ ✖
public class MYCOUNT extends AccumulatorEvalFunc<Long> {
    // BUG: static state is shared by every MYCOUNT instance and survives
    // container reuse in Tez, so counts leak across tasks and calls.
    private static int intermediateCount;

    @Override
    public void accumulate(Tuple b) throws IOException {
        Iterator it = ((DataBag) b.get(0)).iterator();
        while (it.hasNext()) {
            it.next();
            intermediateCount++;
        }
    }

    @Override
    public Long getValue() {
        return (long) intermediateCount;
    }

    @Override
    public void cleanup() {
        // BUG: the static counter is never reset here.
    }
}
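A corrected version for comparison (a sketch: the counter becomes an instance field, and cleanup(), which Pig calls after each group, resets it):

import java.io.IOException;
import java.util.Iterator;
import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class MYCOUNT extends AccumulatorEvalFunc<Long> {
    // Instance state: each MYCOUNT instance counts independently and
    // nothing leaks across groups or reused containers.
    private long intermediateCount;

    @Override
    public void accumulate(Tuple b) throws IOException {
        Iterator<Tuple> it = ((DataBag) b.get(0)).iterator();
        while (it.hasNext()) {
            it.next();
            intermediateCount++;
        }
    }

    @Override
    public Long getValue() {
        return intermediateCount;
    }

    @Override
    public void cleanup() {
        // Called after each group; reset for the next one.
        intermediateCount = 0;
    }
}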
Behavior Change
- Part file name change: output file names changed from part-{m|r}-00000 to part-v000-o000-r00000. Users had hardcoded part-{m|r}-00000 path references in mapreduce.job.cache.files of downstream jobs (index files, dictionaries). The union optimization relied on output formats honoring mapreduce.output.basename, so custom OutputFormats that hardcoded filenames as part-{m|r}-00000 had to be fixed. A database loading tool had problems with the longer filenames.
- Parsing pig job counters from Oozie stats: there is only one job for the script instead of multiple jobs, and users had assumptions about the number of jobs when parsing counters.
- Increased namespace usage with UnionOptimizer: mapreduce had an extra map stage that processed 128MB per map. Specify PARALLEL with UNION or turn the optimization off (see the example after the diagram below).
[Diagram: with the Tez union optimization, two 20-task vertices write their output files directly; in MapReduce, and in Tez with the union optimization off, the two 20-task vertices feed a single downstream task that writes the output.]
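For example, in the Pig script (pig.tez.opt.union is the Pig-on-Tez flag for this optimizer; treat the exact usage here as a hedged sketch):

SET pig.tez.opt.union false;  -- turn the union optimization off
-- or keep it on and control the number of output files:
C = UNION A, B PARALLEL 1;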
OutOfMemoryError – Application Master
- The Tez AM processes more tasks, counters and events; it does the job of N MapReduce application masters. Pig auto-increases the AM heap size based on the number of tasks and vertices.
- Optimizations: serialization improvements and reduced buffer copies; skip storing counters with a value of zero.
- Task configuration in memory: MapReduce sent job.xml to tasks via the distributed cache, while Tez sends config to tasks via RPC, with far more configuration and information duplication (processor config per Input, Output and Edge). HCatLoader was duplicating configuration and partition split information in UDFContext. Not an issue anymore with all the fixes that have gone in.
OutOfMemoryError – Tasks
- Auto-parallelism shuffle overload.
- Replicated join happens in the reducer phase in Tez but the map phase in MapReduce, and user configuration had the map heap size higher than the reducer heap size.
- Increased memory utilization due to multiple inputs and outputs: fetch inputs, finish sorting them and release those buffers before creating sort buffers for outputs.
- In-memory aggregation for GROUP BY is turned on by default in Pig: pig.exec.mapPartAgg=true aggregates using a HashMap before the combiner. Improvements to memory-based aggregation and better spilling in Pig's SpillableMemoryManager. (A settings sketch follows.)
- 95% of the cases were fixed; 5% required a memory increase.
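The aggregation knobs involved, as they would appear in a script (pig.exec.mapPartAgg.minReduction is Pig's companion setting; the value shown is its documented default, included here as an assumption):

SET pig.exec.mapPartAgg true;            -- aggregate in a HashMap before the combiner
SET pig.exec.mapPartAgg.minReduction 10; -- turns itself off below a 10x record reduction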
Memory Utilization
[Diagram comparing per-task memory layouts for a Load / Group By / Join pipeline:
- MapReduce: default heap size 1GB, default io.sort.mb 256MB.
- Tez: max memory for sort buffers is 50% of the heap (512MB); Pig memory-based aggregation gets 20% of the heap (200MB); with multiple inputs and outputs, the individual buffers shrink to 128-312MB each.]
Scale and Stability
How complex processing can Tez handle?
Scale:
- DAGs with 100-200 vertices; terabytes of data and billions of records flowing from start to finish of the DAG.
- 98% of jobs can be run with the same memory configurations as MapReduce.
- Ran 310K tasks in one DAG with just a 3.5G heap size.
- One user set default_parallel to 5000 for a big DAG (totally unnecessary and wasteful); the job still ran.
Stability:
- 45-50K Pig on Tez jobs (not including Hive) run in a single cluster each day without issues.
- Speculative execution.
- Full fault tolerance: bad nodes, bad disks, shuffle fetch failures.
Complexity of DAGs – Vertices per DAG (*data for a single day)
[Histograms of DAG counts by vertex count: DAGs with 1-11 vertices number in the tens of thousands (up to ~40,000 DAGs), while DAGs with 12-155 vertices number in the hundreds to low thousands (up to ~1,400 DAGs).]
DAG Patterns
[Rendered DAG shapes with 155, 61, 106 and 55 vertices.]
Performance and Utilization
Before and After (*numbers are for one major user in a single cluster, Jan 1 to Jun 22, 2016)
[Daily charts comparing Mapreduce and Tez: CPU utilization (in million vcore-secs, 0-300), total runtime (in hrs, 0-2,000), number of jobs (0-40,000), and memory utilization (in PB-secs, 0-800).]
Savings per day:
- 400-500 hours of runtime (23-30%); individual jobs' runtime improvements vary between 5% and 5x.
- 100+ PB-secs of memory (15-25%).
- 30-50 million vcore-secs of CPU (18-28%).
Utilization could still be better:
- Speculative execution happens a lot more than in mapreduce (TEZ-3316): task progress jumps from 0 straight to 100%, but it should be updated based on the % of input records processed, as in mapreduce. 80% of our jobs run with speculative execution.
- Slow start does not happen with MRInput vertices (TEZ-3274): all tasks start at the same time.
- Container reuse: larger containers are reused for small container requests (TEZ-3315); < 1% of jobs affected.
Before and After - IO (*numbers are for one major user in a single cluster, Jan 1 to Jun 22, 2016)
[Daily charts comparing Mapreduce and Tez: HDFS_BYTES_READ (in GB, 0-300,000), HDFS_BYTES_WRITTEN (in GB, 0-70,000), FILE_BYTES_WRITTEN (in GB, 0-160,000), FILE_BYTES_READ (in GB, 0-100,000).]
- Lower HDFS utilization, as expected with no intermediate storage: both HDFS_BYTES_READ and HDFS_BYTES_WRITTEN are lower by ~20PB, and the savings are larger once 3x replication is accounted for.
- Higher local file utilization than expected.
IO issues to be fixed:
- More spills with smaller buffers in the case of multiple inputs and outputs; better in-memory merging with inputs.
- Lots of space wasted in serialization: DataOutputBuffer, used for serializing keys and values, extends ByteArrayOutputStream, whose byte array doubles on every expansion, so 33MB of data occupies a 64MB array. Chained byte arrays avoid the array copies (TEZ-3159). (A tiny illustration follows.)
- Shuffle fetch failures and re-runs of upstream tasks both hurt runtime and increase IO; the probability of occurrence is a lot higher than in mapreduce.
- Increased disk activity with auto-parallelism: disks are hammered during fetch, and there is more disk activity from shuffle and merge due to spills.
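A tiny illustration of the doubling waste (the 1MB starting size is an assumption; only the doubling behavior matters):

public class DoublingWaste {
    public static void main(String[] args) {
        long capacity = 1 << 20;   // assume a 1MB initial buffer
        long data = 33L << 20;     // 33MB of serialized key/value bytes
        while (capacity < data) {
            capacity <<= 1;        // double on every expansion, ByteArrayOutputStream-style
        }
        System.out.println((capacity >> 20) + "MB"); // prints 64MB for 33MB of data
    }
}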
Performance and Utilization
            Runtime      Number of Jobs  Number of Tasks
Mapreduce   1 hr 15 min  46              45758
Tez         39 min       1               22879

Mapreduce: AM container size 1.5 GB (46 * 1.5 = 69 GB); task container size 2 GB.
Tez: AM container size 3 GB; task container size 2 GB.
Problems to be Addressed
Challenges with DAG Processing
A DAG is not always better than mapreduce; some of the Tez optimizations that do well at small scale backfired at large scale:
- Multiple inputs and outputs
- Data skew in unordered partitioning
- Auto-parallelism
Multiple Inputs and Outputs
- The example script groups by 59 dimensions and then joins them; grouping by multiple dimensions is a very common use case.
- The mapreduce implementation used multiplexing and de-multiplexing logic to do it in a single job: tag each record with the index of its group-by and run it through the corresponding plan.
- The Tez implementation of using separate reducers is generally more efficient and faster, but performance degrades as the data volume and the number of inputs and outputs increase, due to too-small sort buffers and a lot of spilling.
- Solution: implement the older multiplexing and de-multiplexing logic in Tez for > 10 inputs/outputs. (A toy sketch of the tagging idea follows.)
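A self-contained toy of the multiplex/de-multiplex idea (illustrative only, not Pig's actual implementation): records are tagged with the index of the group-by plan they belong to, so a single shuffle serves several group-bys at once.

import java.util.*;
import java.util.function.Function;

public class MultiplexDemo {
    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[]{"us", "web"}, new String[]{"us", "mail"},
                new String[]{"uk", "web"});
        // Two "plans": group by column 0, group by column 1.
        List<Function<String[], String>> plans = List.of(r -> r[0], r -> r[1]);

        // Map side: tag each emitted key with its plan index ("multiplex").
        Map<String, List<String[]>> shuffled = new TreeMap<>();
        for (String[] rec : records)
            for (int p = 0; p < plans.size(); p++)
                shuffled.computeIfAbsent(p + "\u0001" + plans.get(p).apply(rec),
                        k -> new ArrayList<>()).add(rec);

        // Reduce side: de-multiplex on the tag and run the matching plan
        // (here the "plan" is just a count per group).
        shuffled.forEach((key, vals) -> {
            String[] parts = key.split("\u0001");
            System.out.println("plan " + parts[0] + " group " + parts[1]
                    + " count " + vals.size());
        });
    }
}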
Skewed Data
Data skew is one of the biggest causes of slowness:
- One or two reducers end up processing most of the data.
- Skewed output in one vertex can keep propagating downstream, causing more slowness: self-joins that use One-One edges, and unordered partitioning (TEZ-3209).
- Storing intermediate data in HDFS acted as an autocorrect for skew in mapreduce: each of the maps processed combined splits of up to 128MB by default.
Auto-Parallelism
[Diagram: a Map vertex feeding a Reduce vertex, before and after auto-parallelism shrinks the reducer count.]
- tez.shuffle-vertex-manager.desired-task-input-size: 128MB per reducer for an intermediate vertex, 1GB per reducer for a leaf vertex.
- Increased shuffle fetches per reducer: before, 3 fetches per reducer; after, 9 fetches in one reducer.
- tez.shuffle-vertex-manager.min-task-parallelism (default 1): Pig does not use it yet. With 1000 mappers and 999 reducers, auto-parallelism can reduce to a single reducer, which must then fetch all 999 partitions from each of the 1000 map outputs: 999,000 fetches in one reducer. The disk merge is also costly. (A settings sketch follows.)
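The two knobs from this slide as script-level overrides (the byte value and the parallelism floor are illustrative; desired-task-input-size is assumed to take bytes):

SET tez.shuffle-vertex-manager.desired-task-input-size 134217728; -- 128MB per reducer
SET tez.shuffle-vertex-manager.min-task-parallelism 25;           -- never shrink below 25 reducers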
Improving Auto-Parallelism Scaling
- Composite messaging (TEZ-3222): reduce the number of DataMovementEvents.
- Optimize empty partitions handling (TEZ-3145).
- Custom shuffle handler for Tez: remember the file location of map output and avoid listing on all volumes; add the ability to fetch multiple partitions in one request.
- Turn on the MemToMem merger by default: tez.runtime.shuffle.memory-to-memory.enable (mapreduce.reduce.merge.memtomem.enabled in MapReduce). It is still experimental with lots of issues: the threshold is currently based on the number of map outputs, and it should instead merge dynamically based on size and available memory.
- Range partitioning is not efficient. It was an easy implementation to start with, but skewed data prevents better reduction, and two skewed buckets next to each other make it worse. A best-fit algorithm should replace range partitioning (TEZ-2962).
[Example: reducer partitions of 67, 64, 64, 5 and 5 MB are merged by range partitioning into adjacent groups of 67 MB, 128 MB (64+64) and 10 MB (5+5): still three reducers. A toy best-fit sketch follows.]
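A toy sketch of the bin-packing idea behind TEZ-2962, using the partition sizes from the example above (the 128MB target matches the desired-task-input-size slide; the code is illustrative, not Tez's implementation):

import java.util.*;

public class BestFitDemo {
    public static void main(String[] args) {
        int[] sizes = {67, 64, 64, 5, 5};  // partition sizes in MB
        int target = 128;                  // desired input per reducer
        // Best fit: place each partition into the fullest bin it still fits in.
        List<Integer> bins = new ArrayList<>();
        for (int size : sizes) {
            int best = -1;
            for (int i = 0; i < bins.size(); i++) {
                if (bins.get(i) + size <= target
                        && (best == -1 || bins.get(i) > bins.get(best))) {
                    best = i;
                }
            }
            if (best == -1) bins.add(size);
            else bins.set(best, bins.get(best) + size);
        }
        // Prints [77, 128]: two reducers instead of the three that
        // adjacent-range merging produces for the same input.
        System.out.println(bins);
    }
}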
Summary
- The transition was a little bumpier and longer than expected (planned for Q4 2015). The major initial rollout delay was due to the Application Timeline Server not scaling beyond 5K applications. Tons of stability and scalability issues were fixed and contributed back to Apache. Full migration by July 2016.
- Tez scales and is now stable: it worked at Yahoo scale, with a smooth and easy migration from MapReduce and a really good UI.
- Some cool features currently being worked on in the community: DFS-based shuffle (TEZ-2442), a custom edge for CROSS (TEZ-2104), and parallelized LIMIT that skips running tasks once the desired record count is reached.
- Lots more features, optimizations, better algorithms and crazy ideas yet to be done. Tez is an assembly language at its core.
Acknowledgements
- Jason Lowe, our very own supercomputer for debugging crazy issues
- Koji Noguchi, another of our Pig internals experts and a committer
- Yahoo! Pig users, who put up with most of the pain
- Daniel Dai from the Apache Pig PMC
- Apache Tez PMC: Bikas Saha, Siddharth Seth, Hitesh Shah