i-files: handling intermediate data in parallel dataflow graphs

I-Files: Handling Intermediate Data In Parallel Dataflow Graphs Sriram Rao November 2, 2011



Page 1: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Sriram Rao

November 2, 2011

Page 2: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Joint Work With…

Raghu Ramakrishnan, Adam Silberstein: Yahoo Labs Mike Ovsiannikov, Damian Reeves: Quantcast

Page 3: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Motivation

Massive growth in online advertising (read…display ads)

Companies are reacting to this opportunity via behavioral ad-targeting

› Collect click-stream logs, mine the data, build models, show ads

“Petabyte scale data mining” using computational frameworks (such as Hadoop, Dryad) is commonplace

Analysis of Hadoop job history logs shows:

› Over 95% of jobs are small (run for a few mins, process small data)

› About 5% of jobs are large (run for hours, process big data)

Page 4: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Where have my cycles gone?

5% of jobs take 90% of cycles!

Page 5: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Who is using my network?

5% of jobs account for 99% of network traffic!

Page 6: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

So…

Analysis shows 5% of the jobs are “big”:

› 5% of jobs use 90% cluster compute cycles

› 5% of jobs shuffle 99% of data (i.e., 99% network bandwidth)

To improve cluster performance, improve M/R performance for large jobs

Faster, faster, faster: virtuous cycle

› Cluster throughput goes up

› Users will run bigger jobs

Our work: Focus on handling intermediate data at scale in parallel dataflow graphs

Page 7: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Handling Intermediate Data in M/R

In a M/R computation, map output is intermediate data

For transferring intermediate data from map to reduce:

› Map tasks generate data, write to disk

› When a reduce task pulls map output,

• Data has to be read from disk

• Transferred over the network

– Cannot assume that mappers/reducers can be scheduled concurrently

Transporting intermediate data:

› Intermediate data size < RAM size: RAM masks disk I/O

› Intermediate data size > RAM size: Cache hit rate masks disk I/O

› Intermediate data size >> RAM size: Disk overheads affect perf

Page 8: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Intermediate Data Transfer: Distributed Merge Sort

› # of disk seeks for transferring intermediate data ∝ M * R

› Avg. amount of data reducer pulls from a mapper ∝ 1 / R

[Figure: Handling intermediate data at scale. Labels: Map, Distributed File System, Reduce, (M * R) disk seeks]

Page 9: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Disk Overheads (More detail)

“Fix” the amount of data generated by a map task

• Size RAM such that the map output fits in-memory and can be sorted in 1-pass

– For example, use 1GB

“Fix” the amount of data consumed by a reduce task

• Size RAM for a 1-pass merge

– For example, use 1GB

Now…

• For a job with 1TB of data: 1024 mappers generate 1GB each; 1024 reducers consume 1GB each

– On average, data generated by a map for a given reducer = 1GB / 1024 = 1MB

• For a job with 16TB of data: 16K mappers generate 1GB each; 16K reducers consume 1GB each

– On average, data generated by a map for a given reducer = 1GB / 16K = 64KB

With scale, # of seeks increases; data read/seek decreases
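The arithmetic on this slide can be reproduced with a short sketch. It assumes, as the slide does, that every map task produces 1GB and every reduce task consumes 1GB, and that each (mapper, reducer) pair costs one seek; the function name `shuffle_profile` is made up for illustration.

```python
GB = 1024 ** 3
TB = 1024 ** 4

def shuffle_profile(intermediate_bytes, per_task_bytes=GB):
    """Return (mappers, reducers, seeks, bytes_per_seek) for a stock M/R shuffle.

    Assumes one seek per (mapper, reducer) pair, i.e. seeks grow as M * R.
    """
    mappers = intermediate_bytes // per_task_bytes
    reducers = intermediate_bytes // per_task_bytes
    seeks = mappers * reducers
    # Each mapper splits its fixed-size output evenly across all reducers.
    bytes_per_seek = per_task_bytes // reducers
    return mappers, reducers, seeks, bytes_per_seek

for size_tb in (1, 16):
    m, r, seeks, per_seek = shuffle_profile(size_tb * TB)
    print(f"{size_tb}TB: M={m}, R={r}, seeks={seeks}, data/seek={per_seek // 1024}KB")
```

Going from 1TB to 16TB multiplies the seek count by 256 while the data read per seek shrinks by 16x.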

Page 10: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Disk Overheads

As the volume of intermediate data scales,

› Amount of data read per seek decreases

› # of disk seeks increases non-linearly

Net result: Job performance will be affected by the disk overheads in handling intermediate data

› Intermediate data increases by 2x

› Job-run time increases by 2.5x

Page 11: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

What is new?

[Figure: I-Files. Labels: Map, Distributed File System, Reduce, Network-wide Merge, Fewer Seeks!]

One intermediate file per reducer, instead of one per mapper

Page 12: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Our work

New approach for efficient handling of intermediate data at large scale

• Minimize the number of seeks

• Maximize the amount of data read/written per seek

• Primarily geared towards LARGE M/R jobs:

– 10’s of TB of intermediate data

– 1000’s of mapper/reducer tasks

I-files: Filesystem support for intermediate data

› Atomic record append primitive that allows write parallelism at scale

› Network-wide batching of intermediate data

Built Sailfish (by modifying Hadoop-0.20.2) where intermediate data is transported using I-files

Page 13: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

How did we do? (Benchmark job)

Page 14: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

How did we do? (Actual Job)

Page 15: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Talk Outline

Properties of Intermediate data

I-files implementation

Sailfish: M/R implementation that uses I-files for intermediate data

Experimental Evaluation

Summary

Page 16: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Organizing Intermediate Data

Hadoop organizes intermediate data in a format convenient for the mapper

What if we went the opposite way: organize it in a format convenient for the reducer?

› Mappers write their output to a per-partition I-file

› Data destined for a reducer is in a single file

› Build the intermediate data file in a manner that is suitable for the reader rather than the writer

Page 17: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Reducer input is generated by multiple mappers

File is a container into which mapper output needs to be stored

› Write order is k1, k2, k3, k4

› Processing order is k3, k1, k4, k2

Because reducer imposes processing order, writer does not care where the output is stored in the file

Once a mapper emits a record, the output is committed

› There is no “withdraw”

[Figure: Intermediate data. Mappers (M) write records k1 to k4 into a File; the reducer (R) reads them in the order k3, k1, k4, k2]

Page 18: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Properties of Intermediate data file

Multiple mappers generate data that will be consumed by a single reducer

› Need low latency multi-writer support

Writers are doing append-only writes

› Contents of the I-file are never overwritten

Arbitrary interleaving of data is ok:

› Writer does not care where the data goes in the file

› Any ordering we need can be done post-facto

No ordering guarantees for the writes from a single client

• Follows from arbitrary interleaving of writes

Page 19: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Atomic Record Append

Multi-writer support => need an atomic primitive

› Intermediate data is append only…so, need atomic append

With atomic record append primitive clients provide just the data but the server chooses the offset with arbitrary interleaving

› In contrast, in a traditional write clients provide data+offset

Since server is choosing the offset, design is lock-free

To scale atomic record append with writers, allow

› Multiple writers append to a single block of the file

› Multiple blocks of the file concurrently appended to
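A minimal sketch of the primitive described above, assuming a single in-process server object: clients hand over only the data, and the server picks the offset and reports it back. The class `ChunkServer`, its method names, and the record sizes are hypothetical and are not the KFS API.

```python
class ChunkServer:
    """Toy stand-in for the server side of atomic record append."""

    def __init__(self, start_offset=0):
        self.next_offset = start_offset
        self.records = []  # (offset, data), in the order the server applied them

    def record_append(self, data: bytes) -> int:
        # The client supplies no offset; the server assigns the next free one.
        # Because one server serializes appends, clients need no locks.
        offset = self.next_offset
        self.records.append((offset, data))
        self.next_offset += len(data)
        return offset


server = ChunkServer(start_offset=300)
# Two clients race; whichever request the server sees first gets offset 300.
b_off = server.record_append(b"B" * 50)
a_off = server.record_append(b"A" * 50)
print(b_off, a_off)
```

Neither client knows in advance where its record will land, which is acceptable here because the reducer imposes its own processing order later.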

Page 20: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Atomic Record Append

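[Figure: Client1 and Client2 send atomic record append (ARA) requests <A, offset = -1> and <B, offset = -1> to the Server. The server's file holds B at 300, A at 350, C at 400, D at 500, and the server replies with the offset it chose (Offset = 300)]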

Page 21: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Implementing I-files

Have implemented I-files in context of Kosmos distributed filesystem (KFS)

› Why KFS?

• KFS has multi-writer support

• We have designed/implemented/deployed KFS to manage PB’s of storage

KFS is similar to GFS/HDFS

› Chunks are striped across nodes and replicated for fault-tolerance

– Chunk master serializes all writes to a chunk

› For atomic append, chunk master assigns the offset

› With KFS I-files, multiple chunks of the I-file can be concurrently modified

Page 22: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Atomic Record Append

Writers are routed to a chunk that is open for writing

› For scale, limit the # of concurrent writers to a chunk

When client gets an ack back from chunk master, data is replicated in the volatile memory at all the replicas

› Chunkservers are free to commit data to disk asynchronously

Eventually, chunk is made stable

› Data is committed to disk at all the replicas

› Replicas are byte-wise identical

Stable chunks are not appended to again

Page 23: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Talk Outline

Properties of Intermediate data

I-files implementation

Sailfish: M/R implementation that uses I-files for intermediate data

Experimental Evaluation

Summary

Page 24: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

The Elephant Can Dance…

[Figure: Hadoop Shuffle Pipeline vs. Sailfish Shuffle Pipeline. Stages shown: map(), (De) Serialization, reduce()]

Page 25: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Sailfish Overview

Modify Hadoop-0.20.2 to use I-files for MapReduce

› Mappers write their output to a per-partition I-file

• Replication factor of 1 for all the chunks of an I-file

• At-least-once semantics for append; filter dups on the reduce side

› Data destined for a reducer is in a single file

› Build the intermediate data file in a manner that is suitable for the reader rather than the writer

Automatically parallelize execution of the reduce phase: Set the number of reduce tasks and work assignment dynamically

› Assign key-ranges to reduce tasks rather than whole partitions

› Extend I-files to support key-based retrieval

Page 26: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Sailfish Map Phase Execution

Page 27: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Sailfish Reduce Phase Execution

Page 28: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Atomic “Record” Append For M/R

M/R computations are about processing records

› Intermediate data consists of key/value pairs

Extend atomic append to support “records”

› Mappers emit <key, record>

• Per-record framing that identifies the mapper task that generated a record

› System stores per-chunk index

• After chunk is stable, chunk is sorted and an index is built by the sorter

– Sorting is a completely local operation: read a block from disk, sort in RAM, and write back to disk

› Reducers can retrieve data by <key>

• Use per-record framing to discard data from dead mappers
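The record path on this slide can be sketched in a few lines, under the assumption that a chunk fits in memory and that the index is simply the sorted list of keys. `Record`, `sort_chunk`, and `read_key_range` are illustrative names, not Sailfish code.

```python
import bisect
from typing import NamedTuple

class Record(NamedTuple):
    key: str
    value: str
    mapper_id: int  # per-record framing: which map task wrote the record

def sort_chunk(records):
    """Sort a stable chunk by key and build a (hypothetical) key index."""
    ordered = sorted(records, key=lambda r: r.key)
    index = [r.key for r in ordered]  # position i holds the key of record i
    return ordered, index

def read_key_range(ordered, index, lo, hi, dead_mappers=frozenset()):
    """Return records with lo <= key < hi, dropping output of dead mappers."""
    start = bisect.bisect_left(index, lo)
    end = bisect.bisect_left(index, hi)
    # The slice is contiguous in the sorted chunk, so the read is sequential.
    return [r for r in ordered[start:end] if r.mapper_id not in dead_mappers]

# Records arrive in arbitrary write order from several mappers.
chunk = [Record("k3", "v", 0), Record("k1", "v", 1),
         Record("k4", "v", 2), Record("k2", "v", 1)]
ordered, index = sort_chunk(chunk)
survivors = read_key_range(ordered, index, "k1", "k5", dead_mappers={2})
print([r.key for r in survivors])
```

Filtering by `mapper_id` at read time is what lets a failed map task be re-run without rewriting the chunk.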

Page 29: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Sailfish Architecture

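[Figure: Sailfish architecture. Components: Hadoop JT (Submit Job), Mapper Task with I-Appender, Reducer Task with I-Merger (Read/Merge), KFS I-files, workbuilder. A reducer asks the workbuilder “What do I do?” and is assigned I-file 5, key range [a, d)]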

Page 30: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Handling Failures

Whenever a chunk of an I-file is lost, need to re-generate lost data

With I-file, we have multiple mappers writing to a block

For fault-tolerance,

› Workbuilder tracks the set of chunks modified by a mapper task

› Whenever a chunk is lost, workbuilder notifies the JT of the set of map tasks that have to be re-run

› Reducers reading from the I-file with the lost chunk wait until data is re-generated

For fault-containment, in Sailfish, use per-rack I-files

› Mappers running in a rack write to chunks of the I-file stored in the rack
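The bookkeeping behind this recovery scheme might look like the sketch below. It assumes the workbuilder keeps an in-memory map from chunk to the map tasks that appended to it; the class `ChunkTracker` and the task and chunk identifiers are invented for illustration.

```python
from collections import defaultdict

class ChunkTracker:
    """Tracks which map tasks wrote to which chunk of an I-file."""

    def __init__(self):
        self.writers = defaultdict(set)  # chunk_id -> set of map task ids

    def note_append(self, chunk_id, map_task):
        self.writers[chunk_id].add(map_task)

    def on_chunk_lost(self, chunk_id):
        """Return the map tasks that must be re-run to regenerate the chunk."""
        return sorted(self.writers.pop(chunk_id, set()))


tracker = ChunkTracker()
tracker.note_append("ifile5/chunk0", "map_0001")
tracker.note_append("ifile5/chunk0", "map_0007")
tracker.note_append("ifile5/chunk1", "map_0007")
rerun = tracker.on_chunk_lost("ifile5/chunk0")
print(rerun)
```

Because many mappers share a chunk, losing one chunk can force several map tasks to re-run, which is the motivation for containing writers within a rack.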

Page 31: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Fault-tolerance With Sailfish

Alternate option is to replicate map-output

› Use atomic record append to write to two chunkservers

• Probability of data loss due to (concurrent) double failure is low

› Performance hit for replicating data is low

• Data is replicated using RAM and written to disk async

› However, network traffic increases substantially

• Sailfish causes network traffic to double compared to Stock Hadoop

– Map output is written to the network and reduce input is read over the network

• With replication, data traverses the network three times

– Alternate strategy is to selectively replicate map output

Replicate in response to data loss

Replicate output that was generated the earliest

Page 32: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Sailfish Reduce Phase

# of reducers/job and their task assignment is determined by the workbuilder in a data-dependent manner

› Dynamically set the # of reducers per job after the map phase execution is complete

# of reducers/I-file = (size of I-file) / (work per reducer)

› Work per reducer is set based on RAM (in experiments, use 1GB per reduce task)

› If data assigned to a task exceeds size of RAM, merger does a network-wide merge by appropriately streaming the data

Workbuilder uses the per-chunk index to determine split points

› Each reduce task is assigned a range of keys within an I-file

• Data for a reduce task is in multiple chunks and requires a merge

• Since chunks are sorted, data read by a reducer from a chunk is all sequential I/O

Skew in reduce input is handled seamlessly

› I-file with more data has more tasks assigned to it
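A rough sketch of the planning step, assuming the workbuilder can see approximate byte counts per key from the per-chunk indexes. Rounding the reducer count up and the greedy choice of split keys are assumptions made here; the slides give only the ratio (size of I-file) / (work per reducer).

```python
GB = 1024 ** 3

def reducers_for_ifile(ifile_bytes, work_per_reducer=GB):
    """(size of I-file) / (work per reducer), rounded up, at least one task."""
    return max(1, -(-ifile_bytes // work_per_reducer))

def split_points(key_sizes, n_tasks):
    """Pick n_tasks - 1 split keys so each key range holds a similar byte count.

    key_sizes is a list of (key, bytes) pairs sorted by key, e.g. merged
    from the per-chunk indexes. Task i gets the half-open range
    [split[i-1], split[i]).
    """
    total = sum(size for _, size in key_sizes)
    target = total / n_tasks
    splits, seen = [], 0
    for key, size in key_sizes:
        if len(splits) < n_tasks - 1 and seen >= target * (len(splits) + 1):
            splits.append(key)
        seen += size
    return splits

# A skewed job: the larger I-file simply gets more reduce tasks.
print(reducers_for_ifile(8 * GB), reducers_for_ifile(2 * GB))
print(split_points([("a", 1), ("b", 1), ("c", 1), ("d", 1)], n_tasks=2))
```

Since task counts follow data volume per I-file, a hot partition is spread over more reducers instead of overloading one.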

Page 33: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Experimental Evaluation

Cluster comprises ~150 machines

› 6 map tasks, 6 reduce tasks per node

• With Hadoop M/R tasks, a JVM is given 1.5GB RAM for one pass sort/merge

› 8 cores, 16GB RAM, 4 x 750GB drives, 1Gbps between any pair of nodes

› Job uses all the nodes in the cluster

Evaluate with benchmark as well as real M/R job

› Simple benchmark that generates its own data (similar to terasort)

• Measure only the overhead with transporting intermediate data

• Job generates records with random 10-byte key, 90-byte value

› Experiments vary the size of intermediate data (1TB – 64TB)

• Mappers generate 1GB of data and reducers consume ~1GB of data

Page 34: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

I-files in practice

150 map tasks/rack

128 map tasks concurrently appending to a block of an I-file

2 blocks of an I-file are concurrently appended to in a rack

512 I-files per job

› Beyond 512 I-files hit system limitations in the cluster (too many open files, too many connections)

KFS chunkservers use direct I/O with the disk subsystem, by-passing the buffer cache

Page 35: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

How did we do? (Benchmark job)

Page 36: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

How many seeks?

With Stock Hadoop, number of seeks is ∝ M * R

With Sailfish, it is the product of:

› # of chunks per I-file (c)

› # of reduce tasks per I-file (R / I)

› # of I-files (I)

We get: c * I * (R / I) = c * R

# of chunks per I-file: 64TB intermediate data split over 512 I-files, where the chunksize is 128MB

› c = (64TB / (512 * 128MB)) = 1024

# of map tasks at 64TB: 65536 (64TB / 1GB per mapper): c << M
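The seek comparison can be checked numerically. The sketch below plugs in the slide's figures (64TB, 512 I-files, 128MB chunks, 1GB per mapper) and additionally assumes R = 65536 reduce tasks, i.e. about 1GB per reducer as in the benchmark setup; the function names are made up.

```python
MB = 1024 ** 2
GB = 1024 ** 3
TB = 1024 ** 4

def hadoop_seeks(mappers, reducers):
    """Stock Hadoop: every reducer pulls a piece from every mapper."""
    return mappers * reducers

def sailfish_seeks(total_bytes, n_ifiles, chunk_bytes, reducers):
    """Sailfish: c * I * (R / I) = c * R, with c chunks per I-file."""
    chunks_per_ifile = total_bytes // (n_ifiles * chunk_bytes)
    return chunks_per_ifile * reducers

total = 64 * TB
M = total // GB  # 1GB of output per mapper
R = total // GB  # assumption: ~1GB of input per reducer
stock = hadoop_seeks(M, R)
sailfish = sailfish_seeks(total, n_ifiles=512, chunk_bytes=128 * MB, reducers=R)
print(M, stock, sailfish, stock // sailfish)
```

Under these assumptions the seek count drops by the factor M / c = 65536 / 1024 = 64.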

Page 37: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Why does Sailfish work?

Where are the gains coming from?

› Write-path is low-latency and is able to keep as many disk arms and NICs busy as possible

› Read-path:

• Lowered the number of disk seeks

• Reads are large/sequential

Compared to Hadoop, read path in Sailfish is very efficient

› Efficient disk read path leads to better network utilization

Page 38: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Data read per seek

Page 39: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Disk Thruput (during Reduce phase)

Page 40: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Using Sailfish In Practice

Use a job+data from one of the behavioral ad-targeting pipelines at Yahoo

› BT-Join: Build a sliding N-day model of user behavior

• Take 1 day of clickstream logs and join with previous N days and produce a new N-day model

Input datasets compressed using bz2:

› Dataset A: 1000 files, 50MB apiece (10:1 compression)

› Dataset B: 1000 files, 1.2GB apiece (10:1 compression)

Extended Sailfish to support compression for intermediate data

• Mappers generate up to 256K of records, compress, and “append record”

• Sorters read compressed data, decompress, sort, and recompress

• Merger reads compressed data, decompresses, merges, and passes to reducer

• For performance, use LZO from Intel IPP package

Page 41: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

How did we do? (BT-Join)

Page 42: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

BT-Join Analysis

Speedup in Reduce phase is due to better batching

Speedup in Map-phase:

› Stock Hadoop: if map output doesn’t fit in RAM, mappers do an external sort

› Sailfish: Sorting is outside the map task and hence, no limits on the amount of map output generated by a map task

Net result: Job with Sailfish is about 2x faster when compared to Stock Hadoop

Page 43: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Related Work

Atomic append was introduced in GFS paper (SOSP’03)

› GFS however seems to have moved away from atomic append as they say it is not usable (at-least-once semantics and replicas can diverge)

Balanced systems: TritonSort

› Stage-based Sort engine in which the hardware is balanced

• 8 cores, 24GB RAM, 10Gig NIC, 16 drives/box

• Software is then constructed in a way that balances hardware use

› Follow-on work on building an M/R on top of TritonSort

• Not clear how general their M/R engine is (seems specific to sort)

› Sailfish tries to achieve balance via software and is a general M/R engine

Page 44: I-Files: Handling Intermediate Data In Parallel Dataflow Graphs

Summary

Designed I-files for intermediate data and built Sailfish for doing large-scale M/R

› Sailfish will be released as open-source

Build Sailfish on top of YARN

› Utilize the per-chunk index:

• Improve reduce task planning based on key distributions

• “Checkpoint” reduce tasks on key-based boundaries and allow better resource sharing

• Support aggregation trees

Having the intermediate data outside a M/R job allows new debugging possibilities

› Debug just the reduce phase