data intensive computing: mapreduce and hadoop distributed file system mukaddim pathan research...

98
Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Upload: megan-gabriella-chase

Post on 12-Jan-2016

219 views

Category:

Documents


2 download

TRANSCRIPT

Page 1: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Data Intensive Computing: MapReduce and Hadoop Distributed File System

Mukaddim Pathan

Research Scientist, CSIRO ICT Centre

Page 2: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Lecture 1

• Data Intensive Computing

• MapReduce Basics (Google)

• Hadoop Distributed File System (HDFS)

• Apache Hadoop Working Details

Page 3: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

The Data Age!

2006

2000

2011

0.18 zettabytes

1. 8 zettabytes

?

** 1 zettabytes = 1 billion terabytes

• Multi-dimensional sources and types of data

• Stock exchange, Facebook, Flickr, Picasa, Internet archive, Large Hadron Collider, sensor networks, biological applications

• Scientific applications and personal digital content creation

• Increased storage capacities and access speed over years

• Still problems exist with I/O read, hardware failure, and combining data from multiple sources

• Data storage and analysis• A scalable information processing

framework is required to handle vast amounts of data

Page 4: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Data Management and Processing

• Data intensive computing• Concerns with the production, manipulation and analysis of data in the

range of hundreds of megabytes (MB) to petabytes (PB) and beyond

• A range of supporting parallel and distributed computing technologies to deal with the challenges of data representation, reliable shared storage, efficient algorithms and scalable infrastructure to perform analysis

Page 5: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Challenges Ahead

• Challenges with data intensive computing• Scalable algorithms that can search and process massive datasets

• New metadata management technologies that can scale to handle complex, heterogeneous and distributed data sources

• Support for accessing in-memory multi-terabyte data structures

• High performance, highly reliable petascale distributed file system

• Techniques for data reduction and rapid processing

• Software mobility to move computation where data is located

• Hybrid interconnect with support for multi-gigabyte data streams

• Flexible and high performance software integration technique

• Hadoop in rescue!• A family of related project, best known for MapReduce and Hadoop

Distributed File System (HDFS)

Page 6: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Why Hadoop

• Drivers• 500M+ unique users per month• Billions of interesting events per day• Data analysis is key

• Need massive scalability• PB’s of storage, millions of files, 1000’s of nodes

• Need to do this cost effectively• Use commodity hardware• Share resources among multiple projects• Provide scale when needed

• Need reliable infrastructure• Must be able to deal with failures – hardware, software, networking• Failure is expected rather than exceptional• Transparent to applications• very expensive to build reliability into each application

• The Hadoop infrastructure provides these capabilities

Page 7: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Introduction to Hadoop

• Apache Hadoop• Open Source – Apache Foundation project

• Yahoo! is Apache Platinum Sponsor

• History• Started in 2005 by Doug Cutting

• Yahoo! became the primary contributor in 2006

• Yahoo! scaled it from 20 node clusters to 4000 node clusters today

• Deployed large scale science clusters in 2007

• Began running major production jobs in Q1 2008

• Portable• Written in Java

• Runs on commodity hardware

• Linux, Mac OS/X, Windows, and Solaris

Page 8: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

MapReduce Basics

• A programming model and its associated implementation for parallel processing of large data sets

• It was developed within Google as a mechanism for processing large amounts of raw data, e.g. crawled documents or web request logs.

• Capable of efficiently distribute processing of TB’s of data on 1000’s of processing nodes

• This distribution implies parallel computing since the same computations are performed on each CPU, but with a different dataset (or different segment of a large dataset)

• Implementation’s run-time system library takes care of parallelism, fault tolerance, data distribution, load balancing etc

• Complementary to RDBMS, but differs in many ways (data size, access, update, structure, integrity and scale)

• Features: fault tolerance, locality, task granularity, backup tasks, skipping bad records and so on

Page 9: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Operational Details

• Map• Map function, written by a user of the MapReduce library, takes an

input pair and produces a set of intermediate key/value pairs

• The MapReduce library groups together all intermediate values associated with the same intermediate key k and passes them to the reduce function

• Reduce• The reduce function, also written by the user, accepts an

intermediate key k and a set of values for that key. It merges together these values to form a possibly smaller set of values

Page 10: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Example

• Consider the problem of counting the number of occurrences of each word in a large document

Page 11: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

See the Example Graphically

• Counting word frequency in a document

Map

doc

Reduce

<word,3>

<word,1>

<word,1>

<word,1>

MapReduce RuntimeSystem

<word,1,1,1>

Page 12: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

MapReduce Overview

• The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits or shards.

• The input shards can be processed in parallel on different machines (nodes in a Cluster)

• Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R)

• The number of partitions (R) and the partitioning function are specified by the user

Page 13: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

MapReduce: Execution Overview

(1) Create Splits of input files and start set of tasks running copies of

program

(2) One task is the Master, the others are workers. Master assigns Map and Reduce

tasks to idle workers

(3) Worker assigned a Map task reads data from the task’s corresponding split, parsing key/value pairs from input and passing them to Map()

(4) Periodic local writes, partitioned into R regions by the partitioning function. Location of buffered pairs passed back to Master

(5) Reduce worker is notified of locations of intermediate key/value pairs by Master, and uses RPC to get the data from the

corresponding Map worker. Upon getting all data, perform sort on intermediate keys

(6) Reduce worker iterates over sorted intermediate data, passing data grouped by key to Reduce(). Output of Reduce() is

appended to worker’s output file

(7) Upon completion of all Reduce tasks, Master returns control to user program

Page 14: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

More About the Master

• Master owns several data structure to keep track of execution

• Stores state (idle, in-progress, completed) for each task, and identity of worker machines for in-progress and completed tasks

• Master keeps track of location of intermediate file regions, serving as conduit from Map to corresponding Reduce tasks

• Master stores the locations and sizes of the R intermediate file regions produced by the map task

• Updates are received from Map tasks as they are completed by assigned Map workers; corresponding information pushed incrementally to workers with in-progress Reduce tasks

