TRANSCRIPT
05/16/2012 Map and Reduce 1
ecs251 Spring 2012: Operating System, #4: Map/Reduce, HDFS, Applications
Dr. S. Felix Wu
Computer Science Department
University of California, Davis
[Diagram: MapReduce execution overview. The User Program forks a Master and several Workers. The Master assigns map and reduce tasks. Map workers read input splits (Split 0, Split 1, Split 2) from the distributed file system, do local writes of intermediate data; reduce workers do remote reads and sort, then write Output File 0 and Output File 1.]
MapReduce Programming Model
Data type: key-value records
Map function: (Kin, Vin) → list(Kinter, Vinter)
Reduce function: (Kinter, list(Vinter)) → list(Kout, Vout)
Map Reduce benefits:
– Parallelism
– Saves network bandwidth
– Data locality
– Failure transparency
– Simple programming model
Example: Word Count
def mapper(line):
    for word in line.split():
        output(word, 1)

def reducer(key, values):
    output(key, sum(values))
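The two functions above can be exercised end-to-end with a small, self-contained sketch (plain Python, no Hadoop; the shuffle & sort phase is simulated with a dictionary):

```python
from collections import defaultdict

def mapper(line):
    # emit a (word, 1) pair for every word occurrence in the line
    return [(word, 1) for word in line.split()]

def reducer(key, values):
    # sum the counts for one word
    return (key, sum(values))

def run_word_count(lines):
    # map phase
    intermediate = []
    for line in lines:
        intermediate.extend(mapper(line))
    # simulated shuffle & sort: group all values by key
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)
    # reduce phase
    return dict(reducer(k, vs) for k, vs in sorted(groups.items()))

print(run_word_count(["the quick brown fox",
                      "the fox ate the mouse",
                      "how now brown cow"]))
# {'ate': 1, 'brown': 2, 'cow': 1, 'fox': 2, 'how': 1,
#  'mouse': 1, 'now': 1, 'quick': 1, 'the': 3}
```

The dictionary grouping stands in for what the framework does between map and reduce: routing every pair with the same key to the same reduce call.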
Word Count Execution
[Diagram: Word Count execution]
Input splits: "the quick brown fox" | "the fox ate the mouse" | "how now brown cow"
Map outputs:
  (the,1) (quick,1) (brown,1) (fox,1)
  (the,1) (fox,1) (the,1) (ate,1) (mouse,1)
  (how,1) (now,1) (brown,1) (cow,1)
After Shuffle & Sort:
  Reduce 1 outputs: (brown,2) (fox,2) (how,1) (now,1) (the,3)
  Reduce 2 outputs: (ate,1) (cow,1) (mouse,1) (quick,1)
Pipeline: Input → Map → Shuffle & Sort → Reduce → Output
An Optimization: The Combiner
Local reduce function for repeated keys produced by the same map
For associative operations like sum, count, max
Decreases the amount of intermediate data
Example: local counting for Word Count:
def combiner(key, values):
    output(key, sum(values))
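A toy measurement of the combiner's effect (a plain-Python sketch; the exact savings depend on how repetitive the input is). Running the combiner on each mapper's local output shrinks the intermediate data before the shuffle:

```python
from collections import Counter

splits = ["the quick brown fox",
          "the fox ate the mouse",
          "how now brown cow"]

# Without a combiner: one (word, 1) pair per word occurrence.
no_combiner = sum(len(line.split()) for line in splits)

# With a combiner: one (word, count) pair per distinct word per split.
with_combiner = sum(len(Counter(line.split())) for line in splits)

print(no_combiner, with_combiner)  # 13 12
```

The savings here are tiny because only "the" repeats within one split; on real text with many repeated words, the reduction is much larger.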
Word Count with Combiner
[Diagram: Word Count execution with the Combiner]
Input splits: "the quick brown fox" | "the fox ate the mouse" | "how now brown cow"
Map outputs (after local combining):
  (the,1) (quick,1) (brown,1) (fox,1)
  (the,2) (fox,1) (ate,1) (mouse,1)
  (how,1) (now,1) (brown,1) (cow,1)
After Shuffle & Sort:
  Reduce 1 outputs: (brown,2) (fox,2) (how,1) (now,1) (the,3)
  Reduce 2 outputs: (ate,1) (cow,1) (mouse,1) (quick,1)
Pipeline: Input → Map → Shuffle & Sort → Reduce → Output
MapReduce Execution Details
Mappers preferentially scheduled on the same node or same rack as their input block
– Minimizes network use to improve performance
Mappers save outputs to local disk before serving them to reducers
– Allows recovery if a reducer crashes
– Allows running more reducers than # of nodes
Fault Tolerance in MapReduce
1. If a task crashes:
– Retry on another node
  OK for a map because it had no dependencies
  OK for a reduce because map outputs are on disk
– If the same task repeatedly fails, fail the job or ignore that input block
Note: For the fault tolerance to work, user tasks must be deterministic and side-effect-free
Fault Tolerance in MapReduce
2. If a node crashes:
– Relaunch its current tasks on other nodes
– Relaunch any maps the node previously ran
  Necessary because their output files were lost along with the crashed node
Fault Tolerance in MapReduce
3. If a task is going slowly (straggler):
– Launch a second copy of the task on another node
– Take the output of whichever copy finishes first, and kill the other one
Critical for performance in large clusters (many possible causes of stragglers)
Takeaways
By providing a restricted data-parallel programming model, MapReduce can control job execution in useful ways:
– Automatic division of job into tasks
– Placement of computation near data
– Load balancing
– Recovery from failures & stragglers
Amazon Elastic MapReduce
Web interface and command-line tools for running Hadoop jobs on EC2
Data stored in Amazon S3
Monitors jobs and shuts down machines after use
Elastic MapReduce UI
[Screenshots of the Elastic MapReduce web interface]
Map: A Higher-Order Function
F(x: int) returns r: int
Let V be an array of integers. W = map(F, V)
– W[i] = F(V[i]) for all i
– i.e., apply F to every element of V
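The same definition reads directly in Python (a sketch; Python's built-in map returns a lazy iterator, so we wrap it in list):

```python
def F(x: int) -> int:
    # an arbitrary element-wise function
    return x + 1

V = [1, 2, 3, 4, 5]
W = list(map(F, V))   # W[i] == F(V[i]) for all i
print(W)  # [2, 3, 4, 5, 6]
```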
Map Examples in Haskell
map (+1) [1,2,3,4,5] == [2, 3, 4, 5, 6]
map toLower "abcDEFG12!@#" == "abcdefg12!@#"
map (`mod` 3) [1..10] == [1, 2, 0, 1, 2, 0, 1, 2, 0, 1]
Word Count Example
Read text files and count how often words occur.
– The input is text files
– The output is a text file: each line contains word, tab, count
Map: produce pairs of (word, count)
Reduce: for each word, sum up the counts.
[Diagram: Map phase. Input key-value pairs (k1,v1), (k2,v2), …, (kn,vn), e.g. (doc-id, doc-content), are each fed through map to produce intermediate key-value pairs, e.g. (word, wordcount-in-a-doc).]
[Diagram: Group and Reduce phases. Intermediate key-value pairs, e.g. (word, wordcount-in-a-doc), are grouped by key into key-value groups (word, list-of-wordcount), analogous to SQL GROUP BY; reduce then turns each group into an output key-value pair (word, final-count), analogous to SQL aggregation.]
[Diagram: Worked example. Input: "I am a tiger, you are also a tiger".
Map outputs: (I,1) (am,1) (a,1) | (tiger,1) (you,1) (are,1) | (also,1) (a,1) (tiger,1)
After shuffle & sort: (a,1) (a,1) (also,1) (am,1) (are,1) (I,1) (tiger,1) (tiger,1) (you,1)
Reduce outputs: (a,2) (also,1) (am,1) (are,1) and (I,1) (tiger,2) (you,1)
Final result: a,2 also,1 am,1 are,1 I,1 tiger,2 you,1]
MapReduce: Execution
Inverted Index Example
Generate an inverted index of words from a given set of files
Map: parses a document and emits <word, docId> pairs
Reduce: takes all pairs for a given word, sorts the docId values, and emits a <word, list(docId)> pair
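A minimal in-memory sketch of this map/reduce pair (plain Python; the docId sort happens in the reduce, as described, and de-duplication of repeated docIds is added for clarity):

```python
from collections import defaultdict

def map_doc(doc_id, text):
    # emit a <word, docId> pair for each word in the document
    return [(word, doc_id) for word in text.split()]

def reduce_word(word, doc_ids):
    # sort (and de-duplicate) the docIds for one word
    return (word, sorted(set(doc_ids)))

def inverted_index(docs):
    groups = defaultdict(list)       # simulated shuffle: word -> docIds
    for doc_id, text in docs.items():
        for word, d in map_doc(doc_id, text):
            groups[word].append(d)
    return dict(reduce_word(w, ds) for w, ds in groups.items())

index = inverted_index({"d1": "penn state football",
                        "d2": "football players state"})
print(index["state"])    # ['d1', 'd2']
print(index["players"])  # ['d2']
```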
{"id":"204722549606084_230354117042927","from":{"name":"Rufino Beniga","id":"100000037203591"},"to":{"data":[{"version":1,"name":"ecs30 Programming and Problem Solving","id":"204722549606084"}]},"message":"i'm at the \"help Command: man\" part and i typed \"man ls\" and it listed a bunch of stuff about what it is, how do i get out of it? it says i need to type exit but nothing is happening","actions":[{"name":"Comment","link":"http:\/\/www.facebook.com\/204722549606084\/posts\/230354117042927"},{"name":"Like","link":"http:\/\/www.facebook.com\/204722549606084\/posts\/230354117042927"}],"type":"status","created_time":"2012-01-11T05:48:09+0000","updated_time":"2012-01-11T05:49:17+0000","comments":{"data":[{"id":"204722549606084_230354117042927_230354333709572","from":{"name":"Chris Schwarz","id":"5100058"},"message":"Q","created_time":"2012-01-11T05:48:50+0000","likes":1},{"id":"204722549606084_230354117042927_230354447042894","from":{"name":"Rufino Beniga","id":"100000037203591"},"message":"haha i forgot about that. Thanks!!!! :D","created_time":"2012-01-11T05:49:17+0000"},{"id":"204722549606084_230354117042927_230354443709561","from":{"name":"Connor Wilson","id":"1596499591"},"message":"try ^c? lol","created_time":"2012-01-11T05:49:17+0000"}],"count":3}}
HDFS Architecture
[Diagram: a Client sends (1) a filename to the NameNode, receives (2) the block ids and DataNode locations, and then (3) reads the data directly from the DataNodes. A SecondaryNameNode sits beside the NameNode; the DataNodes report cluster membership.]
NameNode: maps a file to a file-id and a list of DataNodes
DataNode: maps a block-id to a physical location on disk
SecondaryNameNode: periodic merge of the transaction log
Hadoop Distributed File System
Files split into 128MB blocks
Blocks replicated across several datanodes (often 3)
Namenode stores metadata (file names, locations, etc)
Optimized for large files, sequential reads
Files are append-only
[Diagram: File1 consists of blocks 1, 2, 3, 4. The Namenode holds the mapping; each block is stored on three of the Datanodes, e.g. one Datanode holds blocks 1,2,4, another 2,1,3, another 1,4,3, and another 3,2,4.]
HDFS Architecture
[Diagram: HDFS Architecture. Clients send metadata ops to the Namenode, which keeps the metadata (name, replicas, e.g. /home/foo/data, …) and issues block ops to the Datanodes. A writing Client pipelines blocks to Datanodes across Rack1 and Rack2; block replication crosses racks; a reading Client fetches blocks directly from the Datanodes.]
NameNode Metadata
Meta-data in Memory
– The entire metadata is in main memory
– No demand paging of meta-data
Types of Metadata
– List of files
– List of blocks for each file
– List of DataNodes for each block
– File attributes, e.g. creation time, replication factor
A Transaction Log
– Records file creations, file deletions, etc.
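The in-memory metadata can be pictured as a pair of maps (a toy sketch with made-up names, not Hadoop's actual data structures):

```python
# file path -> ordered list of block ids, plus file attributes
files = {
    "/user/alice/log.txt": {
        "blocks": ["blk_1", "blk_2"],
        "created": "2012-05-16",
        "replication": 3,
    },
}

# block id -> DataNodes currently holding a replica
block_locations = {
    "blk_1": ["dn1", "dn2", "dn3"],
    "blk_2": ["dn2", "dn4", "dn5"],
}

# resolving a read: file -> blocks -> DataNode locations
for blk in files["/user/alice/log.txt"]["blocks"]:
    print(blk, block_locations[blk])
```

Because both maps live in main memory, a filename lookup never touches disk; only the transaction log is persisted.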
DataNode
A Block Server
– Stores data in the local file system (e.g. ext3)
– Stores meta-data of a block (e.g. CRC)
– Serves data and meta-data to Clients
Block Report
– Periodically sends a report of all existing blocks to the NameNode
Facilitates Pipelining of Data
– Forwards data to other specified DataNodes
Block Placement
Current Strategy
– One replica on the local node
– Second replica on a remote rack
– Third replica on the same remote rack
– Additional replicas are randomly placed
Clients read from the nearest replica
Would like to make this policy pluggable
Data Correctness
Use checksums to validate data
– Use CRC32
File Creation
– Client computes a checksum per 512 bytes
– DataNode stores the checksum
File Access
– Client retrieves the data and checksum from the DataNode
– If validation fails, the Client tries other replicas
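The per-chunk checksum scheme can be sketched with Python's zlib.crc32 (a simplification: real HDFS stores checksums in separate metadata, but the 512-byte chunking follows the slide):

```python
import zlib

CHUNK = 512  # bytes covered by each checksum, as on the slide

def checksums(data: bytes):
    # computed once at file creation: one CRC32 per 512-byte chunk
    return [zlib.crc32(data[i:i + CHUNK]) for i in range(0, len(data), CHUNK)]

def validate(data: bytes, stored):
    # on access, recompute and compare; a mismatch means try another replica
    return checksums(data) == stored

data = b"x" * 1300
stored = checksums(data)          # 3 checksums for 1300 bytes
assert validate(data, stored)

corrupted = data[:600] + b"y" + data[601:]   # flip one byte in chunk 2
assert not validate(corrupted, stored)
```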
NameNode Failure
A single point of failure
Transaction Log stored in multiple directories
– A directory on the local file system
– A directory on a remote file system (NFS/CIFS)
Need to develop a real HA solution
Data Pipelining
Client retrieves a list of DataNodes on which to place replicas of a block
Client writes the block to the first DataNode
The first DataNode forwards the data to the next DataNode in the pipeline
When all replicas are written, the Client moves on to write the next block in the file
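The forwarding chain can be sketched as a recursive hand-off (hypothetical class and method names; real HDFS streams packets and acknowledges back along the pipeline):

```python
class DataNode:
    def __init__(self, name):
        self.name = name
        self.blocks = {}

    def write(self, block_id, data, pipeline):
        # store the replica locally, then forward to the next DataNode
        self.blocks[block_id] = data
        if pipeline:
            pipeline[0].write(block_id, data, pipeline[1:])

def client_write(block_id, data, datanodes):
    # the client sends the block only to the first DataNode;
    # the DataNodes forward it along the rest of the pipeline
    datanodes[0].write(block_id, data, datanodes[1:])

nodes = [DataNode(n) for n in ("dn1", "dn2", "dn3")]
client_write("blk_1", b"hello", nodes)
print([n.blocks["blk_1"] for n in nodes])  # [b'hello', b'hello', b'hello']
```

The point of the pipeline is that the client's uplink carries the block once, rather than once per replica.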
Rebalancer
Goal: % disk full on DataNodes should be similar
– Usually run when new DataNodes are added
– Cluster is online when the Rebalancer is active
– Rebalancer is throttled to avoid network congestion
– Command-line tool
Hadoop Map/Reduce
The Map-Reduce programming model
– Framework for distributed processing of large data sets
– Pluggable user code runs in a generic framework
Common design pattern in data processing:
  cat * | grep | sort | uniq -c | cat > file
  input | map | shuffle | reduce | output
Natural for:
– Log processing
– Web search indexing
– Ad-hoc queries
Motivation
MapReduce is powerful: many algorithms can be expressed as a series of MR jobs
But it’s fairly low-level: must think about keys, values, partitioning, etc.
Can we capture common “job patterns”?
Pig
Started at Yahoo! Research
Runs about 50% of Yahoo!'s jobs
Features:
– Expresses sequences of MapReduce jobs
– Data model: nested "bags" of items
– Provides relational (SQL) operators (JOIN, GROUP BY, etc.)
– Easy to plug in Java functions
An Example Problem
Suppose you have user data in one file, website data in another, and you need to find the top 5 most visited pages by users aged 18-25.
Dataflow: Load Users → Filter by age → Join on name (with Load Pages) → Group on url → Count clicks → Order by clicks → Take top 5
Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt
In MapReduce
Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt
Users = load 'users' as (name, age);
Filtered = filter Users by age >= 18 and age <= 25;
Pages = load 'pages' as (user, url);
Joined = join Filtered by name, Pages by user;
Grouped = group Joined by url;
Summed = foreach Grouped generate group, count(Joined) as clicks;
Sorted = order Summed by clicks desc;
Top5 = limit Sorted 5;
store Top5 into 'top5sites';
In Pig Latin
Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt
Translation to MapReduce
Notice how naturally the components of the job translate into Pig Latin.
[Diagram: the dataflow steps map onto Pig Latin statements, which the planner compiles into three MapReduce jobs:
  Load Users → Users = load …; Filter by age → Filtered = filter …
  Load Pages → Pages = load …; Join on name → Joined = join … (Job 1)
  Group on url → Grouped = group …; Count clicks → Summed = … count() … (Job 2)
  Order by clicks → Sorted = order …; Take top 5 → Top5 = limit … (Job 3)]
Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt
Hive
Developed at Facebook
Used for most Facebook jobs
Relational database built on Hadoop
– Maintains table schemas
– SQL-like query language (which can also call Hadoop Streaming scripts)
– Supports table partitioning, complex data types, sampling, some query optimization
Data Model
Hive structures data into well-understood database concepts such as tables, rows, columns, and partitions
It supports primitive types: integers, floats, doubles, and strings
Hive also supports:
– associative arrays: map<key-type, value-type>
– lists: list<element type>
– structs: struct<file name: file type…>
The SerDe (serialize/deserialize) API is used to move data in and out of tables
Query Language (HiveQL)
Subset of SQL
Meta-data queries
Limited equality and join predicates
No inserts on existing tables (to preserve the WORM property)
– Can overwrite an entire table
Wordcount in Hive
FROM (
  MAP doctext USING 'python wc_mapper.py' AS (word, cnt)
  FROM docs
  CLUSTER BY word
) a
REDUCE word, cnt USING 'python wc_reduce.py';
Session/timestamp example
FROM (
  FROM session_table
  SELECT sessionid, tstamp, data
  DISTRIBUTE BY sessionid SORT BY tstamp
) a
REDUCE sessionid, tstamp, data USING 'session_reducer.sh';
Data Storage
Tables are logical data units; table metadata associates the data in the table to hdfs directories.
HDFS namespace: tables (HDFS directory), partitions (HDFS subdirectory), buckets (subdirectories within a partition)
/user/hive/warehouse/test_table is an HDFS directory
Hadoop Usage at Facebook
Data warehouse running Hive
600 machines, 4800 cores, 2.4 PB disk
3200 jobs per day
50+ engineers have used Hadoop
Facebook Data Pipeline
[Diagram: Facebook data pipeline. Web Servers log to Scribe Servers, which feed Network Storage and the Hadoop Cluster; summaries flow into Oracle RAC and MySQL, and Analysts issue Hive queries against the cluster.]
Facebook Job Types
Production jobs: load data, compute statistics, detect spam, etc.
Long experiments: machine learning, etc.
Small ad-hoc queries: Hive jobs, sampling
GOAL: Provide fast response times for small jobs and guaranteed service levels for production jobs
Motivation
[Diagram: Web Servers → Scribe Writers → Scribe MidTier → Realtime Hadoop Cluster → Hadoop Hive Warehouse, alongside Oracle RAC and MySQL.]
http://hadoopblog.blogspot.com/2009/06/hdfs-scribe-integration.html
Architecture
[Diagram: Hive architecture. On top of HDFS and Map Reduce, a Web UI, the Hive CLI, and JDBC/ODBC (browse, query, DDL) feed Hive QL through the Parser, Planner, Optimizer, and Execution layers. Pluggable pieces: SerDe (CSV, Thrift, Regex), UDF/UDAF (substr, sum, average), file formats (TextFile, SequenceFile, RCFile), and user-defined map-reduce scripts.]
http://www.slideshare.net/cloudera/hw09-hadoop-development-at-facebook-hive-and-hdfs
Spark Motivation
MapReduce simplified “big data” analysis on large, unreliable clusters
But as soon as organizations started using it widely, users wanted more:
– More complex, multi-stage applications
– More interactive queries
– More low-latency online processing
Spark Motivation
Complex jobs, interactive queries, and online processing all need one thing that MR lacks:
Efficient primitives for data sharing
[Diagram: an iterative job, interactive mining (Query 1, Query 2, Query 3), and stream processing all need to share data across stages.]
Problem: in MR, the only way to share data across jobs is stable storage (e.g. a file system) -> slow!
Examples
[Diagram: In MapReduce, an iterative job performs an HDFS read and an HDFS write around every iteration (iter. 1, iter. 2, …), and each interactive query (query 1 → result 1, query 2 → result 2, query 3 → result 3) re-reads the input from HDFS.]
Goal: In-Memory Data Sharing
[Diagram: with one-time processing, the Input is loaded into distributed memory once; query 1, query 2, query 3, … then run against memory, which is 10-100x faster than network and disk.]
Map and Reduce
The idea of Map and Reduce is 40+ years old
– Present in all functional programming languages
– See, e.g., APL, Lisp and ML
Alternate name for Map: Apply-All
Higher-order functions
– take function definitions as arguments, or
– return a function as output
Map and Reduce are higher-order functions.
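Both are available as plain higher-order functions in Python, independent of any distributed runtime (reduce is the classic fold, living in functools):

```python
from functools import reduce

# map takes a function as its argument and applies it to every element
squares = list(map(lambda x: x * x, [1, 2, 3, 4]))
print(squares)  # [1, 4, 9, 16]

# reduce takes a two-argument function and folds it over the sequence
total = reduce(lambda acc, x: acc + x, squares, 0)
print(total)  # 30
```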
Phase Synchronization
When shall we start to reduce?
– The output of Reduce could be the input for another higher-level Map!
How about failures?
Can we (or should we) have "adaptive and incremental" map and reduce?
How about "atomicity"?
– a M/R transaction?
Class MR {
  Class Mapper … { }     // Map function
  Class Reducer … { }    // Reduce function
  main() {               // other parts of the program
    // Config
    JobConf conf = new JobConf("MR.class");
    conf.setMapperClass(Mapper.class);
    conf.setReducerClass(Reducer.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}
class MyMap extends MapReduceBase
    implements Mapper<INPUT KEY, INPUT VALUE, OUTPUT KEY, OUTPUT VALUE> {
  // global variables
  public void map(INPUT KEY key, INPUT VALUE value,
                  OutputCollector<OUTPUT KEY, OUTPUT VALUE> output,
                  Reporter reporter) throws IOException {
    // local variables and program
    output.collect(NewKey, NewValue);
  }
}
(The slide's blank type slots are filled with its labels: the Mapper's four type parameters are the input key, input value, output key, and output value types.)
class MyRed extends MapReduceBase
    implements Reducer<INPUT KEY, INPUT VALUE, OUTPUT KEY, OUTPUT VALUE> {
  // global variables
  public void reduce(INPUT KEY key, Iterator<INPUT VALUE> values,
                     OutputCollector<OUTPUT KEY, OUTPUT VALUE> output,
                     Reporter reporter) throws IOException {
    // local variables and program
    output.collect(NewKey, NewValue);
  }
}
(As with the Mapper, the blank type slots are filled with the slide's labels: input key, input value, output key, and output value types.)
Applicability of M/R?
Trade-off
Map all the inputs, Reduce to aggregate
Filter/pre-process/abstract the inputs, then Map/Reduce
– Remove all the duplicates first
– Or, shall we just map it to different chunk servers, then remove the duplicates?
– Application-dependent!!
Stability versus Efficiency
Asynchronous "phasing" for adaptive/append-dominated inputs
– When shall we trigger the inputs to the next phase of Map and Reduce?
Consistency
All or Nothing
Epsilon-Consistency
– Tolerating a bounded amount of "inconsistency"
• Complete web search engine
– Nutch = Crawler + Indexer/Searcher (Lucene) + GUI
  » + Plugins
  » + MapReduce & Distributed FS (Hadoop)
• Java based, open source, many customizable scripts available at http://lucene.apache.org/nutch/
• Features:
– Customizable
– Extensible (e.g. extend to Solr for enhanced portability)
Data Structures used by Nutch
• Web Database or WebDB
– Mirrors the properties/structure of the web graph being crawled
• Segment
– Intermediate index
– Contains pages fetched in a single run
• Index
– Final inverted index obtained by "merging" segments (Lucene)
WebDB
• Customized graph database
• Used by Crawler only
• Persistent storage for "pages" & "links"
– Page DB: indexed by URL and hash; contains content, outlinks, fetch information & score
– Link DB: contains "source to target" links, anchor text
Crawling
• Cyclic process
– the crawler generates a set of fetchlists from the WebDB
– fetchers download the content from the Web
– the crawler updates the WebDB with new links that were found
– then the crawler generates a new set of fetchlists
– repeat until you reach the "depth"
Indexing
• Iterate through all k page sets in parallel, constructing the inverted index
• Creates a "searchable document" from:
– URL text
– Content text
– Incoming anchor text
• Other content types might have different document fields
– E.g., email has sender/receiver
– Any searchable field the end-user will want
• Uses the Lucene text indexer
Lucene
• Open source search project
– http://lucene.apache.org
• Index & search local files
– Download lucene-2.2.0.tar.gz from http://www.apache.org/dyn/closer.cgi/lucene/java/
– Extract files
– Build an index for a directory:
  java org.apache.lucene.demo.IndexFiles dir_path
– Try search at the command line:
  java org.apache.lucene.demo.SearchFiles
Lucene's Open Architecture
[Diagram: Lucene's open architecture. Crawling: an FS Crawler and Larm pull from the File System, the WWW, and an IMAP Server. Parsing: TXT, PDF, and HTML parsers convert PDF/HTML/DOC/TXT files into Lucene Documents. Indexing: analyzers (StopAnalyzer, StandardAnalyzer, CN/DE analyzers) feed indexers that build the Index. Searching: searchers answer queries against the Index.]
[Diagram: an Index contains Documents; each Document contains Fields; each Field is a Name-Value pair.]
• Create an Analyzer
• WhitespaceAnalyzer
– divides text at whitespace
• SimpleAnalyzer
– divides text at non-letters
– converts to lower case
• StopAnalyzer
– SimpleAnalyzer, plus removes stop words
• StandardAnalyzer
– good for most European languages
– removes stop words
– converts to lower case
Inverted Index (Inverted File)
Doc 1: Penn State Football … football …
Doc 2: Football players … State …
Posting table:
id | word     | doc   | offset
1  | football | Doc 1 | 3
   |          | Doc 1 | 67
   |          | Doc 2 | 1
2  | penn     | Doc 1 | 1
3  | players  | Doc 2 | 2
4  | state    | Doc 1 | 2
   |          | Doc 2 | 13
[Diagram: Lucene query path. A Query goes through the Term Info Index (in memory) to the Term Dictionary (random file access), then to the Frequency File and Position File (random file access), with Field Info held in memory; each step takes constant time.]
Map/Reduce Cluster Implementation
[Diagram: Input files are divided into splits 0-4; M map tasks produce intermediate files; R reduce tasks consume them and write the output files Output 0 and Output 1.]
Several map or reduce tasks can run on a single computer
Each intermediate file is divided into R partitions, by a partitioning function
Each reduce task corresponds to one partition
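The partitioning function is typically a hash of the key modulo R (a sketch; Hadoop's default HashPartitioner works this way using the key's hashCode):

```python
R = 2  # number of reduce tasks / partitions

def partition(key, num_reduces=R):
    # every occurrence of a given key hashes to the same partition,
    # so exactly one reduce task sees all of that key's values
    return hash(key) % num_reduces

pairs = [("the", 1), ("fox", 1), ("the", 1)]
parts = {}
for k, v in pairs:
    parts.setdefault(partition(k), []).append((k, v))
# both ("the", 1) pairs land in the same partition
```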
Cloud Computing Scheduling
FIFO, Fair-Sharing
Job scheduling with "constraints"
– Dependency
– Priority-oriented
– Soft deadline
Hive
Developed at Facebook
Used for the majority of Facebook jobs
"Relational database" built on Hadoop
– Maintains a list of table schemas
– SQL-like query language (HQL)
– Can call Hadoop Streaming scripts from HQL
– Supports table partitioning, clustering, complex data types, some optimizations
Creating a Hive Table
CREATE TABLE page_views(viewTime INT, userid BIGINT,
    page_url STRING, referrer_url STRING,
    ip STRING COMMENT 'User IP address')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
STORED AS SEQUENCEFILE;
• Partitioning breaks the table into separate files for each (dt, country) pair
Ex: /hive/page_view/dt=2008-06-08,country=US
    /hive/page_view/dt=2008-06-08,country=CA
Simple Query
• Find all page views coming from xyz.com during March 2008:
SELECT page_views.*
FROM page_views
WHERE page_views.date >= '2008-03-01'
  AND page_views.date <= '2008-03-31'
  AND page_views.referrer_url like '%xyz.com';
• Hive reads only the matching 2008-03 partitions instead of scanning the entire table
Aggregation and Joins
• Count users who visited each page, by gender:
SELECT pv.page_url, u.gender, COUNT(DISTINCT u.id)
FROM page_views pv JOIN user u ON (pv.userid = u.id)
WHERE pv.date = '2008-03-03'
GROUP BY pv.page_url, u.gender;
• Sample output:
Programming Model
Input: key/value pair; Output: key/value pair
The MapReduce library contains 2 functions: Map and Reduce
MAP: takes an input key/value pair and produces intermediate key/value pairs
The MapReduce library groups all intermediate values with the same intermediate key I
REDUCE: takes an intermediate key I and its set of values, and produces a smaller set of values
MapReduce: Example
Counting the number of occurrences of each word in a large collection of documents.
MAP: (doc name, doc contents) → (word, occurrences)
REDUCE: (word, list of counts) → sum of all counts for that word
Input and output types:
map(k1,v1) → list(k2,v2)
reduce(k2,list(v2)) → list(v2)
GFS: Google File System
"Failures" are the norm
Multiple-GB files are common
Append rather than overwrite
– Random writes are rare
Can we relax the consistency?
The user of a MapReduce library provides:
# an input reader
# a Map function
# a partition function
# a compare function
# a Reduce function
# an output writer
Grep Example
Search input files for a given pattern
Map: emits a line if the pattern is matched
Reduce: copies results to output
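As a plain-Python sketch (hypothetical helper names), grep is a map that filters and an identity-like reduce:

```python
def grep_map(filename, line, pattern):
    # emit the line (keyed by filename) only when the pattern matches
    if pattern in line:
        return [(filename, line)]
    return []

def grep_reduce(key, lines):
    # identity reduce: just copy the matches through to the output
    return (key, lines)

lines = ["error: disk full", "ok", "error: timeout"]
matches = []
for ln in lines:
    matches.extend(grep_map("log.txt", ln, "error"))
print(matches)
# [('log.txt', 'error: disk full'), ('log.txt', 'error: timeout')]
```

This shows why grep parallelizes so well: the map side does all the filtering, so the reduce side has almost nothing to do.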
Execution on Clusters
1. Input files split (M splits)
2. Assign Master & Workers
3. Map tasks
4. Writing intermediate data to disk (R regions)
5. Intermediate data read & sort
6. Reduce tasks
7. Return
[Diagram: input HDFS splits (split 0-4) → map tasks → sort/copy/merge → reduce tasks → output HDFS files (part0, part1).]