notes on data-intensive processing with hadoop mapreduce

Evert Lammerts May 30, 2012 Notes on Data-Intensive Processing with Hadoop MapReduce Image source: http://valley-of-the-shmoon.blogspot.com/2011/04/pushing-elephant-up-stairs.html Guest Lecture Eindhoven University of Technology

Upload: evert-lammerts

Post on 20-Jan-2015

DESCRIPTION

These are slides used in a guest lecture at Eindhoven University of Technology on May 30, 2012. Adapted from slides by Jimmy Lin.

TRANSCRIPT

Page 1: Notes on data-intensive processing with Hadoop Mapreduce

Evert Lammerts

May 30, 2012

Notes on Data-Intensive Processing with Hadoop MapReduce

Image source: http://valley-of-the-shmoon.blogspot.com/2011/04/pushing-elephant-up-stairs.html

Guest Lecture Eindhoven University of Technology

Page 2: Notes on data-intensive processing with Hadoop Mapreduce

To start with...

● About me

● Note on this lecture

● Adapted from Jimmy Lin's Cloud Computing course...

http://www.umiacs.umd.edu/~jimmylin/cloud-2010-Spring/index.html

● … and from Jimmy's slidedeck from the SIKS Big Data course and his talk at UvA

http://www.umiacs.umd.edu/~jimmylin/

● Today's slides available at http://www.slideshare.net/evertlammerts

● About you

● Big Data?

● Cloud computing?

● Supercomputing?

● Hadoop and / or MapReduce?

Page 3: Notes on data-intensive processing with Hadoop Mapreduce

The lecture

● Why “Big Data”?

● How “Big Data”?

● MapReduce

● Implementations

Page 4: Notes on data-intensive processing with Hadoop Mapreduce

Why “Big Data”?

The Economist, Feb 25th 2010

Page 5: Notes on data-intensive processing with Hadoop Mapreduce

1. Science

● The emergence of the 4th paradigm

● http://research.microsoft.com/en-us/collaboration/fourthparadigm/

● CERN stores 15 PB of LHC data per year, a fraction of the data actually produced

● Square Kilometer Array expectation: 10 PB / hour

Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)

Page 6: Notes on data-intensive processing with Hadoop Mapreduce

2. Engineering

● Count and normalize

http://infrawatch.liacs.nl/

Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)

Page 7: Notes on data-intensive processing with Hadoop Mapreduce

3. Commerce

● Know thy customers

● Data → Insights → Competitive advantages

● Google was processing 20 PB each day... in 2008!

● Facebook collected 25 TB of HTTP logs each day... in 2009!

● eBay had ~9 PB of user data, and a growth rate of more than 50 TB / day in 2011

Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)

Page 8: Notes on data-intensive processing with Hadoop Mapreduce

IEEE Intelligent Systems, March/April 2009

Page 9: Notes on data-intensive processing with Hadoop Mapreduce

s/knowledge/data/g

Jimmy Lin, University of Maryland / Twitter, 2011

Page 10: Notes on data-intensive processing with Hadoop Mapreduce

Also see

● P. Russom, Big Data Analytics, The Data Warehousing Institute, 2011

● James G. Kobielus, The Forrester Wave™: Enterprise Hadoop Solutions, Forrester Research, 2012

● James Manyika et al., Big data: The next frontier for innovation, competition, and productivity, McKinsey Global Institute, 2011

● Dirk de Roos et al., Understanding Big Data: Analytics for Enterprise Class Hadoop and Streaming Data, IBM, 2011

Etcetera

Page 11: Notes on data-intensive processing with Hadoop Mapreduce

How “Big Data”?

Page 12: Notes on data-intensive processing with Hadoop Mapreduce
Page 13: Notes on data-intensive processing with Hadoop Mapreduce

Divide and Conquer

[Figure: the “Work” is partitioned into w1, w2, w3; each part goes to a “worker”, which produces r1, r2, r3; the partial results are combined into the “Result”]

Jimmy Lin, University of Maryland / Twitter, 2011
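The partition / worker / combine picture can be sketched in a few lines of Python. This is only an illustration under assumptions: the function names and the unit of work (summing squares) are made up for the example, and a thread pool stands in for the workers.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(work, n_parts):
    """Split the work list into n_parts slices (w1, w2, w3, ...)."""
    return [work[i::n_parts] for i in range(n_parts)]


def worker(part):
    """Hypothetical unit of work: sum the squares of one slice."""
    return sum(x * x for x in part)


def combine(partial_results):
    """Merge the partial results (r1, r2, r3, ...) into the final result."""
    return sum(partial_results)


def divide_and_conquer(work, n_workers=3):
    parts = partition(work, n_workers)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partial_results = list(pool.map(worker, parts))
    return combine(partial_results)


if __name__ == "__main__":
    print(divide_and_conquer(list(range(10))))
```

The sketch works only because the parts are independent and the combine step is associative; the challenges on the following slides show up as soon as either assumption breaks.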

Page 14: Notes on data-intensive processing with Hadoop Mapreduce

Amdahl's Law
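The slide itself only carries the title. As a reminder, the usual statement of Amdahl's Law is: if a fraction p of a program can be parallelized over n workers, the speedup is 1 / ((1 - p) + p / n). A small sketch (the function name is an assumption for this example):

```python
def amdahl_speedup(parallel_fraction, n_workers):
    """Speedup predicted by Amdahl's Law for a given parallel fraction."""
    if not 0.0 <= parallel_fraction <= 1.0:
        raise ValueError("parallel_fraction must be between 0 and 1")
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_workers)


if __name__ == "__main__":
    for n in (1, 10, 100, 1000):
        print(n, round(amdahl_speedup(0.95, n), 2))
```

With 95% of the work parallelizable, the speedup can never exceed 1 / 0.05 = 20, no matter how many workers are added.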

Page 15: Notes on data-intensive processing with Hadoop Mapreduce

Challenges in Parallel systems

● How do we divide the work into separate tasks?

● How do we get these tasks to our workers?

● What if we have more tasks than workers?

● What if our tasks need to exchange information?

● What if workers crash? (That's no exception!)

● How do we aggregate results?

Page 16: Notes on data-intensive processing with Hadoop Mapreduce

Managing Parallel Applications

● A synchronization mechanism is needed

● To coordinate communication (like exchanging state) between workers

● To manage access to shared resources like data

● What if you don't?

● Mutual Exclusion

● Resource Starvation

● Race Conditions

● Dining philosophers, sleeping barber, cigarette smokers, readers-writers, producers-consumers, etcetera

Managing parallelism is hard!
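To make the point concrete, here is a minimal sketch of managing access to a shared resource with a lock. The counter and the function name are assumptions for the example; without the lock, the read-modify-write on the counter could interleave between threads and lose updates.

```python
import threading


def count_with_lock(n_threads=4, increments=10_000):
    """Increment one shared counter from several threads, guarded by a lock."""
    counter = 0
    lock = threading.Lock()

    def work():
        nonlocal counter
        for _ in range(increments):
            # Only one thread at a time may execute the read-modify-write.
            with lock:
                counter += 1

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == "__main__":
    print(count_with_lock())
```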

Page 17: Notes on data-intensive processing with Hadoop Mapreduce

Source: Ricardo Guimarães Herrmann

Page 18: Notes on data-intensive processing with Hadoop Mapreduce

[Figure: shared memory (processes P1 to P5 attached to one memory) next to message passing (processes P1 to P5 exchanging messages), with the master-slaves, producer-consumer, and work queue patterns]

Well known tools and patterns

● Programming models

● Shared memory (pthreads)

● Message passing (MPI)

● Design patterns

● Master-slave

● Producer-consumer

● Shared queues

Jimmy Lin, University of Maryland / Twitter, 2011
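As an illustration of the producer-consumer pattern with a shared queue, here is a minimal sketch using Python threads. The names, the doubling "work", and the queue size are assumptions for the example, not anything prescribed by the slides.

```python
import queue
import threading

_DONE = object()  # sentinel telling the consumer that no more items will come


def producer(q, items):
    for item in items:
        q.put(item)  # blocks while the bounded queue is full
    q.put(_DONE)


def consumer(q, results):
    while True:
        item = q.get()  # blocks while the queue is empty
        if item is _DONE:
            break
        results.append(item * 2)  # hypothetical work on each item


def run_pipeline(items):
    q = queue.Queue(maxsize=2)  # the shared work queue
    results = []
    threads = [
        threading.Thread(target=producer, args=(q, items)),
        threading.Thread(target=consumer, args=(q, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_pipeline([1, 2, 3]))
```

The queue does the synchronization here: neither thread touches a lock directly, which is one reason the pattern is popular.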

Page 19: Notes on data-intensive processing with Hadoop Mapreduce

http://www.lrr.in.tum.de/~jasmin/neumann.html

From Von Neumann...

Page 20: Notes on data-intensive processing with Hadoop Mapreduce

… to a datacenter

Page 21: Notes on data-intensive processing with Hadoop Mapreduce
Page 22: Notes on data-intensive processing with Hadoop Mapreduce

Where to go from here

● The search for the right level of abstraction

● How do we build an architecture for a scaled environment?

● From HAL to DCAL

● Hiding parallel application management from the developer

● It's hard!

● Separating the what from the how

● The developer specifies the computation

● The runtime environment handles the execution

Barroso, 2009

Page 23: Notes on data-intensive processing with Hadoop Mapreduce

Ideas on scaling

● Scale “out”, don't scale “up”

● Hard upper-bound on the capacity of a single machine

● No upper-bound on the number of machines you can buy (in theory)

● When dealing with large data...

● Prefer sequential reads over random reads

● Rather store a million big files than a trillion small ones

– Disk seeks are slow, but sequential throughput is reasonable!

● Try to understand when a NAS / SAN architecture is really necessary

– It's expensive to scale!
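A back-of-the-envelope calculation shows why sequential access wins on large data. All numbers below are assumptions chosen for illustration (a 1 TB dataset of 100-byte records, 10 ms per disk seek, 100 MB/s sequential throughput); they are not measurements from the lecture.

```python
def random_update_seconds(n_updates, seek_seconds):
    """Time to touch n_updates records when each one costs a disk seek."""
    return n_updates * seek_seconds


def sequential_rewrite_seconds(total_bytes, bytes_per_second):
    """Time to read the whole dataset once and write it back once."""
    return 2 * total_bytes / bytes_per_second


if __name__ == "__main__":
    total_bytes = 10**12                      # assumed: 1 TB dataset
    n_records = total_bytes // 100            # assumed: 100-byte records
    n_updates = n_records // 100              # update 1% of the records
    random_s = random_update_seconds(n_updates, 0.010)
    sequential_s = sequential_rewrite_seconds(total_bytes, 100 * 10**6)
    print(f"random access: {random_s / 86400:.1f} days")
    print(f"sequential rewrite: {sequential_s / 3600:.1f} hours")
```

Under these assumptions, updating 1% of the records by random access takes about 11.6 days, while streaming through and rewriting the entire dataset takes about 5.6 hours.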

Page 24: Notes on data-intensive processing with Hadoop Mapreduce

MapReduce

Page 25: Notes on data-intensive processing with Hadoop Mapreduce

An abstraction of typical large-data problems

(1) Iterate over a large number of records

(2) Extract something of interest from each

(3) Shuffle and sort intermediate results

(4) Aggregate intermediate results

(5) Generate final output

Page 26: Notes on data-intensive processing with Hadoop Mapreduce

An abstraction of typical large-data problems

(1) Iterate over a large number of records

(2) Extract something of interest from each: MAP

(3) Shuffle and sort intermediate results

(4) Aggregate intermediate results: REDUCE

(5) Generate final output

MapReduce provides a functional abstraction of step 2 and step 4

Page 27: Notes on data-intensive processing with Hadoop Mapreduce

Roots in functional programming

Map(S: array, f())

● Apply f(s) for all items s ∈ S

Fold(S: array, f())

● Recursively apply f() to each item in S and the result of the previous operation, or nil if such an operation does not exist

Source: Wikipedia
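A small sketch of the two functional primitives in Python. The names `my_map` and `my_fold` are made up for the example; the initial value plays the role of the "nil" mentioned above.

```python
def my_map(f, items):
    """Apply f to every item independently; no item depends on another."""
    return [f(s) for s in items]


def my_fold(f, items, initial):
    """Combine items one by one, feeding each result into the next call."""
    accumulator = initial
    for s in items:
        accumulator = f(accumulator, s)
    return accumulator


if __name__ == "__main__":
    squares = my_map(lambda x: x * x, [1, 2, 3])
    print(squares)
    print(my_fold(lambda acc, x: acc + x, squares, 0))
```

Because each call to f in `my_map` is independent, the map step parallelizes trivially; the fold step is where ordering and aggregation come in.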

Page 28: Notes on data-intensive processing with Hadoop Mapreduce

MapReduce

The programmer specifies two functions:

● map(k, v) → <k', v'>*

● reduce(k', v'[ ]) → <k', v'>*

All values associated with the same key are sent to the same reducer

The execution framework handles everything else

Page 29: Notes on data-intensive processing with Hadoop Mapreduce

Input: (k1, v1) (k2, v2) (k3, v3) (k4, v4) (k5, v5) (k6, v6)

map → (a, 1) (b, 2) | (c, 3) (c, 6) | (a, 5) (c, 2) | (b, 7) (c, 8)

Shuffle and Sort: aggregate values by keys

a → [1, 5]   b → [2, 7]   c → [2, 3, 6, 8]

reduce → (r1, s1) (r2, s2) (r3, s3)

Jimmy Lin, University of Maryland / Twitter, 2011
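The shuffle-and-sort step of the figure can be mimicked in memory. In this sketch the grouping of the pairs into four mapper outputs is my reading of the flattened figure, and `shuffle_and_sort` is a hypothetical helper, not a Hadoop API.

```python
from collections import defaultdict

# Mapper outputs as read from the figure: four mappers, two pairs each (assumed grouping).
MAP_OUTPUTS = [
    [("a", 1), ("b", 2)],
    [("c", 3), ("c", 6)],
    [("a", 5), ("c", 2)],
    [("b", 7), ("c", 8)],
]


def shuffle_and_sort(map_outputs):
    """Group all values by key and order the keys, as the framework would."""
    grouped = defaultdict(list)
    for pairs in map_outputs:
        for key, value in pairs:
            grouped[key].append(value)
    # Keys are sorted; the order of values within one key is not guaranteed.
    return {key: grouped[key] for key in sorted(grouped)}


if __name__ == "__main__":
    for key, values in shuffle_and_sort(MAP_OUTPUTS).items():
        print(key, values)
```

Each key with its list of values is what a single reduce call receives.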

Page 30: Notes on data-intensive processing with Hadoop Mapreduce

MapReduce “Hello World”: WordCount

● Question: how can we count how often each unique word occurs in a given text?

● Line-based input (a record is one line)

● Key: position of first character in the whole document

● Value: a line not including the EOL character

● Input looks like:

Key: 0, value: “a wise old owl lived in an oak”
Key: 31, value: “the more he saw the less he spoke”
Key: 65, value: “the less he spoke the more he heard”
Key: 101, value: “why can't we all be like that wise old bird”

● Output looks like:

(a,1) (an,1) (be,1)
(he,4) (in,1) (we,1)
(all,1) (oak,1) (old,2)
(owl,1) (saw,1) (the,4)
(why,1) (bird,1) (less,2)
(like,1) (more,2) (that,1)
(wise,2) (can't,1) (heard,1)
(lived,1) (spoke,2)

Page 31: Notes on data-intensive processing with Hadoop Mapreduce

MapReduce “Hello World”: WordCount
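A minimal in-memory sketch of WordCount, assuming plain Python stands in for the framework. The helper names (`read_records`, `map_fn`, `reduce_fn`, `word_count`) are made up for this example and are not Hadoop APIs; a real job would implement Hadoop's Mapper and Reducer classes in Java.

```python
from collections import defaultdict

TEXT = (
    "a wise old owl lived in an oak\n"
    "the more he saw the less he spoke\n"
    "the less he spoke the more he heard\n"
    "why can't we all be like that wise old bird\n"
)


def read_records(text):
    """Yield (offset of first character, line without EOL) records."""
    offset = 0
    for line in text.splitlines(keepends=True):
        yield offset, line.rstrip("\n")
        offset += len(line)


def map_fn(offset, line):
    """map(k, v): emit (word, 1) for every word; the offset key is ignored."""
    for word in line.split():
        yield word, 1


def reduce_fn(word, counts):
    """reduce(k', v'[]): sum all the ones collected for a word."""
    yield word, sum(counts)


def word_count(text):
    groups = defaultdict(list)  # stands in for shuffle and sort
    for offset, line in read_records(text):
        for word, one in map_fn(offset, line):
            groups[word].append(one)
    result = {}
    for word in sorted(groups):
        for key, total in reduce_fn(word, groups[word]):
            result[key] = total
    return result


if __name__ == "__main__":
    for word, total in word_count(TEXT).items():
        print(word, total)
```

Note that the programmer only writes `map_fn` and `reduce_fn`; everything in `word_count` is work the execution framework would do.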

Page 32: Notes on data-intensive processing with Hadoop Mapreduce

MapReduce

The programmer specifies two functions:

● map(k, v) → <k', v'>*

● reduce(k', v'[ ]) → <k', v'>*

All values associated with the same key are sent to the same reducer

The “execution framework” handles ? everything else ?

Page 33: Notes on data-intensive processing with Hadoop Mapreduce

MapReduce execution framework

● Handles scheduling

● Assigns map and reduce tasks to workers

● Handles “data-awareness”: moves processes to data

● Handles synchronization

● Gathers, sorts, and shuffles intermediate data

● Handles errors and faults

● Detects worker failures and restarts failed tasks

● Handles communication with the distributed filesystem

Page 34: Notes on data-intensive processing with Hadoop Mapreduce

MapReduce

The programmer specifies two functions:

● map (k, v) → <k', v'>*

● reduce (k', v'[ ]) → <k', v'>*

All values associated with the same key are sent to the same reducer

The execution framework handles everything else...

Not quite... usually, programmers also specify:

● partition (k', number of partitions) → partition for k'

● Often a simple hash of the key, e.g., hash(k') mod n

● Divides up key space for parallel reduce operations

● combine (k', v') → <k', v'>*

● Mini-reducers that run in memory after the map phase

● Used as optimization to reduce network traffic
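Both optional functions are small. The sketch below assumes string keys and a summing job such as WordCount; the function names are illustrative, and CRC32 is used only because Python's built-in `hash` for strings changes between runs.

```python
import zlib
from collections import defaultdict


def partition(key, num_partitions):
    """hash(k') mod n, with a hash that is stable across processes."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def combine(pairs):
    """Mini-reducer: pre-sum the values per key within one mapper's output."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return sorted(totals.items())


if __name__ == "__main__":
    mapper_output = [("c", 3), ("c", 6)]
    combined = combine(mapper_output)
    print(combined)  # fewer pairs now have to cross the network
    for key, value in combined:
        print(key, "goes to reducer", partition(key, 3))
```

A combiner is only safe when the reduce operation is associative and commutative, as summing is; the framework may run it zero, one, or several times.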

Page 35: Notes on data-intensive processing with Hadoop Mapreduce

Input: (k1, v1) (k2, v2) (k3, v3) (k4, v4) (k5, v5) (k6, v6)

map → (a, 1) (b, 2) | (c, 3) (c, 6) | (a, 5) (c, 2) | (b, 7) (c, 8)

combine → (a, 1) (b, 2) | (c, 9) | (a, 5) (c, 2) | (b, 7) (c, 8)

partition (once per mapper output)

Shuffle and Sort: aggregate values by keys

a → [1, 5]   b → [2, 7]   c → [2, 9, 8]   (without the combiner: c → [2, 3, 6, 8])

reduce → (r1, s1) (r2, s2) (r3, s3)

Jimmy Lin, University of Maryland / Twitter, 2011

Page 36: Notes on data-intensive processing with Hadoop Mapreduce

Quick note...

The term “MapReduce” can refer to:

● The programming model

● The “execution framework”

● The specific implementation

Page 37: Notes on data-intensive processing with Hadoop Mapreduce

Implementation(s)

Page 38: Notes on data-intensive processing with Hadoop Mapreduce

MapReduce implementations

● Google (C++)

● Dean & Ghemawat, MapReduce: simplified data processing on large clusters, 2004

● Ghemawat, Gobioff, Leung, The Google File System, 2003

● Apache Hadoop (Java)

● Open source implementation

● Originally led by Yahoo!

● Broadly adopted

● Custom research implementations

● For GPUs, supercomputers, etcetera

Page 39: Notes on data-intensive processing with Hadoop Mapreduce

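[Figure: MapReduce execution overview. (1) The user program submits the job to the master. (2) The master schedules map tasks and reduce tasks on the workers. (3) Map workers read the input splits (split 0 to split 4) and (4) write intermediate files to their local disk. (5) Reduce workers read the intermediate files remotely and (6) write the output files (output file 0, output file 1). Stages: input files → map phase → intermediate files (on local disk) → reduce phase → output files]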

Jimmy Lin, Adapted from (Dean and Ghemawat, OSDI 2004)

Page 40: Notes on data-intensive processing with Hadoop Mapreduce


Page 41: Notes on data-intensive processing with Hadoop Mapreduce


How do we get our input data to the map()'s on the workers?

Page 42: Notes on data-intensive processing with Hadoop Mapreduce

Distributed File System

● Don't move data to the workers... move workers to the data!

● Store data on the local disks of nodes in the cluster

● Start up the work on the node that holds the data locally

● A distributed file system is the answer

● GFS (Google File System) for Google's MapReduce

● HDFS (Hadoop Distributed File System) for Hadoop

Page 43: Notes on data-intensive processing with Hadoop Mapreduce

GFS: Design decisions

● Files stored as chunks

● Fixed size (64MB)

● Reliability through replication

● Each chunk replicated across 3+ chunkservers

● Single master to coordinate access, keep metadata

● Simple centralized management

● No data caching

● Little benefit due to large datasets, streaming reads

● Simplify the API

● Push some of the issues onto the client (e.g., data layout)

Jimmy Lin, Adapted from (Ghemawat, SOSP 2003)

HDFS = GFS clone (same basic ideas)
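To illustrate fixed-size chunks and replication, here is a toy sketch. The round-robin placement is a deliberately naive assumption for the example; real GFS and HDFS placement policies also consider racks, load, and free space.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB, as on the slide
REPLICATION = 3


def split_into_chunks(file_size, chunk_size=CHUNK_SIZE):
    """Return (offset, length) for every chunk of a file of file_size bytes."""
    chunks = []
    offset = 0
    while offset < file_size:
        length = min(chunk_size, file_size - offset)
        chunks.append((offset, length))
        offset += length
    return chunks


def place_replicas(n_chunks, servers, replication=REPLICATION):
    """Hypothetical round-robin placement of each chunk on distinct servers."""
    if replication > len(servers):
        raise ValueError("not enough servers for the requested replication")
    return {
        chunk: [servers[(chunk + r) % len(servers)] for r in range(replication)]
        for chunk in range(n_chunks)
    }


if __name__ == "__main__":
    chunks = split_into_chunks(150 * 1024 * 1024)  # a 150 MB file
    print(chunks)
    print(place_replicas(len(chunks), ["cs1", "cs2", "cs3", "cs4"]))
```

Losing any single chunkserver in this layout still leaves at least two copies of every chunk.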

Page 44: Notes on data-intensive processing with Hadoop Mapreduce

From GFS to HDFS

● Terminology differences:

● GFS Master = Hadoop NameNode

● GFS Chunkservers = Hadoop DataNodes

● Chunk = Block

● Functional differences

● File appends in HDFS are relatively new

● HDFS performance is (likely) slower

● Blocksize is configurable by the client

We use Hadoop terminology

Page 45: Notes on data-intensive processing with Hadoop Mapreduce

HDFS Architecture

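[Figure: the application talks to the HDFS client. The client sends (file name, block id) to the HDFS namenode, which holds the file namespace (/foo/bar → block 3df2) and answers with (block id, block location). The client then sends (block id, byte range) to an HDFS datanode and receives the block data. The namenode sends instructions to the datanodes and receives datanode state. Each HDFS datanode stores its blocks on the local Linux file system]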

Jimmy Lin, Adapted from (Ghemawat, SOSP 2003)

Page 46: Notes on data-intensive processing with Hadoop Mapreduce

Namenode Responsibilities

● Managing the file system namespace:

● Holds file/directory structure, metadata, file-to-block mapping, access permissions, etcetera

● Coordinating file operations

● Directs clients to DataNodes for reads and writes

● No data is moved through the NameNode

● Maintaining overall health:

● Periodic communication with the DataNodes

● Block re-replication and rebalancing

● Garbage collection
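The division of labour between NameNode and DataNodes can be sketched with two toy classes. Everything here is a simplification under assumptions: the class and method names are invented for the example and are not the HDFS client API, and block contents live in a dict rather than on disk.

```python
class DataNode:
    """Toy datanode: keeps block contents in memory instead of on local disk."""

    def __init__(self, name):
        self.name = name
        self.blocks = {}

    def store(self, block_id, data):
        self.blocks[block_id] = data

    def read(self, block_id, start=0, end=None):
        return self.blocks[block_id][start:end]


class NameNode:
    """Toy namenode: holds metadata only, never the block data itself."""

    def __init__(self):
        self.file_to_blocks = {}   # path -> ordered list of block ids
        self.block_locations = {}  # block id -> names of datanodes with a replica

    def register(self, path, block_id, datanode_names):
        self.file_to_blocks.setdefault(path, []).append(block_id)
        self.block_locations[block_id] = list(datanode_names)

    def locate(self, path):
        return [(b, self.block_locations[b]) for b in self.file_to_blocks[path]]


def read_file(namenode, datanodes, path):
    """Client read: ask the namenode where blocks live, then read from datanodes."""
    data = b""
    for block_id, locations in namenode.locate(path):
        data += datanodes[locations[0]].read(block_id)  # first replica, for simplicity
    return data


if __name__ == "__main__":
    datanodes = {"dn1": DataNode("dn1"), "dn2": DataNode("dn2")}
    namenode = NameNode()
    datanodes["dn1"].store("3df2", b"hello ")
    datanodes["dn2"].store("3df3", b"world")
    namenode.register("/foo/bar", "3df2", ["dn1"])
    namenode.register("/foo/bar", "3df3", ["dn2"])
    print(read_file(namenode, datanodes, "/foo/bar"))
```

The file bytes never pass through `NameNode`, which matches the point that no data is moved through it.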

Page 47: Notes on data-intensive processing with Hadoop Mapreduce

Putting everything together

[Figure: the namenode machine runs the namenode daemon; the job submission node runs the jobtracker; each of the three slave nodes runs a tasktracker and a datanode daemon on top of the Linux file system]

Jimmy Lin, University of Maryland / Twitter, 2011

Page 48: Notes on data-intensive processing with Hadoop Mapreduce

Questions?