Page 15: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop MapReduce

mapreduce fm fr l =map (reducePerKey fr) (group (map fm l))

reducePerKey fr (k,v_list) =

(k, (foldl (fr k) [] v_list)

Hadoop• The fm and fr are function objects (classes)• Class for fm implements the Mapper interface

Map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)

• Class for fr implements the Reducer interface

reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)

Hadoop takes the generated class files and manages running them

Page 16: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Handling Failure

• Worker failure• To detect failure, the master pings every worker periodically• If no response is received from a worker in a certain amount of

time, the master marks the worker as failed• Any map tasks completed by the worker are reset back to their

initial idle state, and therefore become eligible for scheduling on other workers

• Completed map tasks are re-executed as their output on is stored on the local disk(s) of the failed machine and is therefore inaccessible

• Completed reduce tasks do not need to be re-executed since their output is stored in a global file system

• Master failure• Periodic checkpoints are written to handle master failure

• If the master task dies, a new copy can be started from the last checkpoint state

Page 17: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Data Locality

• Network bandwidth is a valuable scarce resource and it should be consumed wisely

• The distributed file system replicates data across different nodes

• The Master takes these locations into account when scheduling Map tasks, trying to place them with the data

• Otherwise, Map tasks are scheduled to reside “near” a replica of the data (e.g., on a worker machine that is on the same network switch)

• When running large MapReduce operations, most input data is read locally and consume no network bandwidth

• Data locality worked well with a Hadoop-specific distributed file system

• Integration of a Cloud-based file system incurs extra cost and loss data locality

Page 18: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Task Granularity

• Finely granular tasks: many more map tasks than machines• Better dynamic load balancing• Minimizes time for fault recovery• Can pipeline the shuffling/grouping while maps are still running

• Typically 200k Map tasks, 5k Reduce tasks for 2k hosts

• For M map tasks and R reduce tasks there are O(M+R) scheduling decisions and O(M*R) states

Page 19: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Load Balancing

• Built-in dynamic load balancing

• One other problem that can slow calculations is the existence of stragglers; machines suffering from either hardware defects, contention for resources with other applications etc.

• When an overall MapReduce operation passes some point deemed to be “nearly complete,” the Master schedules backup tasks for all of the currently in-progress tasks

• When a particular task is completed, whether it be “original” or back-up, its value is used

• This strategy costs little more overall, but can result in big performance gains

Page 20: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Refinements

• Partitioning function• MapReduce users specify the number of reduce tasks/output files (R)

• Data gets partitioned across these tasks using a partitioning function on the intermediate key

• Default is “hash(key) mod R”, resulting in well balanced partitions

• Special partitioning function can also be used, such as “hash(Hostname(urlkey))” to combine all URLs (output keys) from the same host to the same output file

• Ordering guarantees• Within a given partition, the intermediate key/value pairs are processed

in increasing key order

• This ordering guarantee makes it easy to generate a sorted output file per partition

• Allows users to have sorted output and efficient access lookups by key

Page 21: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Refinements (Cont’d)

• Combiner function• There can be significant repetition in the intermediate keys produced

by each map task and the reduce task is associative

• While one reduce task can perform the aggregation, an on-processor combiner function can be used to perform partial merging of Map output locally before sending over the network

• The combiner function is executed on each machine that performs a map task

• The program logic for the combiner function and reduce tasks are potentially same, except how the output is handled, i.e. writing output in an intermediate file or in the final output file

• Input/Output types• Multiple input/output format supported

• User can also add support to new input/output type by providing an implementation to the reader/writer interface

Page 22: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Refinements (Cont’d)

• Skipping bad records• MapReduce provides a mode for skipping records that are diagnosed

to cause Map() crashes

• Each worker process installs a signal handler that catches segment violations and bus errors, tracked by master

• When the master notices more than one failure on a particular record, it indicates that the record should be skipped during re-execution

• Local execution/debugging• Not straightforward due to the distributed computation of MapReduce

• Alternative implementation of the MapReduce library that sequentially on one node (local machine)

• Users can use any debugging or testing tools they find useful

Page 23: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Refinements (Cont’d)

• Status information• Master contains internal http server to produce status pages with

information on how many tasks have been completed, how many are in progress, bytes of input, bytes of intermediate data, bytes of output, and processing rates.

• The status page contains links to the standard error and standard output files generated by each task

• A user can monitor progress, predict computation time and accelerate it by adding more hosts Counters

• Counters• A facility to count occurrences of various events

• To use this facility, user code creates a named counter object and then increments the counter appropriately in Map and/or Reduce function

Page 24: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Performance Evaluation

• Tests run on cluster of 1800 machines: • 4 GB of memory

• Dual-processor 2 GHz Xeons with Hyperthreading

• Dual 160 GB IDE disks

• Gigabit Ethernet per machine

• Bisection bandwidth approximately 100 Gbps

• Two benchmarks• MR_Grep Scan 1010 100-byte records to extract records matching

a rare pattern (the pattern occurs in 92K records)

• MR_Sort Sort 1010 100-byte records (modeled after TeraSort benchmark)

• Input is split into approximately 64 MB pieces (M = 15000) and the entire output is placed in one file (R = 1)

Page 25: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Data Transfer

• Locality optimization helps• 1800 machines read 1 TB of data at peak of ~31 GB/s

• Without this, rack switches would limit to 10 GB/s

• Start-up overhead is significant for short jobs

Page 26: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Results

• Backup tasks reduce job completion time significantly • System deals well with failures

Normal No backup tasks 200 processes killed

Page 27: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

MapReduce Applications

• Applications• Text tokenization (alert system), indexing, and search

• Data mining, statistical modeling, and machine learning

• Healthcare – parse, clean and reconcile extremely large amount of data

• Biosciences – drug discovery, meta-genomics, bioassay activities

• Cost-effective mash-ups – retrieving and analyzing biomedical knowledge

• Computational biology – parallelize bioinformatics algorithms for SNP discovery, genotyping and personal genomics, e.g. CloudBurst

