Sort of Vinyl: Ordered Record Collection
Chris Douglas, Yahoo!
01.18.2010
(posted 13-Sep-2014)
Obligatory MapReduce Flow Slide
[Diagram: input splits (Split 0, Split 1, Split 2) are read from hdfs://host:8020/input/data by map tasks (Map 0, Map 1, Map 2); each map output passes through Combine*; reduce tasks (Reduce 0, Reduce 1) fetch the map outputs and write results to hdfs://host:8020/output/data on HDFS]
Map Output Collection
Overview
  Hadoop (-∞, 0.10)   Hadoop [0.10, 0.17)   Hadoop [0.17, 0.22]
  Lucene              HADOOP-331            HADOOP-2919
  Cretaceous          Jurassic              Triassic
Awesome!
Problem Description
map(K1,V1)
  collect(K2,V2)
    p0 = partition(key0, val0)    (an int)
    Serialization:
      K2.write(DataOutput)  →  write(byte[], int, int) calls for key0 (a byte[])
      V2.write(DataOutput)  →  write(byte[], int, int) calls for val0 (a byte[])
Problem Description
For all calls to collect(K2 keyn, V2 valn):
• Store the result of partition(K2 keyn, V2 valn)
• Ordered set of write(byte[], int, int) calls for keyn
• Ordered set of write(byte[], int, int) calls for valn
Challenges:
• Size of key/value unknown a priori
• Records must be grouped for efficient fetch from reduce
• Sort occurs after the records are serialized
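The contract above can be sketched in a few lines of Python (illustrative only; the `Collector` class and its names are hypothetical, not Hadoop's):

```python
import io

# Each collect() appends the serialized key and value bytes to a buffer
# and records (partition, offset, keylen, vallen) so the records can be
# sorted and grouped later, after serialization.
class Collector:
    def __init__(self):
        self.buf = io.BytesIO()   # destination of write(byte[], int, int) calls
        self.meta = []            # (partition, offset, keylen, vallen) per record

    def collect(self, partition, key_bytes, val_bytes):
        off = self.buf.tell()
        self.buf.write(key_bytes)   # ordered writes for key_n
        self.buf.write(val_bytes)   # ordered writes for val_n
        self.meta.append((partition, off, len(key_bytes), len(val_bytes)))

c = Collector()
c.collect(0, b"apple", b"1")
c.collect(1, b"pear", b"22")
```

The three challenges fall out directly: lengths are known only after the writes land, grouping needs the partition column, and any sort must work over the already-serialized bytes.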
Overview
  Hadoop (-∞, 0.10)   Hadoop [0.10, 0.17)   Hadoop [0.17, 0.22]
  Lucene              HADOOP-331            HADOOP-2919
  Cretaceous          Jurassic              Triassic
Hadoop (-∞, 0.10)
map(K1,V1)
  collect(K2,V2)
    p0 = partition(key0, val0)
    SequenceFile::Writer[p0].append(key0, val0)
      key0.write(localFS)
      val0.write(localFS)
Note: not necessarily a direct write. SequenceFile may buffer a configurable amount of data to effect block compression, stream buffering, etc.
Hadoop (-∞, 0.10) with a Combiner
map(K1,V1)
  collect(K2,V2)
    p0 = partition(key0, val0)
    clone(key0, val0)    (buffer key0, key1, key2, …)
    on flush(): reduce(keyn, val*)
      SequenceFile::Writer[p0].append(keyn’, valn’)
Note: the Combiner may change the partition and ordering of input records. This is no longer supported.
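A sketch of this era's scheme, with one writer per partition and a combiner that must hold a clone of every record until flush() (all names here are hypothetical stand-ins, not Hadoop classes):

```python
# One in-memory "writer" per partition; collect() clones records into a
# pending map, and flush() runs the combiner before appending output.
class CombiningCollector:
    def __init__(self, partitions, combine):
        self.writers = [[] for _ in range(partitions)]  # stands in for SequenceFile::Writer[p]
        self.combine = combine        # plays the role of reduce(key_n, val*)
        self.pending = {}             # cloned records, grouped by (partition, key)

    def collect(self, p, key, val):
        # clone(key0, val0): every record is retained until flush()
        self.pending.setdefault((p, key), []).append(val)

    def flush(self):
        for (p, key), vals in sorted(self.pending.items()):
            # append(key_n', val_n') of the combined record
            self.writers[p].append((key, self.combine(key, vals)))
        self.pending.clear()

cc = CombiningCollector(2, lambda k, vs: sum(vs))
cc.collect(0, "a", 1); cc.collect(0, "a", 2); cc.collect(1, "b", 5)
cc.flush()
```

The cost noted in the slides is visible here: memory for `pending` grows with every cloned record and is hard to bound.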
Hadoop (-∞, 0.10)
[Diagram: the TaskTracker serves each map's per-partition files to Reduce 0 … Reduce k; each reduce copies all of its inputs, then runs sort/merge on the local FS]
Hadoop (-∞, 0.10)
Pro:
• Complexity of sort/merge encapsulated in SequenceFile, shared between MapTask and ReduceTask
• Very versatile Combiner semantics (change sort order, partition)
Con:
• Copy/sort can take a long time for each reduce (lost opportunity to parallelize the sort)
• Job cleanup is expensive (e.g. a 7k-reducer job must delete 7k files per map on that TaskTracker)
• The Combiner is expensive to use and its memory usage is difficult to track
• OutOfMemoryExceptions from untracked memory in buffers, particularly when using compression (HADOOP-570)
Overview
  Hadoop (-∞, 0.10)   Hadoop [0.10, 0.17)   Hadoop [0.17, 0.22]
  Lucene              HADOOP-331            HADOOP-2919
  Cretaceous          Jurassic              Triassic
Hadoop [0.10, 0.17)
map(K1,V1)
  collect(K2,V2)
    p0 = partition(key0, val0)
    K2.write(DataOutput)
    V2.write(DataOutput)
    BufferSorter[p0].addKeyValue(recOff, keylen, vallen)    (partitions 0, 1, …, k-1, k)
Keep the offset into the buffer and the lengths of the key and value.
sortAndSpillToDisk(): add the memory used by all BufferSorter implementations and the keyValBuffer; if the spill threshold is exceeded, spill the contents to disk.
Hadoop [0.10, 0.17)
sortAndSpillToDisk():
The sort permutes offsets into (offset, keylen, vallen). Once ordered, each record is read back, appended into a SequenceFile, and the partition offsets recorded:
  K2.readFields(DataInput)
  V2.readFields(DataInput)
  SequenceFile::append(K2,V2)
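The key trick, permuting small metadata triples instead of moving serialized record bytes, can be shown in a short sketch (hypothetical Python stand-ins for keyValBuffer and the BufferSorter metadata):

```python
# Records live serialized in key_val_buffer; the sort permutes the
# (offset, keylen, vallen) triples by comparing raw key bytes, then
# records are read back out in sorted order (the spill).
key_val_buffer = b""
meta = []   # (offset, keylen, vallen), one per record

def add_key_value(key, val):
    global key_val_buffer
    meta.append((len(key_val_buffer), len(key), len(val)))
    key_val_buffer += key + val

add_key_value(b"pear", b"2")
add_key_value(b"apple", b"1")

# sortAndSpillToDisk(): permute the metadata, not the record bytes
meta.sort(key=lambda m: key_val_buffer[m[0]:m[0] + m[1]])

# stand-in for readFields + SequenceFile::append over the ordered triples
spill = [(key_val_buffer[o:o + kl], key_val_buffer[o + kl:o + kl + vl])
         for o, kl, vl in meta]
```

Note that the final pass still slices each record back out of the buffer, mirroring the avoidable deserialize/reserialize cost called out in the Con list for this era.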
Hadoop [0.10, 0.17)
<< Combiner >>
If defined, the Combiner is now run during the spill, separately over each partition. Values emitted from the Combiner are written directly to the output partition.
Hadoop [0.10, 0.17)
[Each sortAndSpillToDisk() produces a spill file containing all partitions 0, 1, …, k; repeated spills (0, 1, …) accumulate on disk]
Hadoop [0.10, 0.17)
mergeParts(): the spill files (each holding partitions 0, 1, …, k) are merged into a single output file, partition by partition.
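Since every spill is already sorted within each partition, mergeParts() reduces to a k-way merge of sorted runs. A minimal sketch, modeling each spill as a dict of partition to sorted (key, value) list (hypothetical structure, not Hadoop's file format):

```python
import heapq

# Two spill files; within each, every partition's records are sorted.
spill0 = {0: [(b"a", 1), (b"c", 1)], 1: [(b"b", 2)]}
spill1 = {0: [(b"b", 3)],           1: [(b"a", 4)]}

merged = {}
for p in (0, 1):
    # k-way merge of this partition's runs from every spill;
    # heapq.merge never materializes the inputs, mirroring a streaming merge
    merged[p] = list(heapq.merge(spill0[p], spill1[p]))
```

The reducer-side fetch then needs only its own partition's contiguous range of the merged file.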
Hadoop [0.10, 0.17)
[Diagram: the TaskTracker serves the single merged output file (partitions 0, 1, …, k) to Reduce 0 … Reduce k]
Hadoop [0.10, 0.17)
Pro:
• Distributes the sort/merge across all maps; the reducer need only merge its inputs
• Much more predictable memory footprint
• Shared, in-memory buffer across all partitions with an efficient sort
• Combines over each spill, defined by memory usage, instead of record count
• Running the Combiner doesn’t require storing a clone of each record (fewer serializations)
• In 0.16, the spill was made concurrent with collection (HADOOP-1965)
Con:
• Expanding buffers may impose a performance penalty; used memory is calculated on every call to collect(K2,V2)
• MergeSort copies indices on each level of recursion
• Deserializing the key/value before appending to the SequenceFile is avoidable
• The Combiner is weakened by requiring the sort order and partition to remain consistent
• Though tracked, BufferSorter instances take non-negligible space (HADOOP-1698)
Overview
  Hadoop (-∞, 0.10)   Hadoop [0.10, 0.17)   Hadoop [0.17, 0.22]
  Lucene              HADOOP-331            HADOOP-2919
  Cretaceous          Jurassic              Triassic
Hadoop [0.17, 0.22)
map(K1,V1)
  collect(K2,V2)
    p0 = partition(key0, val0)
    KS.serialize(K2)
    VS.serialize(V2)    (Serialization)
Buffers: io.sort.mb * io.sort.record.percent for record metadata; the rest of io.sort.mb for serialized records.
Instead of explicitly tracking the space used by record metadata, allocate a configurable amount of space at the beginning of the task.
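The split of io.sort.mb is simple arithmetic. A sketch using this era's defaults (io.sort.mb = 100, io.sort.record.percent = 0.05) and its 16-byte per-record accounting cost, one kvoffsets int plus a 3-int kvindices entry:

```python
# Compute how io.sort.mb is divided between serialized record data and
# the fixed accounting (metadata) region allocated at task start.
def buffer_layout(io_sort_mb=100, record_percent=0.05):
    total = io_sort_mb * 1024 * 1024
    acct = int(total * record_percent)   # metadata space, fixed a priori
    data = total - acct                  # serialization space
    # 16 bytes of accounting per record: 4 bytes kvoffsets + 12 bytes kvindices
    return data, acct // 16              # (bytes for records, max record count)

data_bytes, max_records = buffer_layout()
```

This makes the Con concrete: with the defaults, the task can hold at most `max_records` records regardless of how small they are, which is why io.sort.record.percent is both critical to performance and awkward to tune.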
Hadoop [0.17, 0.22)
The serialization buffer (io.sort.mb) is tracked by bufstart, bufend, bufindex, bufmark; the accounting buffers (io.sort.mb * io.sort.record.percent) by kvstart, kvend, kvindex over kvoffsets and kvindices.
kvbuffer: the partition is no longer implicitly tracked. Store (partition, keystart, valstart) for every record collected.
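The kvoffsets/kvindices indirection can be sketched directly (hypothetical Python stand-ins): kvindices holds the per-record (partition, keystart, valstart) triples, and the sort permutes only the small kvoffsets array.

```python
# Two records serialized back-to-back in kvbuffer; keystart..valstart is
# the key's extent, so keys can be compared without deserialization.
kvbuffer = b"pear2apple1"
kvindices = [(1, 0, 4), (0, 5, 10)]   # (partition, keystart, valstart)
kvoffsets = [0, 1]                    # indices into kvindices

def sort_key(i):
    part, keystart, valstart = kvindices[i]
    # order by partition first, then by raw key bytes (RawComparator-style)
    return (part, kvbuffer[keystart:valstart])

# only kvoffsets moves; record data and kvindices stay put
kvoffsets.sort(key=sort_key)
```

Sorting 4-byte offsets instead of variable-length records is what keeps the in-place sort cheap in this design.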
Hadoop [0.17, 0.22)
As each record is collected: KS.serialize(K2) and VS.serialize(V2) advance bufindex; when the record is complete, bufmark is moved up to bufindex and (p0, keystart, valstart) is written at kvindex.
Hadoop [0.17, 0.22)
When the collected data or metadata exceeds io.sort.spill.percent of its buffer, a spill begins.
Hadoop [0.17, 0.22)
kvend and bufend move to mark the range being spilled while the spill thread drains it; collection continues concurrently, advancing kvindex, bufindex, and bufmark into the remaining free space.
Hadoop [0.17, 0.22)
The RawComparator interface requires that the key be contiguous in the byte[]. If a serialized key would wrap around the end of the buffer, it is rewritten at the front, and the invalid segment at the end of the serialization buffer is marked by bufvoid.
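A simplified sketch of the bufvoid trick (the real code detects the wrap mid-write and copies the key's prefix forward; here the whole key is restarted, and all names are illustrative):

```python
# A key that would wrap past the end of the circular buffer is restarted
# at offset 0 so a RawComparator can read it contiguously; bufvoid marks
# where valid data ends.
def write_key(buf, bufindex, bufvoid, key):
    if bufindex + len(key) > len(buf):        # key would wrap
        bufvoid = bufindex                    # invalidate the tail
        bufindex = 0                          # rewrite the key at the front
    buf[bufindex:bufindex + len(key)] = key
    return bufindex + len(key), bufvoid

buf = bytearray(16)
# 12 bytes already in use; a 6-byte key cannot fit contiguously
bufindex, bufvoid = write_key(buf, 12, 16, b"orange")
```

The bytes between bufvoid and the physical end of the buffer are simply skipped when the spill reads records back out.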
Hadoop [0.17, 0.22)
Pro:
• Predictable memory footprint; collection (though not the spill) is agnostic to the number of reducers. Most memory used for the sort is allocated upfront and maintained for the full task duration.
• No resizing of buffers, no copying of serialized record data or metadata
• Uses SequenceFile::appendRaw to avoid a deserialization/serialization pass
• Effects record compression in-place (removed in 0.18 with improvements to the intermediate data format, HADOOP-2095)
Other Performance Improvements:
• Improved performance, no metadata copying, using QuickSort (HADOOP-3308)
• Caching of spill indices (HADOOP-3638)
• Run the Combiner during the merge (HADOOP-3226)
• Improved locking and synchronization (HADOOP-{5664,3617})
Con:
• Complexity; the new code was responsible for several bugs in 0.17 (HADOOP-{3442,3550,3475,3603})
• io.sort.record.percent is obscure, critical to performance, and awkward
• While predictable, memory usage is arguably too restricted
• Really? io.sort.record.percent? (MAPREDUCE-64)
Hadoop [0.22]
A single buffer now holds both serialized records and their metadata: the kvoffsets and kvindices information is interlaced into 16-byte metadata blocks. The sort is effected in a manner identical to 0.17, but metadata is allocated per-record rather than a priori. Serialized data grows forward from the equator; metadata blocks grow backward from it.
Pointer snapshots from the animation (1048576-byte buffer):
• Initial: equator = bufstart = bufend = bufindex = bufmark = 0; kvstart = kvend = kvindex = 1048560; bufvoid = 1048576
• After collection fills 300000 bytes of record data: bufindex = bufmark = 300000; kvindex = 968576
• Mid-record: bufindex = 300030 while bufmark = 300000; when the record completes, bufmark = 300030 and kvindex = 968560
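The two-directional growth from the equator can be sketched as follows (a simplified model with no wrapping; the 16-byte metadata block and its field order follow this design, but the code and names are illustrative):

```python
import struct

# Record bytes grow forward from the equator while 16-byte metadata
# blocks grow backward, so neither region's size is fixed a priori.
METASIZE = 16
buf = bytearray(1024)
equator, bufindex, kvindex = 0, 0, len(buf) - METASIZE

def collect(partition, key, val):
    global bufindex, kvindex
    keystart = bufindex
    buf[bufindex:bufindex + len(key)] = key
    valstart = bufindex + len(key)
    buf[valstart:valstart + len(val)] = val
    bufindex = valstart + len(val)
    # metadata block: (valstart, keystart, partition, vallen)
    struct.pack_into("4i", buf, kvindex, valstart, keystart, partition, len(val))
    kvindex -= METASIZE

collect(0, b"apple", b"1")
```

Because a record's data and its metadata come out of the same pool, a job with many tiny records spends the buffer on metadata while a job with huge records spends it on data, with no io.sort.record.percent to tune.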
Hadoop [0.22]
When a spill starts, a new equator is chosen in the free space (equator = 736020); kvend = 968576 and bufend = 300030 bound the spill range, while collection restarts from the new equator (bufindex = 736020, kvindex = 736000). While the spill drains, new records accumulate (bufindex = bufmark = 811020, kvindex = 696000). As the spill completes, bufstart and kvstart advance past the spilled range (bufstart = 300030, kvstart = 968576) and finally reset around the new equator (bufstart = bufend = 736020, kvstart = kvend = 736000).
Hadoop [0.22]
[Further animation frames: a spill triggered at this point moves kvend and bufend to kvindex and bufmark, and the cycle above repeats]
Hadoop [0.22]
Questions?