JOIN ALGORITHMS USING MAPREDUCE
Haiping Wang ctqlwhp1022@163.com
![Page 2: J OIN ALGORITHMS USING MAPREDUCE Haiping Wang ctqlwhp1022@163.com](https://reader031.vdocuments.us/reader031/viewer/2022013112/56649c7d5503460f94933120/html5/thumbnails/2.jpg)
OUTLINE
• MapReduce framework
• MapReduce implementation on Hadoop
• Join algorithms using MapReduce
MAPREDUCE: SIMPLIFIED DATA PROCESSING ON LARGE CLUSTERS. IN OSDI, 2004
MAPREDUCE WORDCOUNT DIAGRAM
Input (files file1 … file7): ah ah er ah if or or uh or ah if
Map output: ah:1 ah:1 er:1 and ah:1 if:1 or:1 or:1 uh:1 or:1 ah:1 if:1
Shuffle (grouped by key): ah:1,1,1,1 er:1 if:1,1 or:1,1,1 uh:1
Reduce output: (ah) 4, (er) 1, (if) 2, (or) 3, (uh) 1
map(String inputkey, String inputvalue):
reduce(String outputkey, Iterator intermediate_values):
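The word-count flow in the diagram can be sketched in a few lines of Python. This is a single-process illustration only, not Hadoop code; the names `map_fn`, `reduce_fn`, `run_wordcount`, and the split names are assumptions introduced here, and the two splits mirror the map output shown on the slide.

```python
from collections import defaultdict

def map_fn(input_key, input_value):
    """Emit (word, 1) for every word in one input split."""
    for word in input_value.split():
        yield word, 1

def reduce_fn(output_key, intermediate_values):
    """Sum the counts collected for one word."""
    return output_key, sum(intermediate_values)

def run_wordcount(splits):
    # Shuffle: group every emitted value under its key.
    groups = defaultdict(list)
    for name, text in splits.items():
        for word, one in map_fn(name, text):
            groups[word].append(one)
    # Reduce: one call per distinct key, in sorted key order.
    return dict(reduce_fn(key, groups[key]) for key in sorted(groups))

# Hypothetical split names; the text is the slide's input.
splits = {"split1": "ah ah er", "split2": "ah if or or uh or ah if"}
counts = run_wordcount(splits)
print(counts)
```

The resulting counts match the reduce output on the slide (ah 4, er 1, if 2, or 3, uh 1).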
MAPREDUCE IMPLEMENTATION ON HADOOP
Diagram components: JobTracker, TaskTracker, InputFormat, Record Reader, Mapper, Partitioner, Sorter, Copy, Reducer, Record Writer, OutputFormat
MAPREDUCE IMPLEMENTATION ON HADOOP
HADOOP MAPREDUCE FRAMEWORK ARCHITECTURE
JOIN ALGORITHMS USING MAPREDUCE
Map-Reduce-Merge: Simplified Relational Data Processing on Large Clusters (SIGMOD 2007)
Semi-join Computation on Distributed File Systems Using Map-Reduce-Merge Model (SAC 2010)
Optimizing Joins in a Map-Reduce Environment (VLDB 2009, EDBT 2010)
A Comparison of Join Algorithms for Log Processing in MapReduce (SIGMOD 2010)
MAP-REDUCE-MERGE: SIMPLIFIED RELATIONAL DATA PROCESSING ON LARGE CLUSTERS SIGMOD07
MAP-REDUCE-MERGE IMPLEMENTATIONS OF RELATIONAL JOIN ALGORITHMS
Sort-merge join
• Map: use a range partitioner to produce ordered buckets; each bucket goes to one reducer
• Reduce: read the designated bucket from all mappers and merge the pieces into a sorted set
• Merge: read sorted buckets from the two data sets and perform a sort-merge join
Hash join
• Map: use a hash partitioner to produce hashed buckets; each bucket goes to one reducer
• Reduce: read the designated bucket from all mappers and use a hash table (with the same hash function as the mapper) to group and aggregate the records; no sorter is needed
• Merge: in-memory hash join
Block nested-loop join
• Map: same as the hash join
• Reduce: same as the hash join
• Merge: nested-loop join
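As an illustration of the merge phase of the sort-merge variant, the sketch below joins two buckets that are already sorted by key. It is a plain in-memory Python example under that assumption, not code from the Map-Reduce-Merge paper; `sort_merge_join` and the sample data are hypothetical.

```python
def sort_merge_join(left, right):
    """Join two lists of (key, value) pairs that are already sorted by key."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the end of the run of equal keys on the right side.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left record carrying this key with the whole right run.
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], right[r][1]) for r in range(j, j_end))
                i += 1
            j = j_end
    return out

# Hypothetical sorted buckets from the two reducer sets.
left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (2, "y"), (3, "z"), (4, "w")]
joined = sort_merge_join(left, right)
print(joined)
```

Because both inputs are sorted, each side is scanned once, apart from re-reading a run of duplicate keys on the right.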
EXAMPLE: HASH JOIN
Figure: two parallel pipelines, one per data set (splits -> mappers -> reducers), feeding a shared row of mergers.
• Mappers: use a hash partitioner
• Reducers: read from every mapper for one designated partition
• Mergers: read from two sets of reducer outputs that share the same hashing buckets; one is used as the build set and the other as the probe set
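The build/probe step of the hash join example can be sketched as follows. This is a single-process Python approximation in which lists stand in for reducer outputs; the function names, the bucket count, and the sample records are assumptions made for illustration.

```python
from collections import defaultdict

def hash_partition(records, num_buckets):
    """Mapper side: route each (key, value) record to a hashed bucket."""
    buckets = [[] for _ in range(num_buckets)]
    for key, value in records:
        buckets[hash(key) % num_buckets].append((key, value))
    return buckets

def merge_bucket(build_bucket, probe_bucket):
    """Merger side: build a hash table on one input, probe it with the other."""
    table = defaultdict(list)
    for key, value in build_bucket:
        table[key].append(value)
    return [(key, b, p) for key, p in probe_bucket for b in table.get(key, [])]

def hash_join(a, b, num_buckets=3):
    # Both data sets use the same hash function, so bucket i of A can only
    # match bucket i of B; each merger handles one such pair of buckets.
    out = []
    for build, probe in zip(hash_partition(a, num_buckets),
                            hash_partition(b, num_buckets)):
        out.extend(merge_bucket(build, probe))
    return out

# Hypothetical records: A is the build set, B the probe set.
a = [(1, "a1"), (2, "a2"), (2, "a3")]
b = [(2, "b1"), (3, "b2"), (1, "b3")]
result = sorted(hash_join(a, b))
print(result)
```

Choosing the smaller data set as the build set keeps the in-memory hash table small.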
ANALYSIS AND CONCLUSION
Connections
• Data sets A(ma, ra) and B(mb, rb), where m is the number of mappers and r the number of reducers for each data set, with r mergers; suppose ra = rb = r
• Map->Reduce connections = ra*ma + rb*mb = r*(ma+mb)
• Reduce->Merge: in the one-to-one case, connections = 2r
• Matcher: compares tuples to see if they should be merged or not
Conclusion
• Uses multiple map-reduce jobs
• The partitioner may cause a data skew problem
• The values of ma, ra, mb, rb, r (and whether ra = rb) determine the number of connections
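The connection counts above are simple arithmetic; a small sketch makes the formulas concrete. The function name and the sample values (three mappers per data set, three reducers and mergers) are assumptions chosen for illustration.

```python
def connection_counts(ma, mb, r):
    """Connection counts when ra = rb = r and reducers map one-to-one to mergers."""
    map_to_reduce = r * ma + r * mb   # every reducer reads from every mapper of its data set
    reduce_to_merge = 2 * r           # each merger reads one reducer from A and one from B
    return map_to_reduce, reduce_to_merge

# Hypothetical configuration: ma = mb = 3 mappers, r = 3 reducers per data set.
print(connection_counts(ma=3, mb=3, r=3))
```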
SEMI-JOIN COMPUTATION STEPS AND WORKFLOW
• Equi-join
• Reduces communication costs and disk I/O costs
• Insensitive to data skew?
A COMPARISON OF JOIN ALGORITHMS FOR LOG PROCESSING IN MAPREDUCE SIGMOD10
• Equi-join between a log table L and a reference table R on a single column: L ⋈ R on L.k = R.k, with |L| ≫ |R|
• L, R, and the join result are stored in the DFS
• Scans are used to access L and R
• Each map or reduce task can optionally implement two additional functions: init() and close()
• init() is called before, and close() after, each map or reduce task
REPARTITION JOIN(HIVE)
Drawback: all records for a join key may have to be buffered
• Risk of running out of memory when the key cardinality is small or the data is highly skewed
Figure stages: input -> map -> shuffle -> reduce -> output
L: Ratings.dat, records 1::1193::5::978300760, 1::661::3::978302109, 1::661::3::978301968, 1::661::4::978300275, 1::1193::5::97882429
R: movies.dat, records 661::James and the Giant…, 914::My Fair Lady.., 1193::One Flew Over the…, 2355::Bug’s Life, A…, 3408::Erin Brockovich…
Map output pairs (key, tagged record):
• From L: (1193, L:1::1193::5::978300760), (661, L:1::661::3::978302109), (661, L:1::661::3::978301968), (661, L:1::661::4::978300275), (1193, L:1::1193::5::97882429)
• From R: (661, R:661::James and the Gia…), (914, R:914::My Fair Lady..), (1193, R:1193::One Flew Over …), (2355, R:2355::Bug’s Life, A…), (3408, R:3408::Erin Brockovi…)
Shuffle: group by join key, for example (661, [L:1::661::3::97…], [R:661::James…], [L:1::661::3::978…], [L:1::661::4::97…]), (2355, [R:2355::B’…]), (3408, [R:3408::Eri…])
Reduce: buffers records into two sets according to the table tag, then forms the cross-product, for example {(661::James…)} X {(1::661::3::97…), (1::661::3::97…), (1::661::4::97…)}
Output: (1, Ja.., 3, …), (1, Ja.., 3, …), (1, Ja.., 4, …)
Improvements, by phase / function:
• Map function: the output key is changed to a composite of the join key and the table tag
• Partitioning function: the hashcode is computed from just the join-key part of the composite key
• Grouping function: records are grouped on just the join key
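The three improvements can be simulated in a single Python process, as sketched below. This is not Hive or Hadoop code. It assumes that the table tag is chosen so that R records sort ahead of L records within a join key, which lets the reducer buffer only the R side and stream the L side. The movie titles are placeholders and all function names are hypothetical.

```python
from itertools import groupby

R_TAG, L_TAG = 0, 1  # assumption: R sorts before L within each join key

def map_r(line):
    """movies.dat record: movie_id::title -> ((join key, tag), record)."""
    return (line.split("::")[0], R_TAG), line

def map_l(line):
    """Ratings.dat record: user::movie_id::rating::timestamp."""
    return (line.split("::")[1], L_TAG), line

def partition(composite_key, num_reducers):
    # Hash only the join-key part, so both tables meet at the same reducer.
    join_key, _tag = composite_key
    return hash(join_key) % num_reducers

def reduce_group(join_key, tagged_records):
    r_buffer = []
    for (_key, tag), record in tagged_records:
        if tag == R_TAG:
            r_buffer.append(record)      # only the R side is buffered
        else:
            for r in r_buffer:           # L records are streamed
                yield join_key, r, record

def repartition_join(r_lines, l_lines, num_reducers=2):
    partitions = [[] for _ in range(num_reducers)]
    for pair in [map_r(x) for x in r_lines] + [map_l(x) for x in l_lines]:
        partitions[partition(pair[0], num_reducers)].append(pair)
    out = []
    for part in partitions:
        part.sort(key=lambda kv: kv[0])  # sort on the composite key
        # Group on just the join key, ignoring the tag.
        for join_key, group in groupby(part, key=lambda kv: kv[0][0]):
            out.extend(reduce_group(join_key, group))
    return out

r_lines = ["661::movie_a", "914::movie_b", "1193::movie_c"]  # placeholder titles
l_lines = ["1::1193::5::978300760", "1::661::3::978302109", "1::661::4::978300275"]
joined = sorted(repartition_join(r_lines, l_lines))
print(joined)
```

With this ordering the reducer never holds the L records for a key in memory, which addresses the buffering drawback noted on the slide when R is the smaller table.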
THE COST MEASURE FOR MR ALGORITHMS
• The communication cost of a process is the size of the input to the process
• This paper does not count the output size for a process
  • The output must be input to at least one other process
  • The final output is much smaller than its input
• The total communication cost is the sum of the communication costs of all processes that constitute an algorithm
• The elapsed communication cost is defined on the acyclic graph of processes
  • Consider a path through this graph, and sum the communication costs of the processes along that path
  • The maximum sum, over all paths, is the elapsed communication cost
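Both measures can be computed on a small process graph, as in the sketch below. The graph, the input sizes, and the function names are made up for illustration; only the two definitions come from the slide.

```python
from functools import lru_cache

def total_cost(input_size):
    """Total communication cost: sum of the input sizes of all processes."""
    return sum(input_size.values())

def elapsed_cost(input_size, successors):
    """Elapsed communication cost: the heaviest path through the acyclic graph."""
    @lru_cache(maxsize=None)
    def heaviest_from(process):
        tail = max((heaviest_from(q) for q in successors.get(process, ())), default=0)
        return input_size[process] + tail
    return max(heaviest_from(p) for p in input_size)

# Hypothetical job: two map processes feeding two reduce processes.
input_size = {"map1": 10, "map2": 30, "reduce1": 25, "reduce2": 15}
successors = {"map1": ["reduce1", "reduce2"], "map2": ["reduce1", "reduce2"]}

print(total_cost(input_size))                 # sum over all four processes
print(elapsed_cost(input_size, successors))   # heaviest path: map2 -> reduce1
```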
2-WAY JOIN IN MAPREDUCE R(A,B) S(B,C)
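Figure stages: Input -> Map -> Reduce input -> Reduce -> Final output
Input R (A, B): (a0, b0), (a1, b1), (a2, b2), …
Input S (B, C): (b0, c0), (b0, c1), (b1, c2), …
Reduce input (K -> V), one reducer: b0 -> (a0, R), b0 -> (c0, S), b0 -> (c1, S), …
Reduce input (K -> V), another reducer: b1 -> (a1, R), b1 -> (c2, S), …
Final output (A, B, C): (a0, b0, c0), (a0, b0, c1), (a1, b1, c2), …
Processing per table (tuple, map, partition & sort):
• R: tuple (a, b); map b -> (a, R); partition & sort Hash(b) -> (a, R)
• S: tuple (b, c); map b -> (c, S); partition & sort Hash(b) -> (c, S)
Reduce: b -> (a, c)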
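The two-way join on the slide can be reproduced with a small Python simulation that uses the slide's sample tuples. It runs in one process and replaces the shuffle with a dictionary; the function names are assumptions for illustration.

```python
from collections import defaultdict

def map_r(a, b):
    return b, (a, "R")   # key on the join attribute B, tag the source relation

def map_s(b, c):
    return b, (c, "S")

def reduce_join(b, values):
    r_side = [v for v, tag in values if tag == "R"]
    s_side = [v for v, tag in values if tag == "S"]
    # Pair every R value with every S value that shares this B value.
    return [(a, b, c) for a in r_side for c in s_side]

def two_way_join(R, S):
    groups = defaultdict(list)            # stands in for partition & sort
    for a, b in R:
        key, value = map_r(a, b)
        groups[key].append(value)
    for b, c in S:
        key, value = map_s(b, c)
        groups[key].append(value)
    out = []
    for b in sorted(groups):
        out.extend(reduce_join(b, groups[b]))
    return out

R = [("a0", "b0"), ("a1", "b1"), ("a2", "b2")]
S = [("b0", "c0"), ("b0", "c1"), ("b1", "c2")]
result = two_way_join(R, S)
print(result)
```

The tuple (a2, b2) produces no output because no S tuple carries b2.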
JOINING SEVERAL RELATIONS AT ONCE
Figure: inputs R(A,B), S(B,C), T(C,D) -> Map -> Reduce input -> Reduce -> Final output
JOINING SEVERAL RELATIONS AT ONCE
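R(A,B) S(B,C) T(C,D)
• Let h be a hash function with range 1, 2, …, m
  • S(b, c) -> (h(b), h(c))
  • R(a, b) -> (h(b), all)
  • T(c, d) -> (all, h(c))
• Each Reduce process computes the join of the tuples it receives
• Number of Reduce processes: k = m^2; with m = 4, k = 4^2 = 16
Figure: a 4 x 4 grid of Reduce processes, with rows h(b) = 0, 1, 2, 3 and columns h(c) = 0, 1, 2, 3. An S tuple with h(S.b) = 2 and h(S.c) = 1 goes to the single cell (2, 1); an R tuple with h(R.b) = 2 goes to every cell in row 2; a T tuple with h(T.c) = 1 goes to every cell in column 1.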
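The replication scheme can be simulated as below. This is a single-process Python sketch, not the paper's implementation; it numbers hash buckets 0 to m-1 as in the grid figure, and the function names and sample tuples are hypothetical.

```python
from collections import defaultdict

def h(value, m):
    """Hash a value into one of m buckets, numbered 0 .. m-1."""
    return hash(value) % m

def three_way_join(R, S, T, m=4):
    # One entry per Reduce process, addressed by (h(b), h(c)).
    cells = defaultdict(lambda: {"R": [], "S": [], "T": []})
    for b, c in S:                       # S goes to exactly one cell
        cells[(h(b, m), h(c, m))]["S"].append((b, c))
    for a, b in R:                       # R is replicated across its row
        for col in range(m):
            cells[(h(b, m), col)]["R"].append((a, b))
    for c, d in T:                       # T is replicated down its column
        for row in range(m):
            cells[(row, h(c, m))]["T"].append((c, d))
    out = []
    for cell in cells.values():          # each cell joins what it received
        for b, c in cell["S"]:
            for a, rb in cell["R"]:
                if rb != b:
                    continue
                for tc, d in cell["T"]:
                    if tc == c:
                        out.append((a, b, c, d))
    return out

R = [("a0", "b0"), ("a1", "b1")]
S = [("b0", "c0"), ("b1", "c0"), ("b1", "c1")]
T = [("c0", "d0"), ("c1", "d1"), ("c2", "d2")]
result = sorted(three_way_join(R, S, T))
print(result)
```

Each output tuple is produced exactly once, in the cell that holds its S tuple. The price is that every R and T tuple is shipped m times, while every S tuple is shipped once.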
PROBLEM SOLVING
Problem solving using the method of Lagrange Multipliers
Take derivatives with respect to the three variables a, b, c
Multiply the three equations
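The equations for these steps did not survive in the transcript. As a hedged illustration only, the derivation below assumes a three-relation join R(A,B) ⋈ S(B,C) ⋈ T(A,C) with relation sizes r, s, t, shares a, b, c for the three attributes, and k = abc Reduce processes. This setting and its symbols are assumptions, not recovered slide content.

```latex
% Assumed setting: each R tuple is replicated c times, each S tuple a times,
% each T tuple b times, so the communication cost is rc + sa + tb.
\begin{align*}
\text{minimize } & rc + sa + tb \quad \text{subject to } abc = k \\
\mathcal{L} &= rc + sa + tb - \lambda\,(abc - k) \\
\frac{\partial \mathcal{L}}{\partial a} = 0 &\;\Rightarrow\; s = \lambda bc \;\Rightarrow\; sa = \lambda k \\
\frac{\partial \mathcal{L}}{\partial b} = 0 &\;\Rightarrow\; t = \lambda ac \;\Rightarrow\; tb = \lambda k \\
\frac{\partial \mathcal{L}}{\partial c} = 0 &\;\Rightarrow\; r = \lambda ab \;\Rightarrow\; rc = \lambda k \\
(sa)(tb)(rc) = \lambda^3 k^3 &\;\Rightarrow\; rst\,k = \lambda^3 k^3 \;\Rightarrow\; \lambda = \sqrt[3]{rst/k^2} \\
a = \sqrt[3]{krt/s^2}, \quad b &= \sqrt[3]{krs/t^2}, \quad c = \sqrt[3]{kst/r^2} \\
rc + sa + tb &= 3\lambda k = 3\sqrt[3]{krst}
\end{align*}
```

Under these assumptions the three cost terms are equal at the optimum, and the product of the shares is abc = k as required.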
SPECIAL CASES
Star Joins
Chain Joins A chain join is a join of the form
CONCLUSION
• Only suitable for equi-joins
• Uses one map-reduce job
• Does not consider the I/O (intermediate <K,V> pairs I/O) or CPU time
• Main contribution: use of the “Lagrangean multipliers” method