• Emergency response – real-time monitoring/forecasting for operational decision support

• and so on (Check: http://wiki.apache.org/hadoop/PoweredBy)

• MapReduce inapplicability• Database management – does not provide traditional DBMS features

• Database implementation – lack of schema, low data integrity

• Normalization poses problems for MapReduce, due to non-local reading

• Applications cannot have read and write many times feature

Page 28: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop Distributed File System (HDFS)

• A distributed file system designed to run on commodity hardware

• HDFS was originally built as infrastructure for the Apache Nutch web search engine project, with the aim to achieve fault tolerance, ability to run on low-cost hardware and handle large datasets

• It is now an Apache Hadoop subproject

• Share similarities with existing distributed file systems and supports traditional hierarchical file organization

• Reliable data replication and accessible via Web interface and Shell commands

• Benefits: Fault tolerant, high throughput, streaming data access, robustness and handling of large data sets

Page 29: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Assumptions and Goals

• Hardware failures• Detection of faults, quick and automatic recovery

• Streaming data access• Designed for batch processing rather than interactive use by users

• Large data sets• Applications that run on HDFS have large data sets, typically in gigabytes

to terabytes in size

• Simple coherency model• Applications need a write-once, read-many times access model for files

• Computation migration• Computation is moved closer to where data is located

• Portability• Easily portable between heterogeneous hardware and software platforms

Page 30: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

HDFS Concepts

• Blocks• Disk block is a minimum amount of data that it can read or write

• HDFS block is a unit of 64 MB

• Files in HDFS are broken into block-sized chunks and stored as independent units

• A file smaller than a single block does not occupy the full block’s worth of underlying storage

• Benefits: filesystem abstraction, multiple disk usage for large-size file, fit well for replication

• Copy a file from local filesystem to HDFS

% hadoop fs –copyFromLocal …/a.txt hdfs://localhost/…/a.txt

• Listing the blocks that make up each file in the filesystem

% hadoop fsck / -files -blocks

Page 31: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

HDFS Concepts (Cont’d)

• NameNodes and DataNodes

• HDFS follows a master/slave architecture

• HDFS cluster consists of a single NameNode and a number of DataNodes, one for each cluster node

• NameNode is a master server that manages the file system namespace and regulates access to files by clients

• DataNodes manage storage attached to the cluster nodes they run on

• HDFS exposes a file system namespace and allows user data to be stored in files

• Internally a file in split into one or more blocks for storing in DataNodes

Page 32: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

HDFS Concepts (Cont’d) - NameNode Metadata

• NameNode executes file system namespace operations such as opening, closing, and renaming files and directories, and also determines the mapping of blocks to DataNdoes

• Meta-data in Memory• The entire metadata is in main memory• No demand paging of meta-data

• Types of Metadata• List of files• List of Blocks for each file• List of DataNodes for each block• File attributes, e.g creation time, replication factor

• A Transaction Log• Records file creations, file deletions. etc

Page 33: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

HDFS Concepts (Cont’d) - DataNode

• A Block Server

• Stores data in the local file system (e.g. ext3)

• Stores meta-data of a block (e.g. CRC)

• Serves data and meta-data to Clients

• DataNode is responsible for serving read and write requests from the file system’s clients, as well as perform block creation, deletion and replication upon instruction from the NameNode

• Block Report

• Periodically sends a report of all existing blocks to the NameNode

• Facilitates Pipelining of Data

• Forwards data to other specified DataNodes

Page 34: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

HDFS Concepts (Cont’d)

• File system namespace• A user or an application can create directories and store files inside

these directories

• Any change to the filesystem namespace or its properties is recorded by the NameNode

• A application can specify the number of replicas of a file that should be maintained by HDFS

• Data replication• Each file is stored as a sequence of blocks of same size, except

the last one

• Block size and replication factor are configurable per file at the file creation time and can be changed later

• Block replication occurs based on periodic heartbeat and a Blockreport from each of the DataNodes in the cluster

Page 35: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

HDFS Architecture

Page 36: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

HDFS Interfaces

• There are many interfaces to interact with HDFS

• Simplest way of interacting with HDFS in command-line

• Two properties are set in HDFS configuration

• Default Hadoop filesystem fs.default.name: hdfs://localhost/• Used to determine the host (localhost) and port (8020) for the

HDFS NameNode

• Replication factor dfs.replication• Default is 3, disable replication by setting it to 1 (single datanode)

• Other HDFS interfaces• HTTP: a read only interface for retrieving directory listings and data

over HTTP

• FTP: permits the use of the FTP protocol to interact with HDFS

Page 37: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Data Pipelining

• Client retrieves a list of DataNodes on which to place replicas of a block

• Client writes block to the first DataNode• The first DataNode forwards the data to the next

DataNode in the Pipeline• When all replicas are written, the Client moves on to

write the next block in file

Page 38: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Replication in HDFS

• Replica placement• Critical to improve data reliability, availability and network bandwidth

utilization

• Rack-aware policy as rack failure is far less than node failure

• With the default replication factor (3), one replica is put on one node in the local rack, another on a node in a different (remote) rack, and the last on a different node in the same remote rack

• One third of replication are on one node; two-third of replicas are on one rack, and the other third are evenly distributed across racks

• Benefits is to reduce inter-rack write traffic

• Replica selection• A read request is satisfied from a replica that is nearby to the application

• Minimizes global bandwidth consumption and read latency

• If HDFS spans multiple data center, replica in the local data center is preferred over any remote replica

Page 39: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Replication in HDFS (Cont’d)

• Safemode• On startup, NameNode enters in Safemode and wait for Heartbeat and

Blockreport messages from DataNodes

• Each block has a specified number of replicas

• Once NameNode knows the status of a specified number of replicas in each block, it exists safemode and if request replicates data blocks to other DataNodes.

• Filesystem metadata• DataNode does not create all files in the same directory, but uses a

heuristic to determine the optimal number of files per directories

• On startup, a DataNode scans through its local filesystem, generates a list of all HDFS data blocks and sends the report to the NameNode

• Checkpoint• Image of the entire file system namespace and file Blockmap in memory

• Single checkpoint is created at startup and also periodically

Page 40: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Communication Protocol

• All HDFS communication protocols are layered on top of the TCP/IP protocol

• A client establishes a connection to a configurable TCP port on the NameNode machine and uses ClientProtocol

• DataNodes talk to the NameNode using DataNode protocol

• A Remote Procedure Call (RPC) abstraction wraps both the ClientProtocol and DataNode protocol

• NameNode never initiates a RPC, instead it only responds to RPC requests issued by DataNodes or clients

Page 41: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Robustness

• Primary objective of HDFS is to store data reliably even during failures

• Three common types of failures: NameNode, DataNode and network partitions

• Data disk failure• Heartbeat messages to track the health of DataNodes

• NameNodes performs necessary re-replication on DataNode unavailability, replica corruption or disk fault

• Cluster rebalancing• Automatically move data between DataNodes, if the free space on a

DataNode falls below a threshold or during sudden high demand

• Data integrity• Checksum checking on HDFS files, during file creation and retrieval

• Metadata disk failure• Manual intervention – no auto recovery, restart or failover

Page 42: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

How Hadoop Runs a MapReduce job

• Client submits MapReduce job

• JobTracker coordinates job run

• TaskTracker runs split tasks

• HDFS is used for file storage

Page 43: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Streaming and Pipes

• Hadoop Streaming, API to MapReduce to write non-Java map and reduce function

• Hadoop and the user program communicates using standard I/O streams

• Hadoop Pipes is the C++ interface to MapReduce

• Uses socket as channel to communicate with the process running the C++ Map or Reduce function

Page 44: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Progress and Status Updates

• Operations constituting progress• Reading an input record

• Writing an output record

• Setting status description

• Incrementing a counter

• Calling progress () method

Page 45: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop Failures

• Task failure• Map or reduce task throws a runtime exception

• For streaming tasks, streaming processes exiting with a non-zero exit code are considered as failed

• Task call also be killed and re-scheduled

• Tasktracker failure• Crash or slow execution can cause infrequent (or stop) sending

heartbeats to the job tracker

• A tasktracker can also be blacklisted by the jobtracker if it fails a significant number of tasks, higher than average task failure rate

• Jobtracker failure• Single point of failure - no mechanism to deal with it

• One solution is to run multiple jobtracker or have backup jobtracker

Page 46: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Checkpointing in Hadoop

Page 47: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Job Scheduling in Hadoop

• Started with FIFO scheduling and now comes with a choice of schedulers

• The fair scheduler• Aims to give every user a fair share of the cluster capacity over time

• Jobs are placed in pools and by default each user gets their own pool

• Support preemption – capacity provisioning of over-capacity to under-capacity pool

• The capacity scheduler• Slightly different approach to multi-user scheduling

• A cluster is made up of a number of queues, which may be hierarchical, and each queue has an allocated capacity

• Within each queue jobs are scheduled using FIFO scheduling, with priorities

Page 48: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Shuffle and Sort

• Input to each reducer is sorted by key• Shuffle is the process for performing sort and sending map outputs

to reducers as inputs• The Map side - each map task has a memory buffer

• Data partitioning, in-memory sort and use of combiner function

• The Reduce side - reduce tasks copy map outputs in parallel• Upon copying all outputs, perform merge/sort before task execution

Page 49: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Summary – Lecture 1

• MapReduce makes it easy to parallelize and distribute computations and make them fault tolerant

• Locality of optimization is the key – target is to reduce the amount of data that are sent across networks

• Redundant execution can be used to reduce the impact of slow machines and to handle machine failures and data loss

• HDFS is the file-system abstraction• Block-based abstraction simplifies the storage subsystem

• HDFS blocks are larger than disk blocks to minimize the cost of seeks

• Blocks fit well with replication for providing fault tolerance and availability

Page 50: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Lecture 2

• Hadoop Cluster Setup

• MapReduce Advancements

• Aneka Case Study

• Hadoop Lab Notes

Page 51: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Setting Up a Hadoop Cluster

• A two-level network topology is typical for a Hadoop cluster• Get Apache Hadoop distribution and install Hadoop, may also use a

installation script• Configure JAVA_HOME, NameNode, DataNdoe, JobTracker and

TaskTracker, HADOOP_LOG_DIR, HADOOP_HEAPSIZE• Can also have a single node setup for simple operations

Page 52: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Single Node Setup: Configurations

Files to configure:

• hadoop-env.shOpen the file <HADOOP_INSTALL>/conf/hadoop-env.sh in the editor of your choice and

set the JAVA_HOME environment variable to the Sun JDK/JRE 1.5.0 directory.------------------------------------------------------------------- # The java implementation to use. Required. # export JAVA_HOME=/usr/lib/j2sdk1.5-sun-----------------------------------------------------------

• hadoop-site.xml

Any site-specific configuration of Hadoop is configured in <HADOOP_INSTALL>/conf/hadoop-site.xml. Here we will configure the directory where Hadoop will store its data files, the ports it listens to, etc.

You can leave the settings below as is with the exception of the hadoop.tmp.dir variable which you have to change to the directory of your choice, for example /usr/local/hadoop-datastore/hadoop-${user.name}.

--------------------------------------------------------------------

<property>

<name>hadoop.tmp.dir</name>

<value>/your/path/to/hadoop/tmp/dir/hadoop-${user.name}</value>

<description>A base for other temporary directories.</description>

</property>

----------------------------------------------------------------------

Page 53: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Formatting the name node:

The first step to starting up your Hadoop installation is formatting the Hadoop file system which is implemented on top of the local file system of your "cluster“. You need to do this the first time you set up a Hadoop cluster. cluster. Do not format a running Hadoop filesystem, this will cause all your data to be erased.

Run the command :

hadoop@ubuntu:~$ <HADOOP_INSTALL>/hadoop/bin/hadoop namenode –format

Starting cluster:

This will startup a Namenode, Datanode, Jobtracker and a Tasktracker .

Run the command:

hadoop@ubuntu:~$ <HADOOP_INSTALL>/bin/start-all.sh

Stopping cluster:

To stop all the daemons running on your machine,

Run the command:

hadoop@ubuntu:~$ <HADOOP_INSTALL>/bin/stop-all.sh

Starting the Single Node Cluster

Page 54: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Now we will modify the Hadoop configuration to make one Ubuntu box the master (which will also act as a slave) and the other Ubuntu box a slave.

The best way to do this is to install, configure and test a "local" Hadoop setup for each of the two Ubuntu boxes, and in a second step to "merge" these two single-node clusters into one multi-node cluster in which one Ubuntu box will become the designated master (but also act as a slave with regard to data storage and processing), and the other box will become only a slave.

The master node will run the "master" daemons for each layer: namenode for the HDFS storage layer, and jobtracker for the MapReduce processing layer. Both machines will run the "slave" daemons: datanode for the HDFS layer, and tasktracker for MapReduce processing layer.

We will call the designated master machine just the master from now and the slave-only machine the slave.

Both machines must be able to reach each other over the network

Shutdown each single-node cluster with <HADOOP_INSTALL>/bin/stop-all.sh before continuing if you haven't done so already.

Multi-Node Setup

Page 55: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Multi-Node Setup: Configurations

Files to configure:

conf/masters (master only)

The conf/masters file defines the master nodes of our multi-node cluster. In our case, this is just the master machine.

On master, update <HADOOP_INSTALL>/conf/masters that it looks like this:

----------------------

master

---------------------

conf/slaves (master only)

This conf/slaves file lists the hosts, one per line, where the Hadoop slave daemons (datanodes and tasktrackers) will run. We want both the master box and the slave box to act as Hadoop slaves because we want both of them to store and process data.

On master, update <HADOOP_INSTALL>/conf/slaves that it looks like this:

------------------

Master

slave

-------------------

If you have additional slave nodes, just add them to the conf/slaves file, one per line.

Page 56: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

conf/hadoop-site.xml (all machines):

Assuming you configured conf/hadoop-site.xml on each machine as described in the single-node cluster tutorial, you will only have to change a few variables.

Important: You have to change conf/hadoop-site.xml on ALL machines as follows.

First, we have to change the fs.default.name variable which specifies the NameNode (the HDFS master) host and port. In our case, this is the master machine.

------------------------------------------

<property>

<name>fs.default.name</name>

<value>hdfs://master:54310</value>

<description>The name of the default file system. . .

</property>

---------------------------------------

Second, we have to change the mapred.job.tracker variable which specifies the JobTracker (MapReduce master) host and port. Again, this is the master in our case.

-------------------------------------------------------

<property>

<name>mapred.job.tracker</name>

<value>master:54311</value>

<description>The host and port that the MapReduce job tracker runs at . . . </description>

</property>

-------------------------------------------------

Multi-Node Setup: Configurations (Cont’d)

Page 57: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Third, we change the dfs.replication variable which specifies the default block replication. It defines how many machines a single file should be replicated to before it becomes available. If you set this to a value higher than the number of slave nodes that you have available, you will start seeing a lot of type errors in the log files.

---------------------------------

<property>

<name>dfs.replication</name>

<value>2</value>

<description>Default block replication. . .</description>

</property>

----------------------------------

Additional settings:

conf/hadoop-site.xml

You can change the mapred.local.dir variable which determines where temporary MapReduce data is written. It also may be a list of directories.

Multi-Node Setup: Configurations (Cont’d)

Page 58: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Starting the Multi-node Cluster

Formatting the namenode

Before we start our new multi-node cluster, we have to format Hadoop's distributed filesystem (HDFS) for the namenode. You need to do this the first time you set up a Hadoop cluster. Do not format a running Hadoop namenode, this will cause all your data in the HDFS filesytem to be erased.

To format the filesystem (which simply initializes the directory specified by the dfs.name.dir variable on the namenode), run the command (from the master):

--------------------------------------------

bin/hadoop namenode -format

---------------------------------------------

Starting the multi-node cluster:

Starting the cluster is done in two steps. First, the HDFS daemons are started: the namenode daemon is started on master, and datanode daemons are started on all slaves (here: master and slave). Second, the MapReduce daemons are started: the jobtracker is started on master, and tasktracker daemons are started on all slaves (here: master and slave).

Page 59: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Starting the multi-node cluster (Cont’d)

HDFS daemons:

Run the command <HADOOP_INSTALL>/bin/start-dfs.sh on the machine you want the namenode to run on. This will bring up HDFS with the namenode running on the machine you ran the previous command on, and datanodes on the machines listed in the conf/slaves file.

In our case, we will run bin/start-dfs.sh on master:

-------------------------

bin/start-dfs.sh

---------------------------

On slave, you can examine the success or failure of this command by inspecting the log file <HADOOP_INSTALL>/logs/hadoop-hadoop-datanode-slave.log.

At this point, the following Java processes should run on master:

-----------------------------------

hadoop@master:/usr/local/hadoop$ jps

14799 NameNode

15314 Jps

14880 DataNode

14977 SecondaryNameNode

------------------------------------

Page 60: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

and the following Java processes should run on slave:

--------------------------------------

hadoop@slave:/usr/local/hadoop$ jps

15183 DataNode

15616 Jps

---------------------------------------

MapReduce daemons:

Run the command <HADOOP_INSTALL>/bin/start-mapred.sh on the machine you want the jobtracker to run on. This will bring up the MapReduce cluster with the jobtracker running on the machine you ran the previous command on, and tasktrackers on the machines listed in the conf/slaves file.

In our case, we will run bin/start-mapred.sh on master:

-------------------------------------

bin/start-mapred.sh

-------------------------------------

On slave, you can examine the success or failure of this command by inspecting the log file <HADOOP_INSTALL>/logs/hadoop-hadoop-tasktracker-slave.log.

Starting the multi-node cluster (Cont’d)

Page 61: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

At this point, the following Java processes should run on master:

----------------------------------------------------

hadoop@master:/usr/local/hadoop$ jps

16017 Jps

14799 NameNode

15686 TaskTracker

14880 DataNode

15596 JobTracker

14977 SecondaryNameNode

----------------------------------------------------

And the following Java processes should run on slave:

---------------------------------------

hadoop@slave:/usr/local/hadoop$ jps

15183 DataNode

15897 TaskTracker

16284 Jps

-------------------------------------------

Starting the multi-node cluster (Cont’d)

Page 62: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Stopping the Multi-Node Cluster

First, we begin with stopping the MapReduce daemons: the jobtracker is stopped on master, and tasktracker daemons are stopped on all slaves (here: master and slave). Second, the HDFS daemons are stopped: the namenode daemon is stopped on master, and datanode daemons are stopped on all slaves (here: master and slave).

MapReduce daemons:

Run the command <HADOOP_INSTALL>/bin/stop-mapred.sh on the jobtracker machine. This will shut down the MapReduce cluster by stopping the jobtracker daemon running on the machine you ran the previous command on, and tasktrackers on the machines listed in the conf/slaves file.

In our case, we will run bin/stop-mapred.sh on master:

-------------------------------

bin/stop-mapred.sh

-------------------------------

At this point, the following Java processes should run on master:

--------------------------------------

hadoop@master:/usr/local/hadoop$ jps

14799 NameNode

18386 Jps

14880 DataNode

14977 SecondaryNameNode

--------------------------------------------

Page 63: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

And the following Java processes should run on slave:

-------------------------------

hadoop@slave:/usr/local/hadoop$ jps

15183 DataNode

18636 Jps

--------------------------------

HDFS daemons:

Run the command <HADOOP_INSTALL>/bin/stop-dfs.sh on the namenode machine. This will shut down HDFS by stopping the namenode daemon running on the machine you ran the previous command on, and datanodes on the machines listed in the conf/slaves file.

In our case, we will run bin/stop-dfs.sh on master:

---------------------------------

bin/stop-dfs.sh

---------------------------------

At this point, the only following Java processes should run on master:

-------------------------------

hadoop@master:/usr/local/hadoop$ jps

18670 Jps

------------------------------

Stopping the Multi-Node Cluster (Cont’d)

Page 64: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

And the following Java processes should run on slave:

--------------------------------

hadoop@slave:/usr/local/hadoop$ jps

18894 Jps

--------------------------------

Stopping the Multi-Node Cluster (Cont’d)

Page 65: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Running a MapReduce job

We will now run your first Hadoop MapReduce job. We will use the WordCount example job which reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab.

• Download example input data:

The Notebooks of Leonardo Da Vinci

Download the ebook as plain text file in us-ascii encoding and store the uncompressed file in a temporary directory of choice, for example /tmp/gutenberg.

• Restart the Hadoop cluster

Restart your Hadoop cluster if it's not running already.

-------------------------

hadoop@ubuntu:~$ <HADOOP_INSTALL>/bin/start-all.sh

• Copy local data file to HDFS

Before we run the actual MapReduce job, we first have to copy the files from our local file system to Hadoop's HDFS

-----------------------------

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/source destination

Page 66: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

• Run the MapReduce job

Now, we actually run the WordCount example job.

This command will read all the files in the HDFS “destination” directory , process it, and store the result in the HDFS directory “output”.

-----------------------------------------

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop hadoop-example wordcount destination output

-----------------------------------------

You can check if the result is successfully stored in HDFS directory “output”.

• Retrieve the job result from HDFS

To inspect the file, you can copy it from HDFS to the local file system.

-------------------------------------

hadoop@ubuntu:/usr/local/hadoop$ mkdir /tmp/output

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs –copyToLocal output/part-00000 /tmp/output

----------------------------------------

Alternatively, you can read the file directly from HDFS without copying it to the local file system by using the command :

---------------------------------------------

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs –cat output/part-00000

Running a MapReduce Job (Cont’d)

Page 67: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop Web Interfaces

• MapReduce Job Tracker Web Interface The job tracker web UI provides information about general job statistics of the

Hadoop cluster, running/completed/failed jobs and a job history log file. It also gives access to the local machine's Hadoop log files (the machine on which the web UI is running on).

By default, it's available at http://localhost:50030/

• Task Tracker Web Interface The task tracker web UI shows you running and non-running tasks. It also gives

access to the local machine's Hadoop log files. By default, it's available at http://localhost:50060/

• HDFS Name Node Web Interface The name node web UI shows you a cluster summary including information about

total/remaining capacity, live and dead nodes. Additionally, it allows you to browse the HDFS namespace and view the contents of its files in the web browser. It also gives access to the local machine's Hadoop log files.

By default, it's available at http://localhost:50070/

Page 68: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Writing An Hadoop MapReduce Program

Even though the Hadoop framework is written in Java, programs for Hadoop need not to be coded in Java but can also be developed in other languages like Python or C++ (the latter since version 0.14.1).

Creating a launching program for your application• The launching program configures: – The Mapper and Reducer to use – The output key and value types (input types are inferred from the

InputFormat) – The locations for your input and output• The launching program then submits the job and typically waits for it to

complete

A Map/Reduce may specify how it’s input is to be read by specifying an InputFormat to be used

A Map/Reduce may specify how it’s output is to be written by specifying an OutputFormat to be used

Page 69: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Using HDFS

• hadoop dfs• [-ls <path>]• [-du <path>]• [-cp <src> <dst>]• [-rm <path>]• [-put <localsrc> <dst>]• [-copyFromLocal <localsrc> <dst>]• [-moveFromLocal <localsrc> <dst>]• [-get [-crc] <src> <localdst>]• [-cat <src>]• [-copyToLocal [-crc] <src> <localdst>]• [-moveToLocal [-crc] <src> <localdst>]• [-mkdir <path>]• [-touchz <path>]• [-test -[ezd] <path>]• [-stat [format] <path>]• [-help [cmd]]

Page 70: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Using HDFS (Cont’d)

• File system reformatting is easy

hadoop namenode –format

• Basically most commands look similar

• hadoop “some command” options

• If you just type hadoop you get all possible commands (including undocumented ones )

Page 71: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Who uses Hadoop?

• Amazon/A9

• Facebook

• Google

• IBM

• Joost

• Last.fm

• New York Times

• PowerSet

• Veoh

• Yahoo!

Page 72: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop @ Yahoo!

Source:Eric Baldeschwieler

VP Hadoop DevelopmentYahoo!

Page 73: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop is critical to Yahoo’s business

• When you visit yahoo, you are interacting with data processed with Hadoop!

Page 74: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop is critical to Yahoo’s business

Ads Optimization

Content Optimization Search

Index

Content Feed Processing

• When you visit yahoo, you are interacting with data processed with Hadoop!

Page 75: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop is critical to Yahoo’s business

Ads Optimization

Content Optimization Search

Index

Content Feed Processing

Machine Learning

(e.g. Spam filters)

• When you visit yahoo, you are interacting with data processed with Hadoop!

Page 76: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Tremendous Impact on Productivity

• Makes Developers & Scientists more productive• Key computations solved in days and not months

• Projects move from research to production in days

• Easy to learn, even our rocket scientists use it!

• The major factors• You don’t need to find new hardware to experiment

• You can work with all your data!

• Production and research based on same framework

• No need for R&D to do IT (it just works)

Page 77: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

77

Search & Advertising SciencesHadoop Applications: Search Assist™

Before Hadoop After Hadoop

Time 26 days 20 minutes

Language C++ Python

Development Time 2-3 weeks 2-3 days

• Database for Search Assist™ is built using Hadoop. • 3 years of log-data• 20-steps of map-reduce

Page 78: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Largest Hadoop Clusters in the Universe

• 25,000+ nodes (~200,000 cores)• Clusters of up to 4,000 nodes

• 4 Tiers of clusters• Development, Testing and QA (~10%)

• Proof of Concepts and Ad-Hoc work (~10%)• Runs the latest version of Hadoop – currently 0.20

• Science and Research (~60%)• Runs more stable versions

• Production (~20%)• Currently Hadoop 0.18.3

Page 79: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Large Hadoop-Based Applications

2008 2009

Webmap ~70 hours runtime~300 TB shuffling~200 TB output1480 nodes

~73 hours runtime~490 TB shuffling~280 TB output2500 nodes

Sort benchmarks(Jim Gray contest)

1 Terabyte sorted•209 seconds•900 nodes

1 Terabyte sorted•62 seconds, 1500 nodes1 Petabyte sorted•16.25 hours, 3700 nodes

Largest cluster 2000 nodes•6PB raw disk•16TB of RAM•16K CPUs

4000 nodes•16PB raw disk•64TB of RAM•32K CPUs•(40% faster CPUs too)

Page 80: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop at Facebook

• Production cluster• 4800 cores, 600 machines, 16GB per machine – April 2009• 8000 cores, 1000 machines, 32 GB per machine – July

2009• 4 SATA disks of 1 TB each per machine• 2 level network hierarchy, 40 machines per rack• Total cluster size is 2 PB, projected to be 12 PB in Q3 2009

• Test cluster• 800 cores, 16GB each

Page 81: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Backend Data Warehousing

Web Servers

Scribe Servers

Network Storage

Hadoop ClusterOracle RAC MySQL

• 3 TB of compressed log data is generated per day• All these data are stored and process by the Hadoop cluster

consisting of 0ver 600 machines• The summary of log data is then copied to Oracle and MySQL

for easy access and further analysis

Page 82: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Notable MapReduce Advancements

• GPU-based MapReduce variants

• Extend MapReduce to be used as a Cloud service

• Perform real-time distributed stream processing

• Use MapReduce for quality-controlled decision making, such as weather forecasting, defense, emergency response system, commerce/finance trading and transport/vehicle/dam flow control

• Overcome the performance bottleneck generated from a single data collection point and minimize the time taken during data validation process

• Ensure improved data management and better performance

Page 83: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

MapReduce and GPUs

• MapReduce programming framework is a natural fit for GPUs

• Using this framework saves significant programming effort

• Lots of parallelism to execute independent threads, multithreading, and abstraction of low level details

• However, there are constraints• MapReduce is typically applied to large

batch-processing applications• Efficient reductions on GPUs are difficult,

due to dependencies• Performance issue due to writing

intermediary results in local disks• Each cluster node running MapReduce

is 2-4 core CPU, with no GPU attached• Moving data to cloud incurs extra cost

Page 84: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Amazon Elastic MapReduce

Upload to Amazon S3 the data, as well as the map and reduce executables, and then send a

request to Elastic MapReduce to start a job flow

Elastic MapReduce starts a Hadoop cluster, which loads any specified bootstrap actions and

thenruns Hadoop on each node

Hadoop executes a job flow by downloading data from Amazon S3 to core and task nodes.

Alternatively, the data is loaded dynamically at run time by map tasks

Hadoop processes the data and then uploads the results from the cluster to Amazon S3

The job flow is completed and processed data can be retrieved from Amazon S3

Page 85: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

MapReduce Online

• A variant of MapReduce where intermediate data is pipelined between Map and Reduce functions, while preserving the programming interfaces and fault tolerance

• Hadoop online prototype: http://code.google.com/p/hop/

• Task pipelining• Map task push data to reducers as it is produced by using two threads,

one for running the map function and another for periodically sending the output from in-memory buffer to reduce

• If a reduce task is yet to be scheduled, map output is written to disk as in regular MapReduce

• Job pipelining• Send a job’s Reduce output as input to next job’s Map, however,

Reduction of previous job and Map of next job are not overlapped

• Perform reduction on whatever Map output is available, producing snapshots

• Online aggregation of snapshots and continuous query pipelining

Page 86: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Case Study on Aneka: Cloud Application Platform (CAP) using MapReduce

• Lightweight Container hosting multiple services

• All programming models available from within the same container

• SDK containing APIs for multiple programming models and tools

• Runtime Environment for managing application execution management

• Suitable for• Development of large-scale

Enterprise Cloud Applications• Cloud enabling legacy

applicationsPrivate Cloud

LAN network

AmazonMicrosoft Google

IBM

Data Center

Hardware Profile Services

Container

Persisten

ce

TaskModel

ThreadModel

Map Reduce Model

OtherModels

.NET @ Windows Mono @ Linux

Secu

rity

Programming Models

Software Development Kit

ManagementStudio

Application

Foundation Services

MembershipServices

ReservationServices

LicenseServices

APIsDesign Explorer

Management Kit

AdministrationPortal

SLA-NegotiationWeb Services

ManagementWeb Services

StorageServices

AccountingServices

Fabric Services

Dynamic Resource Provisioning Services

Infrastructure

Physical Machines/Virtual Machines

Private Cloud

LAN network

Private Cloud

LAN network

AmazonMicrosoft Google

IBM

Data Center

AmazonMicrosoft Google

IBM

Data Center

Hardware Profile Services

Container

Persisten

ce

TaskModel

ThreadModel

Map Reduce Model

OtherModels

.NET @ Windows Mono @ Linux

Secu

rity

Programming Models

Software Development Kit

ManagementStudio

Application

Foundation Services

MembershipServices

ReservationServices

LicenseServices

APIsDesign Explorer

Management Kit

AdministrationPortal

SLA-NegotiationWeb Services

ManagementWeb Services

StorageServices

AccountingServices

Fabric Services

Dynamic Resource Provisioning Services

Infrastructure

Physical Machines/Virtual Machines

Patent

(PCT)

Manjrasoft

Page 87: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Aneka: components

public DumbTask: ITask { … public void Execute() { …… }}

for(int i=0; i<n; i++){ … DumbTask task = new DumbTask(); app.SubmitExecution(task);}

Executor

Scheduler

Executor

Executor Executor

ClientAgent

work units

internet

internet

Aneka enterprise Cloud

ClientAgent

work units

Aneka User Agent

Aneka Worker ServiceAneka Manager

Programming / Deployment Model

Manjrasoft

Page 88: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

MapReduce Programming Model in Aneka

• Implementation of Map and Reduce function• Logic for MapReduce scheduler, executor and manager

infrastructure

end users

scheduling

execution

coordination

client component

abstractions

units of execution

MapReduce Model

MapReduceScheduler

MapReduceExecutor

MapReduceManager

Mapper

Reducer

Page 89: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Aneka MapReduce Infrastructure

Page 90: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Aneka MapReduce Abstractions Object Model

Page 91: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Aneka MapReduce Scheduling Service

Page 92: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Aneka MapReduce Execution Service

Page 93: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Aneka MapReduce Data File Format

Page 94: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Aneka MapReduce

• MapReduce model in Aneka is function-based:• user defines the functions operating on the data

• user configures the middleware with these functions

• user provides the data

• middleware automatically generates the tasks required to execute the functions on the data

• MapReduce model samples• Word counter

• Estimation of Pi

Page 95: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Application 1 - GoFront: A unit of China Southern Railway Group)

