© Spinnaker Labs, Inc.
Data-Intensive Scalable Computing with MapReduce
CS290N, Spring 2010
Overview
• What is MapReduce?
• Related technologies
– GFS
– BigTable
• Technology Comparison
• MapReduce in a modern pipeline
Motivations for MapReduce
• Data processing: > 1 TB
• Massively parallel (hundreds or thousands of CPUs)
• Must be easy to use
How MapReduce is Structured
• Functional programming meets distributed computing
• A batch data processing system
• Factors out many reliability concerns from application logic
MapReduce Provides:
• Automatic parallelization & distribution
• Fault-tolerance
• Status and monitoring tools
• A clean abstraction for programmers
MapReduce: Insight
• "Consider the problem of counting the number of occurrences of each word in a large collection of documents"
• How would you do it in parallel?
One possible solution
MapReduce Programming Model
• Inspired by the map and reduce operations commonly used in functional programming languages such as Lisp.
• Users implement an interface of two primary methods:
  – 1. Map: (key1, val1) → (key2, val2)
  – 2. Reduce: (key2, [val2]) → [val3]
Map operation
• Map, a pure function written by the user, takes an input key/value pair and produces a set of intermediate key/value pairs.
  – e.g. (doc-id, doc-content)
• Drawing an analogy to SQL, map can be visualized as the group-by clause of an aggregate query.
map (in_key, in_value) -> (out_key, intermediate_value) list
Reduce operation
• On completion of the map phase, all the intermediate values for a given output key are combined into a list and given to a reducer.
• Can be visualized as an aggregate function (e.g., average) computed over all the rows with the same group-by attribute.
reduce (out_key, intermediate_value list) -> out_value list
Pseudo-code
map(String input_key, String input_value):
// input_key: document name
// input_value: document contents
for each word w in input_value:
EmitIntermediate(w, "1");
reduce(String output_key, Iterator intermediate_values):
// output_values: a list of counts
int result = 0;
for each v in intermediate_values:
result += ParseInt(v);
Emit(AsString(result));
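The pseudo-code above maps onto a small, self-contained Python sketch. This is illustrative only: the real system runs map and reduce tasks on many machines, while here the shuffle is a single in-memory dictionary, and the function and variable names are ours, not the library's.

```python
from collections import defaultdict

def map_fn(doc_name, doc_contents):
    # Emit an intermediate (word, 1) pair for every word in the document.
    for word in doc_contents.split():
        yield (word, 1)

def reduce_fn(word, counts):
    # Sum the partial counts for one word.
    yield (word, sum(counts))

def mapreduce(inputs, mapper, reducer):
    # Shuffle: group every intermediate value by its key, as the framework would.
    groups = defaultdict(list)
    for in_key, in_value in inputs:
        for k, v in mapper(in_key, in_value):
            groups[k].append(v)
    # Reduce: each key's value list is processed independently.
    result = []
    for k in sorted(groups):
        result.extend(reducer(k, groups[k]))
    return result

docs = [("d1", "the quick brown fox"), ("d2", "the lazy dog")]
print(mapreduce(docs, map_fn, reduce_fn))
# [('brown', 1), ('dog', 1), ('fox', 1), ('lazy', 1), ('quick', 1), ('the', 2)]
```

Because each key's value list is reduced independently, the `for k in sorted(groups)` loop is exactly the part a real framework parallelizes across reduce tasks.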
Example vs. Actual Source Code
• Example is written in pseudo-code
• Actual implementation is in C++, using a MapReduce library
• Bindings for Python and Java exist via interfaces
• True code is somewhat more involved (defines how the input key/values are divided up and accessed, etc.)
MapReduce: Extensions and similar apps
• Pig (Yahoo!)
• Hadoop (Apache)
• DryadLINQ (Microsoft)
Large Scale Systems Architecture using MapReduce
MapReduce: Execution overview
MapReduce: Example
MapReduce in Parallel: Example
© 2010, Jamie Callan
MapReduce: Execution Details
• Input reader
  – Divide input into splits; assign each split to a Map processor
• Map
  – Apply the Map function to each record in the split
  – Each Map function returns a list of (key, value) pairs
• Shuffle/Partition and Sort
  – Shuffle distributes sorting & aggregation to many reducers
  – All records for key k are directed to the same reduce processor
  – Sort groups the same keys together and prepares for aggregation
• Reduce
  – Apply the Reduce function to each key
  – The result of the Reduce function is a list of (key, value) pairs
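The Shuffle/Partition step above is typically a hash of the key modulo the number of reduce tasks, which is what guarantees all records for key k reach the same reduce processor. A minimal sketch (the reducer count `R` and the sample pairs are illustrative; Hadoop's default HashPartitioner works the same way):

```python
R = 4  # number of reduce tasks (illustrative)

def partition(key, num_reducers=R):
    # All records sharing a key go to the same reducer: hash(key) mod R.
    return hash(key) % num_reducers

pairs = [("apple", 1), ("banana", 1), ("apple", 1)]
buckets = {r: [] for r in range(R)}  # one bucket per reduce task
for k, v in pairs:
    buckets[partition(k)].append((k, v))

# Both ("apple", 1) records necessarily land in the same bucket.
```

Within each bucket, the framework then sorts by key so the reducer sees each key's values grouped together.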
MapReduce in One Picture
© 2010, Le Zhao
Tom White, Hadoop: The Definitive Guide
MapReduce: Runtime Environment
MapReduce: Fault Tolerance
• Handled via re-execution of tasks
  – Task completion committed through the master
• What happens if a Mapper fails?
  – Re-execute completed and in-progress map tasks (completed map output lives on the failed machine's local disk)
• What happens if a Reducer fails?
  – Re-execute in-progress reduce tasks
• What happens if the Master fails?
  – Potential trouble!
MapReduce: Refinements – Locality Optimization
• Leverage GFS to schedule a map task on a machine that contains a replica of the corresponding input data.
• Thousands of machines read input at local disk speed
•Without this, rack switches limit read rate
MapReduce: Refinements – Redundant Execution
• Slow workers ("stragglers") are a source of bottlenecks and may delay completion time.
• Near the end of a phase, spawn backup copies of the remaining in-progress tasks; whichever copy finishes first wins.
• Effectively uses spare computing power, significantly reducing job completion time.
MapReduce: Refinements – Skipping Bad Records
• Map/Reduce functions sometimes fail for particular inputs.
• Fixing the bug might not be possible: third-party libraries.
• On error:
  – Worker sends a signal to the Master
  – If there are multiple errors on the same record, skip the record
MapReduce: Refinements – Miscellaneous
• Combiner Function at Mapper
• Sorting Guarantees within each reduce partition.
• Local execution for debugging/testing
• User-defined counters
Combining Phase
• Run on mapper nodes after map phase
• “Mini-reduce,” only on local map output
• Used to save bandwidth before sending data to full reducer
• Reducer can serve as the combiner if the operation is commutative & associative
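For word count the reduce operation (summation) is commutative and associative, so it can double as the combiner. A sketch of the bandwidth saving on one mapper machine (the sample data is illustrative):

```python
from collections import defaultdict

# Raw map output on one mapper machine for the text "the cat the the".
map_output = [("the", 1), ("cat", 1), ("the", 1), ("the", 1)]

def combine(pairs):
    # "Mini-reduce" on local map output: sum counts per word before shuffling.
    local = defaultdict(int)
    for word, count in pairs:
        local[word] += count
    return sorted(local.items())

sent_to_reducer = combine(map_output)
print(sent_to_reducer)  # [('cat', 1), ('the', 3)] -- 4 pairs shrink to 2
```

Only the combined pairs cross the network; the reducer then sums the per-mapper partial counts exactly as it would have summed the raw ones.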
Combiner, graphically
[Figure: on one mapper machine, the combiner replaces the raw map output sent to the reducer with locally combined output.]
MapReduce: Some More Apps
• Distributed grep
• Count of URL access frequency
• Clustering (k-means)
• Graph algorithms
• Indexing systems
MapReduce Programs In Google Source Tree
More Applications with MapReduce
MapReduce Use Case (1) – Map Only
Data distributive tasks – Map Only
• E.g. classify individual documents
• Map does everything
– Input: (docno, doc_content), …
– Output: (docno, [class, class, …]), …
• No reduce
MapReduce Use Case (2) – Filtering and Accumulation
Filtering & Accumulation – Map and Reduce
• E.g. Counting total enrollments of two given classes
• Map selects records and outputs initial counts
– In: (Jamie, 11741), (Tom, 11493), …
– Out: (11741, 1), (11493, 1), …
• Shuffle/Partition by class_id
• Sort
– In: (11741, 1), (11493, 1), (11741, 1), …
– Out: (11493, 1), …, (11741, 1), (11741, 1), …
• Reduce accumulates counts
– In: (11493, [1, 1, …]), (11741, [1, 1, …])
– Sum and Output: (11493, 16), (11741, 35)
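This filtering-and-accumulation pattern can be sketched in Python (the class ids come from the slide; the student names, enrollment records, and in-memory shuffle are illustrative):

```python
from collections import defaultdict

TARGET_CLASSES = {"11741", "11493"}  # the two classes of interest

enrollments = [("Jamie", "11741"), ("Tom", "11493"),
               ("Ann", "15213"), ("Bo", "11741")]

def map_fn(student, class_id):
    # Filter: only records for the selected classes emit an initial count.
    if class_id in TARGET_CLASSES:
        yield (class_id, 1)

def reduce_fn(class_id, counts):
    # Accumulate: sum the per-record counts for one class.
    yield (class_id, sum(counts))

groups = defaultdict(list)  # shuffle/partition by class_id
for student, class_id in enrollments:
    for k, v in map_fn(student, class_id):
        groups[k].append(v)

totals = dict(kv for k in sorted(groups) for kv in reduce_fn(k, groups[k]))
print(totals)  # {'11493': 1, '11741': 2}
```

The filtering happens entirely in the mapper, so records for other classes never reach the shuffle at all.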
MapReduce Use Case (3) – Database Join
Problem: Massive lookups
• Given two large lists: (URL, ID) and (URL, doc_content) pairs
• Produce (ID, doc_content)
Solution: Database join
• Input stream: both (URL, ID) and (URL, doc_content) lists
  – (http://del.icio.us/post, 0), (http://digg.com/submit, 1), …
  – (http://del.icio.us/post, <html0>), (http://digg.com/submit, <html1>), …
• Map simply passes input along
• Shuffle and Sort on URL (group ID & doc_content for the same URL together)
  – Out: (http://del.icio.us/post, 0), (http://del.icio.us/post, <html0>), (http://digg.com/submit, <html1>), (http://digg.com/submit, 1), …
• Reduce outputs a result stream of (ID, doc_content) pairs
  – In: (http://del.icio.us/post, [0, html0]), (http://digg.com/submit, [html1, 1]), …
  – Out: (0, <html0>), (1, <html1>), …
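The join can be sketched in Python as follows. Here the value's type (int vs. string) tells the reducer which stream a record came from; a real job would tag records explicitly, and the URLs are the slide's examples:

```python
from collections import defaultdict

# The two input streams, both keyed by URL.
ids   = [("http://del.icio.us/post", 0), ("http://digg.com/submit", 1)]
pages = [("http://del.icio.us/post", "<html0>"),
         ("http://digg.com/submit", "<html1>")]

def join(id_stream, page_stream):
    # Map passes both streams through unchanged; shuffle groups by URL.
    groups = defaultdict(list)
    for url, value in id_stream + page_stream:
        groups[url].append(value)
    # Reduce: each URL's group holds an ID and a doc_content, in either order.
    out = []
    for url, values in groups.items():
        doc_id = next(v for v in values if isinstance(v, int))
        content = next(v for v in values if isinstance(v, str))
        out.append((doc_id, content))
    return sorted(out)

print(join(ids, pages))  # [(0, '<html0>'), (1, '<html1>')]
```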
MapReduce Use Case (4) – Secondary Sort
Problem: Sorting on values
• E.g. Reverse graph edge directions & output in node order
  – Input: adjacency list of a graph (3 nodes and 4 edges): (3, [1, 2]), (1, [2, 3])
  – Output: (1, [3]), (2, [1, 3]), (3, [1])
• Note, the node_ids in the output values are also sorted. But Hadoop only sorts on keys!
Solution: Secondary sort
• Map
  – In: (3, [1, 2]), (1, [2, 3])
  – Intermediate: (1, [3]), (2, [3]), (2, [1]), (3, [1]) (reverse edge direction)
  – Out: (<1, 3>, [3]), (<2, 3>, [3]), (<2, 1>, [1]), (<3, 1>, [1])
  – Copy node_ids from value to key
[Figure: the 3-node graph before and after edge reversal]
MapReduce Use Case (4) – Secondary Sort
Secondary Sort (ctd.)
• Shuffle on Key.field1, and Sort on whole Key (both fields)
– In: (<1, 3>, [3]), (<2, 3>, [3]), (<2, 1>, [1]), (<3, 1>, [1])
– Out: (<1, 3>, [3]), (<2, 1>, [1]), (<2, 3>, [3]), (<3, 1>, [1])
• Grouping comparator
– Merge according to part of the key
– Out: (<1, 3>, [3]), (<2, 1>, [1, 3]), (<3, 1>, [1]) this will be the reducer’s input
• Reduce
– Merge & output: (1, [3]), (2, [1, 3]), (3, [1])
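The whole secondary-sort pattern (composite keys, full-key sort, grouping on the first key field) can be simulated in Python. This is a sketch: in Hadoop the sort and the grouping comparator are framework callbacks, not explicit code.

```python
from itertools import groupby

# Reverse the edges of a small graph and emit each node's in-neighbours sorted.
adjacency = [(3, [1, 2]), (1, [2, 3])]

# Map: reverse each edge, copying the source node id from value into the key.
intermediate = [((dst, src), src) for src, outs in adjacency for dst in outs]

# Shuffle & Sort on the whole composite key (Hadoop itself sorts only keys).
intermediate.sort()

# Grouping comparator: merge by the first key field; values arrive pre-sorted.
result = [(node, [src for _, src in group])
          for node, group in groupby(intermediate, key=lambda kv: kv[0][0])]
print(result)  # [(1, [3]), (2, [1, 3]), (3, [1])]
```

Because the values were sorted as part of the key, each reducer can stream its output without ever buffering a node's full in-neighbour list.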
Using MapReduce to Construct Indexes: Preliminaries
Construction of binary inverted lists
• Input: documents: (docid, [term, term..]), (docid, [term, ..]), ..
• Output: (term, [docid, docid, …])
– E.g., (apple, [1, 23, 49, 127, …])
• Binary inverted lists fit on a slide more easily
• Everything also applies to frequency and positional inverted lists
• A document id is an internal document id, e.g., a unique integer
  – Not an external document id such as a URL
MapReduce elements
• Combiner, Secondary Sort, complex keys, Sorting on keys’ fields
Using MapReduce to Construct Indexes: A Simple Approach
A simple approach to creating binary inverted lists
• Each Map task is a document parser
– Input: A stream of documents
– Output: A stream of (term, docid) tuples
» (long, 1) (ago, 1) (and, 1) … (once, 2) (upon, 2) …
• Shuffle sorts tuples by key and routes tuples to Reducers
• Reducers convert streams of keys into streams of inverted lists
– Input: (long, 1) (long, 127) (long, 49) (long, 23) …
– The reducer sorts the values for a key and builds an inverted list
» Longest inverted list must fit in memory
– Output: (long, [df:492, docids:1, 23, 49, 127, …])
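A sketch of this simple approach in Python (the documents are illustrative, and everything is held in memory, which is exactly the limitation the slide notes for the longest inverted lists):

```python
from collections import defaultdict

docs = [(1, "long ago and far away"), (2, "once upon a time long ago")]

def build_index(documents):
    # Map: parse each document into (term, docid) tuples; shuffle groups by term.
    postings = defaultdict(set)
    for docid, text in documents:
        for term in text.split():
            postings[term].add(docid)
    # Reduce: sort the docids for each term and record the document frequency.
    return {t: {"df": len(ids), "docids": sorted(ids)}
            for t, ids in postings.items()}

index = build_index(docs)
print(index["long"])  # {'df': 2, 'docids': [1, 2]}
```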
Inverted Index: Data flow
Foo: "This page contains so much text"
Bar: "My page contains text too"
Foo map output: contains: Foo; much: Foo; page: Foo; so: Foo; text: Foo; This: Foo
Bar map output: contains: Bar; My: Bar; page: Bar; text: Bar; too: Bar
Reduced output: contains: Foo, Bar; much: Foo; My: Bar; page: Foo, Bar; so: Foo; text: Foo, Bar; This: Foo; too: Bar
Using MapReduce to Construct Indexes: A Simple Approach
A more succinct representation of the previous algorithm
• Map: (docid1, content1) → (t1, docid1) (t2, docid1) …
• Shuffle by t
• Sort by t
  (t5, docid1) (t4, docid3) … → (t4, docid3) (t4, docid1) (t5, docid1) …
• Reduce: (t4, [docid3 docid1 …]) → (t, ilist)
  – docid: a unique integer
  – t: a term, e.g., "apple"
  – ilist: a complete inverted list
• But: a) inefficient, b) docids are sorted in reducers, and c) assumes the ilist of a word fits in memory
Using MapReduce to Construct Indexes: Using Combine
• Map: (docid1, content1) → (t1, ilist1,1) (t2, ilist2,1) (t3, ilist3,1) …
  – Each output inverted list covers just one document
• Combine
  – Sort by t
  – Combine: (t1, [ilist1,2 ilist1,3 ilist1,1 …]) → (t1, ilist1,27)
  – Each output inverted list covers a sequence of documents
• Shuffle by t
• Sort by t
  (t4, ilist4,1) (t5, ilist5,3) … → (t4, ilist4,2) (t4, ilist4,4) (t4, ilist4,1) …
• Reduce: (t7, [ilist7,2, ilist7,1, ilist7,4, …]) → (t7, ilistfinal)
  – ilisti,j: the j'th inverted list fragment for term i
Using MapReduce to Construct Indexes
[Figure: documents flow into Map/Combine processors (parser/indexers); their inverted-list fragments are shuffled and sorted to Reduce processors (mergers), each of which writes the final inverted lists for one term range (A-F, G-P, Q-Z).]
Using MapReduce to Construct Partitioned Indexes
• Map: (docid1, content1) → ([p, t1], ilist1,1)
• Combine to sort and group values
  ([p, t1], [ilist1,2 ilist1,3 ilist1,1 …]) → ([p, t1], ilist1,27)
• Shuffle by p
• Sort values by [p, t]
• Reduce: ([p, t7], [ilist7,2, ilist7,1, ilist7,4, …]) → ([p, t7], ilistfinal)
  – p: partition (shard) id
Using MapReduce to Construct Indexes: Secondary Sort
So far, we have assumed that Reduce can sort values in memory … but what if there are too many to fit in memory?
• Map: (docid1, content1) → ([t1, fd1,1], ilist1,1)
• Combine to sort and group values
• Shuffle by t
• Sort by [t, fd], then Group by t (Secondary Sort)
  ([t7, fd7,2], ilist7,2), ([t7, fd7,1], ilist7,1) … → (t7, [ilist7,1, ilist7,2, …])
• Reduce: (t7, [ilist7,1, ilist7,2, …]) → (t7, ilistfinal)
  – Values arrive in order, so Reduce can stream its output
  – fdi,j is the first docid in ilisti,j
Using MapReduce to Construct Indexes: Putting it All Together
• Map: (docid1, content1) → ([p, t1, fd1,1], ilist1,1)
• Combine to sort and group values
  ([p, t1, fd1,1], [ilist1,2 ilist1,3 ilist1,1 …]) → ([p, t1, fd1,27], ilist1,27)
• Shuffle by p
• Secondary Sort by [(p, t), fd]
  ([p, t7], [ilist7,2, ilist7,1, ilist7,4, …]) → ([p, t7], [ilist7,1, ilist7,2, ilist7,4, …])
• Reduce: ([p, t7], [ilist7,1, ilist7,2, ilist7,4, …]) → ([p, t7], ilistfinal)
Using MapReduce to Construct Indexes
[Figure: the same parser/indexer to merger pipeline, but each Reduce processor (merger) now produces a complete index shard instead of a term range.]
MapReduce: PageRank
PageRank models the behavior of a "random surfer":
• The "random surfer" keeps clicking on successive links at random, not taking content into consideration.
• A page distributes its PageRank equally among all the pages it links to.
• The damping factor d models the surfer "getting bored" and typing an arbitrary URL.

PR(x) = (1 - d) + d * [ PR(t1)/C(t1) + … + PR(tn)/C(tn) ]

where t1 … tn are the pages that link to x, C(ti) is the out-degree of ti, and (1 - d) is the damping (random jump) term.
Computing PageRank
PageRank: Key Insights
• Effects at each iteration are local: the (i+1)th iteration depends only on the ith
• At iteration i, the PageRank of individual nodes can be computed independently
PageRank using MapReduce
• Use a sparse matrix representation (M)
• Map each row of M to a list of PageRank "credit" to assign to outlink neighbours.
• These credits are reduced to a single PageRank value for a page by aggregating over them.
PageRank using MapReduce
Map: distribute PageRank “credit” to link targets
Reduce: gather up PageRank “credit” from multiple sources to compute new PageRank value
Iterate until convergence
Source of Image: Lin 2008
Phase 1: Process HTML
• Map task takes (URL, page-content) pairs and maps them to (URL, (PRinit, list-of-urls))
  – PRinit is the "seed" PageRank for URL
  – list-of-urls contains all pages pointed to by URL
• Reduce task is just the identity function
Phase 2: PageRank Distribution
• Reduce task gets (URL, url_list) and many (URL, val) values
  – Sum the vals and fix up with d to get the new PR
  – Emit (URL, (new_rank, url_list))
• Check for convergence using a non-parallel component
PageRank Calculation:Preliminaries
One PageRank iteration:
• Input:
  – (id1, [score1(t), out11, out12, ..]), (id2, [score2(t), out21, out22, ..]), ..
• Output:
  – (id1, [score1(t+1), out11, out12, ..]), (id2, [score2(t+1), out21, out22, ..]), ..
MapReduce elements
• Score distribution and accumulation
• Database join
• Side-effect files
PageRank: Score Distribution and Accumulation
• Map
  – In: (id1, [score1(t), out11, out12, ..]), (id2, [score2(t), out21, out22, ..]), ..
  – Out: (out11, score1(t)/n1), (out12, score1(t)/n1), .., (out21, score2(t)/n2), ..
• Shuffle & Sort by node_id
  – In: (id2, score1), (id1, score2), (id1, score1), ..
  – Out: (id1, score1), (id1, score2), .., (id2, score1), ..
• Reduce
  – In: (id1, [score1, score2, ..]), (id2, [score1, ..]), ..
  – Out: (id1, score1(t+1)), (id2, score2(t+1)), ..
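One PageRank iteration (the map-side score distribution plus the reduce-side accumulation with the damping factor) can be sketched as follows. The graph and initial scores are illustrative, and dangling nodes and the side-effect file discussed below are omitted:

```python
from collections import defaultdict

graph  = {1: [2, 3], 2: [3], 3: [1]}   # node -> outlinks
scores = {1: 1.0, 2: 1.0, 3: 1.0}      # PR at iteration t

def pagerank_iteration(graph, scores, d=0.85):
    # Map: each node distributes score/out-degree credit to its outlinks.
    credit = defaultdict(float)
    for node, outs in graph.items():
        for out in outs:
            credit[out] += scores[node] / len(outs)
    # Reduce: sum the incoming credit per node and apply the damping factor.
    return {n: (1 - d) + d * credit[n] for n in graph}

new_scores = pagerank_iteration(graph, scores)
print(new_scores)  # {1: 1.0, 2: 0.575, 3: 1.425}
```

Note the total score is conserved across the iteration (it stays 3.0 here), which is one quick sanity check when debugging a real job.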
PageRank: Database Join to associate outlinks with score
• Map
  – In & Out: (id1, score1(t+1)), (id2, score2(t+1)), .., (id1, [out11, out12, ..]), (id2, [out21, out22, ..]), ..
• Shuffle & Sort by node_id
  – Out: (id1, score1(t+1)), (id1, [out11, out12, ..]), (id2, [out21, out22, ..]), (id2, score2(t+1)), ..
• Reduce
  – In: (id1, [score1(t+1), out11, out12, ..]), (id2, [out21, out22, .., score2(t+1)]), ..
  – Out: (id1, [score1(t+1), out11, out12, ..]), (id2, [score2(t+1), out21, out22, ..]), ..
PageRank: Side Effect Files for dangling nodes
• Dangling Nodes
– Nodes with no outlinks (observed but not crawled URLs)
– Score has no outlet
» need to distribute to all graph nodes evenly
• Map for dangling nodes:
– In: .., (id3, [score3]), ..
– Out: .., ("*", 0.85×score3), ..
• Reduce
– In: .., ("*", [score1, score2, ..]), ..
– Out: .., everything else, ..
– Output to side-effect: ("*", score), fed to Mapper of next iteration
Manipulating Large Data
• Do everything in Hadoop (and HDFS)
– Make sure every step is parallelized!
– Any serial step breaks your design
• E.g. storing the URL list for a Web graph
  – Each node in the Web graph has an id
  – [URL1, URL2, …], using the line number as id – a bottleneck (serial)
  – [(id1, URL1), (id2, URL2), …], explicit id
Hadoop based Tools
• For developing in Java: the NetBeans plugin
– http://www.hadoopstudio.org/docs.html
• Pig Latin, a SQL-like high level data processing script language
• Hive, Data warehouse, SQL
• Cascading, Data processing
• Mahout, Machine Learning algorithms on Hadoop
• HBase, Distributed data store as a large table
• More
– http://hadoop.apache.org/
– http://en.wikipedia.org/wiki/Hadoop
– Many other toolkits, Nutch, Cloud9, Ivory
Get Your Hands Dirty
• Hadoop Virtual Machine
– http://www.cloudera.com/developers/downloads/virtual-machine/
» This runs Hadoop 0.20
– An earlier Hadoop 0.18.0 version is here http://code.google.com/edu/parallel/tools/hadoopvm/index.html
• Amazon EC2
• Various other Hadoop clusters around
• The NetBeans plugin simulates Hadoop
– The workflow view works on Windows
– Local running & debugging works on MacOS and Linux
– http://www.hadoopstudio.org/docs.html
Conclusions
• Why large scale
• MapReduce advantages
• Hadoop uses
• Use cases
– Map only: for totally distributive computation
– Map+Reduce: for filtering & aggregation
– Database join: for massive dictionary lookups
– Secondary sort: for sorting on values
– Inverted indexing: combiner, complex keys
– PageRank: side effect files
• Large data
For More Information
• L. A. Barroso, J. Dean, and U. Hölzle. “Web search for a planet: The Google cluster architecture.” IEEE Micro, 2003.
• J. Dean and S. Ghemawat. “MapReduce: Simplified Data Processing on Large Clusters.” Proceedings of the 6th Symposium on Operating System Design and Implementation (OSDI 2004), pages 137-150. 2004.
• S. Ghemawat, H. Gobioff, and S.-T. Leung. “The Google File System.” Proceedings of the 19th ACM Symposium on Operating Systems Principles (SOSP-03), pages 29-43. 2003.
• I.H. Witten, A. Moffat, and T.C. Bell. Managing Gigabytes. Morgan Kaufmann. 1999.
• J. Zobel and A. Moffat. “Inverted files for text search engines.” ACM Computing Surveys, 38 (2). 2006.
• http://hadoop.apache.org/common/docs/current/mapred_tutorial.html. “Map/Reduce Tutorial”. Fetched January 21, 2010.
• Tom White. Hadoop: The Definitive Guide. O'Reilly Media. June 5, 2009
• J. Lin and C. Dyer. Data-Intensive Text Processing with MapReduce, Book Draft. February 7, 2010.