Notes on Data-Intensive Processing with Hadoop MapReduce

These are slides used in a guest lecture at Eindhoven University of Technology, on May 30th, 2012. Adapted from slides by Jimmy Lin.

Evert Lammerts
May 30, 2012
Guest Lecture, Eindhoven University of Technology
Image source: http://valley-of-the-shmoon.blogspot.com/2011/04/pushing-elephant-up-stairs.html
To start with...
● About me
● Note on this lecture
● Adapted from Jimmy Lin's Cloud Computing course...
http://www.umiacs.umd.edu/~jimmylin/cloud-2010-Spring/index.html
● … and from Jimmy's slidedeck from the SIKS Big Data course and his talk at UvA
http://www.umiacs.umd.edu/~jimmylin/
● Today's slides available at http://www.slideshare.net/evertlammerts
● About you
● Big Data?
● Cloud computing?
● Supercomputing?
● Hadoop and / or MapReduce?
The lecture
● Why “Big Data”?
● How “Big Data”?
● MapReduce
● Implementations
Why “Big Data”?
The Economist, Feb 25th 2010
1. Science
● The emergence of the 4th paradigm
● http://research.microsoft.com/en-us/collaboration/fourthparadigm/
● CERN stores 15 PB of LHC data per year, a fraction of the data actually produced
● Square Kilometer Array expectation: 10 PB / hour
Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)
2. Engineering
● Count and normalize
http://infrawatch.liacs.nl/
Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)
3. Commerce
● Know thy customers
● Data → Insights → Competitive advantages
● Google was processing 20 PB each day... in 2008!
● Facebook collected 25 TB of HTTP logs each day... in 2009!
● eBay had ~9 PB of user data, and a growth rate of more than 50 TB / day in 2011
Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)
IEEE Intelligent Systems, March/April 2009
s/knowledge/data/g
Jimmy Lin, University of Maryland / Twitter, 2011
Also see
● P. Russom, Big Data Analytics, The Data Warehousing Institute, 2011
● James G. Kobielus, The Forrester Wave™: Enterprise Hadoop Solutions, Forrester Research, 2012
● James Manyika et al., Big data: The next frontier for innovation, competition, and productivity, McKinsey Global Institute, 2011
● Dirk de Roos et al., Understanding Big Data: Analytics for Enterprise Class Hadoop and Streaming Data, IBM, 2011
Etcetera
How “Big Data”?
Divide and Conquer
[Diagram: the “Work” is partitioned into w1, w2, w3; each part goes to a “worker”, producing results r1, r2, r3, which are combined into the final “Result”.]
Jimmy Lin, University of Maryland / Twitter, 2011
Amdahl's Law
Challenges in Parallel systems
● How do we divide the work into separate tasks?
● How do we get these tasks to our workers?
● What if we have more tasks than workers?
● What if our tasks need to exchange information?
● What if workers crash? (That's not an exception, it's the rule!)
● How do we aggregate results?
Managing Parallel Applications
● A synchronization mechanism is needed
● To coordinate communication (like exchanging state) between workers
● To manage access to shared resources like data
● What if you don't?
● Mutual Exclusion
● Resource Starvation
● Race Conditions
● Dining philosophers, sleeping barber, cigarette smokers, readers-writers, producers-consumers, etcetera
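To make the race-condition case concrete: a minimal sketch (my own example, not from the slides) of two Java threads incrementing a shared counter without any synchronization, which silently loses updates:

```java
public class RaceDemo {
  static long counter = 0; // shared state, no synchronization

  public static void main(String[] args) throws InterruptedException {
    Runnable work = () -> {
      for (int i = 0; i < 1_000_000; i++) {
        counter++; // read-modify-write, not atomic: updates can be lost
      }
    };
    Thread t1 = new Thread(work);
    Thread t2 = new Thread(work);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    // Almost always prints less than 2,000,000
    System.out.println(counter);
  }
}
```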
Managing parallelism is hard!
Source: Ricardo Guimarães Herrmann
[Diagrams: Shared Memory (processes P1 to P5 accessing a common memory), Message Passing (processes P1 to P5 exchanging messages), master and slaves, and producer-consumer with a shared work queue.]
Well-known tools and patterns
● Programming models
● Shared memory (pthreads)
● Message passing (MPI)
● Design patterns
● Master-slave
● Producer-consumer
● Shared queues
Jimmy Lin, University of Maryland / Twitter, 2011
From Von Neumann...
… to a datacenter
http://www.lrr.in.tum.de/~jasmin/neumann.html
Where to go from here
● The search for the right level of abstraction
● How do we build an architecture for a scaled environment?
● From HAL to DCAL
● Hiding parallel application management from the developer
● It's hard!
● Separating the what from the how
● The developer specifies the computation
● The runtime environment handles the execution
Barroso, 2009
Ideas on scaling
● Scale “out”, don't scale “up”
● Hard upper bound on the capacity of a single machine
● No upper bound on the number of machines you can buy (in theory)
● When dealing with large data...
● Prefer sequential reads over random reads
● Rather not store a trillion small files, but a million big ones
– Disk access is slow, but throughput is reasonable!
● Try to understand when a NAS / SAN architecture is really necessary
– It's expensive to scale!
MapReduce
An abstraction of typical large-data problems
(1) Iterate over a large number of records
(2) Extract something of interest from each
(3) Shuffle and sort intermediate results
(4) Aggregate intermediate results
(5) Generate final output
An abstraction of typical large-data problems
(1) Iterate over a large number of records
(2) Extract something of interest from each ← MAP
(3) Shuffle and sort intermediate results
(4) Aggregate intermediate results ← REDUCE
(5) Generate final output
MapReduce provides a functional abstraction of step 2 and step 4
Roots in functional programming
Map(S: array, f())
● Apply f(s) to each item s ∈ S
Fold(S: array, f())
● Recursively apply f() to each item in S together with the result of the previous application (or an initial value, e.g. nil, if there is no previous result)
Source: Wikipedia
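As a concrete illustration (my own Java sketch, not part of the original slides), map with squaring as f() and fold with addition as f() over a small list:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MapFoldDemo {
  public static void main(String[] args) {
    List<Integer> s = List.of(1, 2, 3, 4);

    // Map: apply f (here: squaring) to every item s ∈ S, independently
    List<Integer> squared =
        s.stream().map(x -> x * x).collect(Collectors.toList());

    // Fold: combine each item with the result of the previous application,
    // starting from an initial value (here: 0), using addition as f
    int sum = s.stream().reduce(0, (acc, x) -> acc + x);

    System.out.println(squared); // [1, 4, 9, 16]
    System.out.println(sum);     // 10
  }
}
```

Map applies f to each item independently (trivially parallel); fold threads a running result through the items, which is exactly the shape of the aggregation step in MapReduce.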
MapReduce
The programmer specifies two functions:
● map(k, v) → <k', v'>*
● reduce(k', v'[ ]) → <k', v'>*
All values associated with the same key are sent to the same reducer
The execution framework handles everything else
[Diagram: input records (k1,v1) through (k6,v6) are processed by four map tasks, producing intermediate pairs a→1, b→2, c→3, c→6, a→5, c→2, b→7, c→8; the “Shuffle and Sort” step aggregates values by key into a→[1,5], b→[2,7], c→[2,3,6,8]; three reduce tasks then produce the outputs (r1,s1), (r2,s2), (r3,s3).]
Jimmy Lin, University of Maryland / Twitter, 2011
MapReduce “Hello World”: WordCount
● Question: how do we count the number of occurrences of each word in a given text?
● Line-based input (a record is one line)
● Key: the position (byte offset) of the line's first character in the whole document
● Value: a line not including the EOL character
● Input looks like:
Key: 0, value: “a wise old owl lived in an oak”
Key: 31, value: “the more he saw the less he spoke”
Key: 63, value: “the less he spoke the more he heard”
Key: 99, value: “why can't we all be like that wise old bird”
● Output looks like:
(a,1) (an,1) (be,1) (he,4) (in,1) (we,1) (all,1) (oak,1) (old,2) (owl,1) (saw,1) (the,4) (why,1) (bird,1) (less,2) (like,1) (more,2) (that,1) (wise,2) (can't,1) (heard,1) (lived,1) (spoke,2)
MapReduce “Hello World”: WordCount
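The original slide shows the WordCount code; the following is a hedged reconstruction in Java against the standard org.apache.hadoop.mapreduce API (class and field names are my own):

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

  // map(k, v) → <k', v'>*: for every word in the line, emit (word, 1)
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // reduce(k', v'[]) → <k', v'>*: sum all counts collected for a word
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
}
```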
MapReduce
The programmer specifies two functions:
● map(k, v) → <k', v'>*
● reduce(k', v'[ ]) → <k', v'>*
All values associated with the same key are sent to the same reducer
The “execution framework” handles... everything else?
MapReduce execution framework
● Handles scheduling
● Assigns map and reduce tasks to workers
● Handles “data-awareness”: moves processes to data
● Handles synchronization
● Gathers, sorts, and shuffles intermediate data
● Handles errors and faults
● Detects worker failures and restarts tasks
● Handles communication with the distributed filesystem
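To illustrate the division of labour, a minimal driver sketch (my own, using the standard org.apache.hadoop.mapreduce API and the WordCount classes sketched above; input and output paths come from the command line): the programmer only declares the job, everything after submission is the framework's problem.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);

    // The "what": which map and reduce functions to run
    job.setMapperClass(WordCount.TokenizerMapper.class);
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // The "how" -- scheduling, data locality, shuffle and sort, fault
    // tolerance -- is handled by the framework after submission
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```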
MapReduce
The programmer specifies two functions:
● map (k, v) → <k', v'>*
● reduce (k', v'[ ]) → <k', v'>*
All values associated with the same key are sent to the same reducer
The execution framework handles everything else...
Not quite... usually, programmers also specify:
● partition (k', number of partitions) → partition for k'
● Often a simple hash of the key, e.g., hash(k') mod n
● Divides up key space for parallel reduce operations
● combine (k', v') → <k', v'>*
● Mini-reducers that run in memory after the map phase
● Used as optimization to reduce network traffic
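As a sketch (my own, not from the slides): the combiner can often simply reuse the reducer, and a custom partitioner implementing hash(k') mod n looks like this in the standard Hadoop API (it is essentially what the default HashPartitioner does):

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// partition(k', number of partitions) → partition for k':
// a simple hash of the key, modulo the number of reduce tasks
public class WordPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}
```

In the driver this would be wired in with job.setPartitionerClass(WordPartitioner.class) and, since summing counts is associative and commutative, job.setCombinerClass(WordCount.IntSumReducer.class).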
[Diagram: the same dataflow as before, extended with a combine and a partition step after each map task. The combiner merges the local pairs c→3 and c→6 into c→9 before the shuffle, so the reducer for key c receives [2, 9, 8] instead of [2, 3, 6, 8]; the partitioner decides which reducer each key goes to.]
Jimmy Lin, University of Maryland / Twitter, 2011
Quick note...
The term “MapReduce” can refer to:
● The programming model
● The “execution framework”
● The specific implementation
Implementation(s)
MapReduce implementations
● Google (C++)
● Dean & Ghemawat, MapReduce: simplified data processing on large clusters, 2004
● Ghemawat, Gobioff, Leung, The Google File System, 2003
● Apache Hadoop (Java)
● Open source implementation
● Originally led by Yahoo!
● Broadly adopted
● Custom research implementations
● For GPUs, supercomputers, etcetera
[Diagram, repeated across three consecutive slides: the user program submits the job to the master (1), which schedules map and reduce tasks onto workers (2); map workers read the input splits 0 to 4 (3) and write intermediate files to their local disks (4); reduce workers read the intermediate data remotely (5) and write output files 0 and 1 (6). Phases: input files, map phase, intermediate files (on local disk), reduce phase, output files.]
Jimmy Lin, Adapted from (Dean and Ghemawat, OSDI 2004)
How do we get our input data to the map() tasks on the workers?
Distributed File System
● Don't move data to the workers... move workers to the data!
● Store data on the local disks of nodes in the cluster
● Start up the work on the node that has the data local
● A distributed file system is the answer
● GFS (Google File System) for Google's MapReduce
● HDFS (Hadoop Distributed File System) for Hadoop
GFS: Design decisions
● Files stored as chunks
● Fixed size (64 MB)
● Reliability through replication
● Each chunk replicated across 3+ chunkservers
● Single master to coordinate access, keep metadata
● Simple centralized management
● No data caching
● Little benefit due to large datasets, streaming reads
● Simplify the API
● Push some of the issues onto the client (e.g., data layout)
Jimmy Lin, Adapted from (Ghemawat, SOSP 2003)
HDFS = GFS clone (same basic ideas)
From GFS to HDFS
● Terminology differences:
● GFS Master = Hadoop NameNode
● GFS Chunkservers = Hadoop DataNodes
● Chunk = Block
● Functional differences
● Support for file appends in HDFS is relatively new
● HDFS performance is (likely) slower
● Block size is configurable by the client
We use Hadoop terminology
HDFS Architecture
[Diagram: an Application uses the HDFS Client; the client asks the HDFS namenode for (file name, block id) and gets back (block id, block location); the namenode keeps the file namespace (e.g. /foo/bar → block 3df2), sends instructions to the datanodes and receives datanode state; the client then requests (block id, byte range) directly from an HDFS datanode and receives the block data; each datanode stores blocks in its local Linux file system.]
Jimmy Lin, Adapted from (Ghemawat, SOSP 2003)
Namenode Responsibilities
● Managing the file system namespace:
● Holds file/directory structure, metadata, file-to-block mapping, access permissions, etcetera
● Coordinating file operations
● Directs clients to DataNodes for reads and writes
● No data is moved through the NameNode
● Maintaining overall health:
● Periodic communication with the DataNodes
● Block re-replication and rebalancing
● Garbage collection
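To make the read path concrete, a minimal sketch (my own example) using the standard HDFS Java client: the NameNode is consulted only for metadata, the data itself is streamed directly from the DataNodes.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCat {
  public static void main(String[] args) throws Exception {
    // Assumes the default filesystem in the configuration points at the
    // NameNode, e.g. hdfs://namenode:8020 (hostname and port are made up)
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // open() asks the NameNode which DataNodes hold the blocks of the file;
    // the returned stream then reads the block data from those DataNodes
    Path path = new Path(args[0]);
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(fs.open(path)))) {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line);
      }
    }
  }
}
```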
Putting everything together
[Diagram: a namenode running the namenode daemon, a job submission node running the jobtracker, and multiple slave nodes, each running a datanode daemon (on top of the local Linux file system) together with a tasktracker.]
Jimmy Lin, University of Maryland / Twitter, 2011
Questions?