Aneka utilizes 30 desktops to decrease task time from days to hours

Time (in hrs)

Single Server

Aneka Cloud

Raw Locomotive Design Files(Using AutoDesk Maya) Using Maya

Graphical Mode Directly

Case 1: Single Server

4 cores server

Aneka Maya Renderer

Use private Aneka Cloud

GoFront Private Aneka Cloud

LAN network (Running Maya Batch Mode on demand)

Case 2: Aneka Enterprise Cloud

Manjrasoft

Application: Locomotive design CAD rendering

Manjrasoft

Page 96: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Application 2 - TitanStrike On-line Gaming Portal

TitanStrike Private Aneka Cloud

LAN network (Running Game plugins on Demand) Case 2: Aneka Enterprise Cloud = Scalability

Aneka-based GameController

The local scheduler interacts with Aneka and distributes the load in the cloud.

Manjrasoft

Distributed log parsing

logs logs

Case 1: Single Server = Huge Overload

Single scheduler controlling the execution of all the matches.

Game Servers

Gamers profilesPlayers statisticsTeam playingMultiple games

Titan Strike On Line Gaming Portal

Centralized log parsing

logs

logsSingle GameController

Manjrasoft

Page 97: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Hadoop Lab – Notable Points

• Pre-installed Hadoop on XE• 16 processors over 2 nodes

