JOIN ALGORITHMS USING MAPREDUCE
DESCRIPTION
Join algorithms using MapReduce. Haiping Wang, [email protected].
Outline: MapReduce framework; MapReduce implementation on Hadoop; join algorithms using MapReduce.
Reference: MapReduce: Simplified Data Processing on Large Clusters. In OSDI, 2004.
OUTLINE
- MapReduce framework
- MapReduce implementation on Hadoop
- Join algorithms using MapReduce
MAPREDUCE: SIMPLIFIED DATA PROCESSING ON LARGE CLUSTERS. IN OSDI, 2004
MAPREDUCE WORDCOUNT DIAGRAM
[WordCount diagram: input files containing the words "ah ah er ah if or or uh or ah if" are split across map tasks, which emit (word, 1) pairs such as ah:1, if:1, or:1; the shuffle groups the pairs by word (e.g. ah:1,1,1,1) and the reducers write the final counts ah:4, er:1, if:2, or:3, uh:1 to the output files.]
map(String input_key, String input_value)
reduce(String output_key, Iterator intermediate_values)
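As a sketch, the WordCount pipeline can be simulated in plain Python (the names `map_fn`, `reduce_fn`, and `run_wordcount` are hypothetical; an in-memory dictionary stands in for Hadoop's sort-and-shuffle phase):

```python
from collections import defaultdict

def map_fn(input_key, input_value):
    """Emit (word, 1) for every word in one input split."""
    for word in input_value.split():
        yield word, 1

def reduce_fn(output_key, intermediate_values):
    """Sum the counts collected for one word."""
    yield output_key, sum(intermediate_values)

def run_wordcount(splits):
    # Shuffle phase: group all intermediate pairs by key.
    groups = defaultdict(list)
    for name, text in splits.items():
        for key, value in map_fn(name, text):
            groups[key].append(value)
    # Reduce phase: one reduce call per distinct key.
    result = {}
    for key, values in sorted(groups.items()):
        for k, v in reduce_fn(key, values):
            result[k] = v
    return result
```

Running it on the two example files from the diagram reproduces the counts ah:4, er:1, if:2, or:3, uh:1.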
[Architecture diagram components:]
- JobTracker
- TaskTracker
- InputFormat / RecordReader
- Mapper, Partitioner, Sorter
- Copy
- Reducer
- RecordWriter / OutputFormat
MAPREDUCE IMPLEMENTATION ON HADOOP
HADOOP MAPREDUCE FRAMEWORK ARCHITECTURE
JOIN ALGORITHMS USING MAPREDUCE
- Map-Reduce-Merge: Simplified Relational Data Processing on Large Clusters. SIGMOD 2007
- Semi-Join Computation on Distributed File Systems Using Map-Reduce-Merge Model. SAC 2010
- Optimizing Joins in a Map-Reduce Environment. VLDB 2009 / EDBT 2010
- A Comparison of Join Algorithms for Log Processing in MapReduce. SIGMOD 2010
MAP-REDUCE-MERGE: SIMPLIFIED RELATIONAL DATA PROCESSING ON LARGE CLUSTERS (SIGMOD 2007)
MAP-REDUCE-MERGE IMPLEMENTATIONS OF RELATIONAL JOIN ALGORITHMS
Sort-merge join
- Map: range partitioner; ordered buckets, one bucket per reducer
- Reduce: read the designated buckets from all mappers and merge them into a sorted set
- Merge: read the sorted buckets from the two data sets and perform a sort-merge join

Hash join
- Map: hash partitioner; hashed buckets, one bucket per reducer
- Reduce: read the designated buckets from all mappers and use a hash table to group and aggregate the records (with the same hash function as the mapper); no sorter is needed
- Merge: in-memory hash join

Block nested-loop join
- Map: same as the hash join
- Reduce: same as the hash join
- Merge: nested-loop join
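The merge phase of the sort-merge variant can be sketched as follows (a minimal in-memory sketch; `sort_merge_join` is a hypothetical helper, and both inputs are assumed to arrive from the reduce phase already sorted on the join key):

```python
def sort_merge_join(left, right):
    """Merge two bucket lists sorted on the join key.

    Each element is a (key, payload) pair; for every key present on
    both sides, the cross-product of the matching runs is emitted.
    """
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on each side.
            i2 = i
            while i2 < len(left) and left[i2][0] == lk:
                i2 += 1
            j2 = j
            while j2 < len(right) and right[j2][0] == rk:
                j2 += 1
            # Emit the cross-product of the two runs.
            for li in range(i, i2):
                for rj in range(j, j2):
                    out.append((lk, left[li][1], right[rj][1]))
            i, j = i2, j2
    return out
```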
EXAMPLE: HASH JOIN
[Diagram: each of the two input data sets flows through splits, mappers, and reducers; the mergers then read the matching reducer outputs from both sides.]
- Map: use a hash partitioner
- Reduce: read from every mapper for one designated partition
- Merge: read from the two sets of reducer outputs that share the same hash buckets; one set is used as the build side and the other as the probe side
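A minimal sketch of the build/probe step, assuming both reducer outputs for one bucket fit in memory (`hash_merge_join` is a hypothetical name):

```python
from collections import defaultdict

def hash_merge_join(build_set, probe_set):
    """In-memory hash join over two reducer outputs for one bucket.

    Elements are (join_key, payload) pairs; the (usually smaller)
    build side is hashed, then the probe side is looked up against it.
    """
    table = defaultdict(list)
    for key, payload in build_set:
        table[key].append(payload)
    # Probe: emit one joined tuple per match.
    return [(key, b, p) for key, p in probe_set for b in table.get(key, [])]
```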
ANALYSIS AND CONCLUSION
Connections
- A(m_a, r_a), B(m_b, r_b), r mergers; suppose r_a = r_b = r
- Map -> Reduce: connections = r_a*m_a + r_b*m_b = r*(m_a + m_b)
- Reduce -> Merge: in the one-to-one case, connections = 2r
- Matcher: compares tuples to decide whether they should be merged or not

Conclusion
- Uses multiple map-reduce jobs
- The partitioner may cause a data-skew problem
- The numbers m_a, r_a, m_b, r_b, r must be chosen; whether r_a = r_b affects the number of connections
SEMI-JOIN COMPUTATION STEPS AND WORKFLOW
- Equi-join
- Reduces communication costs and disk I/O costs
- Insensitive to data skew?
A COMPARISON OF JOIN ALGORITHMS FOR LOG PROCESSING IN MAPREDUCE (SIGMOD 2010)
- Equi-join between a log table L and a reference table R on a single column: L ⋈_{L.k=R.k} R, with |L| ≫ |R|
- L, R, and the join result are stored in the DFS; scans are used to access L and R
- Each map or reduce task can optionally implement two additional functions, init() and close(), which are called before and after each map or reduce task
REPARTITION JOIN (HIVE)
Drawback: all records for a join key may have to be buffered, which can run out of memory when:
- the key cardinality is small, or
- the data is highly skewed
L: Ratings.dat
R: movies.dat
Pairs: (key, tagged record)
[Diagram: input -> map -> shuffle -> reduce -> output. Rating records such as 1::1193::5::978300760 are mapped to pairs like (1193, L:1::1193::5::978300760); movie records such as 1193::One Flew Over the… are mapped to (1193, R:1193::One Flew Over…). The shuffle groups the pairs by join key, e.g. (661, [L:1::661::3::…], [R:661::James…], [L:1::661::3::…], [L:1::661::4::…]), and the reducers emit the joined tuples.]
Reduce:
- Group by join key
- Buffer the records into two sets according to the table tag
- Compute the cross-product, e.g. {(661::James…)} × {(1::661::3::97…), (1::661::3::97…), (1::661::4::97…)}
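The reduce side of the standard repartition join can be sketched as follows (a minimal sketch; `repartition_reduce` is a hypothetical name, and note that both buffers must be held in memory, which is exactly the drawback above):

```python
def repartition_reduce(join_key, tagged_records):
    """Reduce side of the standard repartition join.

    Records arrive tagged with their source table ("L" or "R") and
    must all be buffered before the cross-product can be emitted.
    """
    l_buf, r_buf = [], []
    for tag, record in tagged_records:
        (l_buf if tag == "L" else r_buf).append(record)
    # Cross-product of the two buffers for this join key.
    return [(join_key, r, l) for r in r_buf for l in l_buf]
```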
Phase/Function improvements:
- Map function: the output key is changed to a composite of the join key and the table tag
- Partitioning function: the hash code is computed from just the join-key part of the composite key
- Grouping function: records are grouped on just the join key
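The three changes can be sketched as plain functions (hypothetical names; in real Hadoop they would be supplied via a custom Partitioner and grouping comparator):

```python
def make_composite_key(join_key, table_tag):
    # Composite key: the join key plus the table tag, so records can be
    # ordered by tag within one join-key group.
    return (join_key, table_tag)

def partition(composite_key, num_reducers):
    # Partition on the join key only, so records from both tables with
    # the same join key reach the same reducer.
    return hash(composite_key[0]) % num_reducers

def group_key(composite_key):
    # Group on the join key only, so one reduce() call sees the records
    # of both tables for that key together.
    return composite_key[0]
```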
THE COST MEASURE FOR MR ALGORITHMS
- The communication cost of a process is the size of the input to that process.
- This paper does not count the output size of a process: the output must be input to at least one other process, and the final output is much smaller than its input.
- The total communication cost is the sum of the communication costs of all processes that constitute an algorithm.
- The elapsed communication cost is defined on the acyclic graph of processes: consider a path through this graph and sum the communication costs of the processes along it; the maximum such sum over all paths is the elapsed communication cost.
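Under the definitions above, both measures can be computed on a toy process graph (a minimal sketch; `total_cost`, `elapsed_cost`, and the process names are hypothetical):

```python
def total_cost(input_sizes):
    """Total communication cost: sum of the input sizes of all processes."""
    return sum(input_sizes.values())

def elapsed_cost(input_sizes, edges, sources):
    """Elapsed communication cost: the maximum, over all paths starting
    at a source of the acyclic process graph, of the summed input sizes
    of the processes along the path."""
    def best(node):
        successors = [b for a, b in edges if a == node]
        if not successors:
            return input_sizes[node]
        return input_sizes[node] + max(best(s) for s in successors)
    return max(best(s) for s in sources)
```

For three mappers with inputs of size 4, 6, and 5 feeding one reducer with input 15, the total cost is 30 while the elapsed cost is 6 + 15 = 21 (the heaviest mapper-to-reducer path).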
2-WAY JOIN IN MAPREDUCE: R(A,B) ⋈ S(B,C)

Input R(A, B):        Input S(B, C):
A   B                 B   C
a0  b0                b0  c0
a1  b1                b0  c1
a2  b2                b1  c2
…   …                 …   …

Reduce input (after map and shuffle):
K   V                 K   V
b0  (a0, R)           b1  (a1, R)
b0  (c0, S)           b1  (c2, S)
b0  (c1, S)           …   …
…   …

Final output:
A   B   C
a0  b0  c0
a0  b0  c1
a1  b1  c2
…   …   …
Table     Map output     Partition & sort
R(a, b)   b -> (a, R)    hash(b) -> (a, R)
S(b, c)   b -> (c, S)    hash(b) -> (c, S)
Reduce: b -> (a, c)
JOINING SEVERAL RELATIONS AT ONCE
R(A,B) ⋈ S(B,C) ⋈ T(C,D)
[Diagram: tuples from R, S, and T flow through Map to the Reduce input and then to the final output.]
Let h be a hash function with range 1, 2, …, m:
- S(b, c) -> (h(b), h(c))
- R(a, b) -> (h(b), all)
- T(c, d) -> (all, h(c))
Each reduce process computes the join of the tuples it receives.
m = 4, k = 16 (number of reduce processes: 4^2 = 16)
[Diagram: a 4 × 4 grid of reduce processes for R(A,B) ⋈ S(B,C) ⋈ T(C,D), indexed by h(b) = 0..3 (rows) and h(c) = 0..3 (columns). An S tuple with h(S.b) = 2, h(S.c) = 1 goes to the single cell (2, 1); an R tuple with h(R.b) = 2 is replicated across row 2; a T tuple with h(T.c) = 1 is replicated down column 1.]
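The routing rule for the three relations can be sketched as a single function (a minimal sketch; `reduce_targets` is a hypothetical name, and cells are numbered 0..m-1 rather than 1..m):

```python
def reduce_targets(relation, tup, m):
    """Which cells (h(b), h(c)) of the m x m reducer grid receive this
    tuple, for the join R(A,B) ⋈ S(B,C) ⋈ T(C,D)."""
    h = lambda v: hash(v) % m
    if relation == "S":          # S(b, c): exactly one cell
        b, c = tup
        return [(h(b), h(c))]
    if relation == "R":          # R(a, b): replicated across all h(c)
        a, b = tup
        return [(h(b), j) for j in range(m)]
    if relation == "T":          # T(c, d): replicated across all h(b)
        c, d = tup
        return [(i, h(c)) for i in range(m)]
    raise ValueError(relation)
```

Any R, S, and T tuples that agree on b and c meet in the one cell that the S tuple is sent to.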
PROBLEM SOLVING
- Solve for the optimal shares using the method of Lagrange multipliers
- Take the derivatives with respect to the three variables a, b, c
- Multiply the three resulting equations
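As a worked sketch of these steps, consider the cyclic join R(A,B) ⋈ S(B,C) ⋈ T(C,A) (an assumed example, chosen because all three share variables a, b, c appear), with relation sizes r, s, t, shares a, b, c for attributes A, B, C, and k = abc reduce processes. R is replicated c times, S is replicated a times, and T is replicated b times, so the problem is:

\[
\text{minimize } f = r\,c + s\,a + t\,b \quad \text{subject to } abc = k .
\]

Setting the partial derivatives of the Lagrangian \( f - \lambda(abc - k) \) with respect to a, b, c to zero gives

\[
s = \lambda\,bc, \qquad t = \lambda\,ca, \qquad r = \lambda\,ab .
\]

Multiplying the three equations:

\[
rst = \lambda^{3}(abc)^{2} = \lambda^{3}k^{2}
\;\Longrightarrow\;
\lambda = \left(\frac{rst}{k^{2}}\right)^{1/3},
\]

and substituting back (e.g. \( a^{2} = \frac{(ab)(ca)}{bc} = \frac{rt}{\lambda s} \)) yields each optimal share.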
SPECIAL CASES
- Star joins
- Chain joins: a chain join is a join of the form R1(A0, A1) ⋈ R2(A1, A2) ⋈ … ⋈ Rn(A(n-1), An), e.g. R(A,B) ⋈ S(B,C) ⋈ T(C,D)
CONCLUSION
- Suitable only for equi-joins
- Uses a single map-reduce job
- Does not consider the I/O cost (of the intermediate <K,V> pairs) or CPU time
- Main contribution: the use of the method of Lagrange multipliers