• Use of a script to load Hadoop module and run application

• HDFS is installed on top of Lustre file system

• No data locality as there is no concept of local disk

• Namenode and datanode running on the same node

• Hadoop installation on XE is experimental• Be prepared to get some surprise in performance and during

program execution!!

• The main focus of the Lab will be to modify the typical MapReduce implementation for word count to perform some advanced operations

Page 98: Data Intensive Computing: MapReduce and Hadoop Distributed File System Mukaddim Pathan Research Scientist, CSIRO ICT Centre

Thanks for Your Attention!

• Hadoop and MapReduce resources:

• Hadoop: The Definitive Guide, Tom White, O’Reilly | Yahoo! Press, 2009

• Jeffrey Dean and Sanjay Ghemawat, “MapReduce: Simplified Data Processing on Large Clusters”, OSDI’04, http://labs.google.com/papers/ mapreduce.html

• “Yahoo! Launcehs World’s Largest Hadoop Production Application”, 19 February 2008, http://developer.yahoo.com/blogs/hadoop/posts/2008/02/ yahoo-worlds-largest-production-hadoop/

• Tutorials and user guides at: http://hadoop.apache.org/

• HDFS: http://hadoop.apache.org/core/docs/current/hdfs_design.html

• Hadoop API: http://hadoop.apache.org/core/docs/current/api/

• Contact

[email protected]