Hadoop Tutorial


  • 0

    Index

    1. Why Hadoop .......................................................................... 1

    2. Hadoop Basic Concepts ........................................................ 3

    3. Introduction to the HDFS ...................................................... 11

    4. Introduction to MapReduce ................................................... 19

    5. MapReduce API .................................................................... 29

    6. Advance Hadoop API ............................................................ 37

    7. More Advanced MapReduce Programming ......................... 49

    8. Joining Data Sets in MapReduce Jobs .................................. 61

    9. Hive ....................................................................................... 65

    10. Pig ........................................................................................ 76

    11. Hbase ................................................................................... 85

  • 1

    Why Hadoop

    Simply put, Hadoop can transform the way you store and process data throughout your enterprise.

    According to analysts, about 80% of the data in the world is unstructured, and until Hadoop, it was

    essentially unusable in any systematic way. With Hadoop, for the first time you can combine all your

    data and look at it as one.

    Make All Your Data Profitable

    Hadoop enables you to gain insight from all the data you already have; to ingest the data flowing into

    your systems 24/7 and leverage it to make optimizations that were impossible before; to make decisions

    based on hard data, not hunches; to look at complete data, not samples; to look at years of transactions,

    not days or weeks. In short, Hadoop will change the way you run your organization.

    Leverage All Types of Data, From All Types of Systems

    Hadoop can handle all types of data from disparate systems: structured, unstructured, log files, pictures,

    audio files, communications records, and email: just about anything you can think of. Even when different

    types of data have been stored in unrelated systems, you can dump it all into your Hadoop cluster

    before you even know how you might take advantage of it in the future.

    Scale beyond Anything You Have Today

    The largest social network in the world is built on the same open-source technology as Hadoop, and now

    exceeds 100 petabytes. It's unlikely your organization has that much data. As you need more capacity,

    you just add more commodity servers and Hadoop automatically incorporates the new storage and

    compute capacity.

    E-tailing

    Recommendation engines increase average order size by recommending complementary products based on predictive analysis for cross-selling.

    Cross-channel analytics: sales attribution, average order value, lifetime value (e.g., how many in-store purchases resulted from a particular recommendation, advertisement or promotion).

    Event analytics: what series of steps (golden path) led to a desired outcome (e.g., purchase, registration).

    Financial Services

    Compliance and regulatory reporting.

    Risk analysis and management.

    Fraud detection and security analytics.

    CRM and customer loyalty programs.

    Credit scoring and analysis.

    Trade surveillance.

  • 2

    Government

    Fraud detection and cybersecurity.

    Compliance and regulatory analysis.

    Energy consumption and carbon footprint management.

    Health & Life Sciences

    Campaign and sales program optimization.

    Brand management.

    Patient care quality and program analysis.

    Supply-chain management.

    Drug discovery and development analysis.

    Retail/CPG

    Merchandizing and market basket analysis.

    Campaign management and customer loyalty programs.

    Supply-chain management and analytics.

    Event- and behavior-based targeting.

    Market and consumer segmentations.

    Telecommunications

    Revenue assurance and price optimization.

    Customer churn prevention.

    Campaign management and customer loyalty.

    Call Detail Record (CDR) analysis.

    Network performance and optimization.

    Web & Digital Media Services

    Large-scale clickstream analytics.

    Ad targeting, analysis, forecasting and optimization.

    Abuse and click-fraud prevention.

    Social graph analysis and profile segmentation.

    Campaign management and loyalty programs.

  • 3

    Hadoop Basic Concepts

    Apache Hadoop

    Apache Hadoop is a software solution for distributed computing of large datasets.

    Hadoop provides a distributed file system (HDFS) and a MapReduce implementation.

    Apache Hadoop can be used to filter and aggregate data; a typical use case is the analysis of web server log files to find the most visited pages.

    HDFS (Hadoop Distributed File System)

    HDFS is an Apache Software Foundation project and a subproject of the Apache Hadoop project.

    HDFS is fault tolerant and provides high-throughput access to large data sets.

    Overview of HDFS

    HDFS has many similarities with other distributed file systems, but is different in several respects. One noticeable difference is HDFS's write-once-read-many model that relaxes concurrency control

    requirements, simplifies data coherency, and enables high-throughput access.

    HDFS has many goals. Here are some of the most notable:

    Scalability to reliably store and process large amounts of data.

    Economy by distributing data and processing across clusters of commodity personal computers.

    Efficiency by distributing data and logic to process it in parallel on nodes where data is located.

    Reliability by automatically maintaining multiple copies of data and automatically redeploying processing logic in the event of failures.

    Hadoop Multi-node Architecture

    The diagram below presents a simplified view of the Hadoop architecture. The MapReduce algorithm sits on top of a distributed file system. Arrows represent data access, large enclosing rectangles represent the master and slave nodes, and the small rectangles represent functional units.

    The file system layer can be any virtualized distributed file system. Hadoop performs best when coupled with the Hadoop Distributed File System because the physical data node, being location/rack aware, can be placed closer to the task tracker that will access this data.

  • 4

    Fig : Hadoop Multi-Node cluster Architecture.

    Definitions / Acronyms

    DataNode:

    A DataNode stores data in the [Hadoop Filesystem]. A functional file system has more than one

    DataNode, with data replicated across them.

    NameNode:

    NameNode serves as both directory namespace manager and "inode table" for the Hadoop DFS. There

    is a single NameNode running in any DFS deployment.

    MapReduce:

    Hadoop MapReduce is a programming model and software framework for writing applications that

    rapidly process vast amounts of data in parallel on large clusters of compute nodes.

    Secondary NameNode:

    The Secondary Namenode regularly connects with the Primary Namenode and builds snapshots of the

    Primary Namenode's directory information, which is then saved to local/remote directories.

  • 5

    JobTracker:

    The JobTracker is the service within Hadoop that farms out MapReduce tasks to specific nodes in the

    cluster, ideally the nodes that have the data, or at least are in the same rack.

    1. Client applications submit jobs to the Job tracker.

    2. The JobTracker talks to the NameNode to determine the location of the data.

    3. The JobTracker locates TaskTracker nodes with available slots at or near the data.

    4. The JobTracker submits the work to the chosen TaskTracker nodes.

    5. The TaskTracker nodes are monitored. If they do not submit heartbeat signals often enough, they are deemed to have failed and the work is scheduled on a different TaskTracker.

    6. A TaskTracker will notify the JobTracker when a task fails. The JobTracker decides what to do then: it may resubmit the job elsewhere, it may mark that specific record as something to avoid, and it may even blacklist the TaskTracker as unreliable.

    7. When the work is completed, the JobTracker updates its status.

    8. Client applications can poll the JobTracker for information.

    The JobTracker is a point of failure for the Hadoop MapReduce service. If it goes down, all running jobs

    are halted.

    TaskTracker:

    1. A TaskTracker is a node in the cluster that accepts tasks - Map, Reduce and Shuffle operations - from a JobTracker.

    2. Every TaskTracker is configured with a set of slots; these indicate the number of tasks that it can accept.

    3. When the JobTracker tries to find somewhere to schedule a task within the MapReduce operations, it first looks for an empty slot on the same server that hosts the DataNode containing the data, and if not, it looks for an empty slot on a machine in the same rack.

  • 6

    Hadoop Architecture:

    Fig : An architecture that explains how HDFS works

    The following are some of the key points to remember about the HDFS:

    In the above diagram, there is one NameNode and multiple DataNodes (servers); b1, b2, etc. indicate data blocks.

    When you dump a file (or data) into HDFS, it is stored as blocks on the various nodes of the Hadoop cluster. HDFS creates several replicas of each data block and distributes them across the cluster in a way that is reliable and allows fast retrieval. A typical HDFS block size is 128MB, and each data block is replicated to multiple nodes across the cluster.

    Hadoop internally makes sure that a node failure never results in data loss.

    There is one NameNode that manages the file system metadata.

    There are multiple DataNodes (cheap commodity servers) that store the data blocks.

    When you execute a query from a client, it first contacts the NameNode to get the file metadata information, and then contacts the DataNodes to get the actual data blocks.

    Hadoop provides a command line interface for administrators to work on HDFS.

    The NameNode comes with a built-in web server from which you can browse the HDFS filesystem and view some basic cluster statistics.

    How MapReduce Works?

    The whole process is illustrated in Figure 1. At the highest level, there are four independent entities:

    The client, which submits the MapReduce job.

    The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.

    The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.

    The distributed filesystem, which is used for sharing job files between the other entities.

  • 7

    Figure 1. How Hadoop runs a MapReduce job

    Other Hadoop Ecosystem Components

    Figure : Hadoop Ecosystem Components

  • 8

    Hive:

    Apache Hive is a data warehouse infrastructure built on top of Hadoop for providing data

    summarization, query and analysis.

    Using Hadoop was not easy for end users, especially for the ones who were not familiar with

    MapReduce framework. End users had to write map/reduce programs for simple tasks like getting raw

    counts or averages. Hive was created to make it possible for analysts with strong SQL skills (but meager

    Java programming skills) to run queries on the huge volumes of data to extract patterns and meaningful

    information. It provides an SQL-like language called HiveQL while maintaining full support for

    map/reduce. In short, a Hive query is converted to MapReduce tasks.

    The main building blocks of Hive are:

    1. Metastore: stores the system catalog and metadata about tables, columns, partitions, etc.

    2. Driver: manages the lifecycle of a HiveQL statement as it moves through Hive.

    3. Query Compiler: compiles HiveQL into a directed acyclic graph of MapReduce tasks.

    4. Execution Engine: executes the tasks produced by the compiler in proper dependency order.

    5. HiveServer: provides a Thrift interface and a JDBC / ODBC server.

    HBase:

    HBase is the Hadoop application to use when you require real-time read/write random-access to

    very large datasets.

    It is a distributed column-oriented database built on top of HDFS.

    HBase is not relational and does not support SQL, but given the proper problem space, it is able to do

    what an RDBMS cannot: host very large, sparsely populated tables on clusters made from

    commodity hardware.

  • 9

    Mahout:

    Mahout is an open source machine learning library from Apache.

    It is highly scalable.

    Mahout aims to be the machine learning tool of choice when the collection of data to be

    processed is very large, perhaps far too large for a single machine. At the moment, it primarily

    implements recommender engines (collaborative filtering), clustering, and classification.

    Sqoop:

    Loading bulk data into Hadoop from production systems or accessing it from map-reduce applications

    running on large clusters can be a challenging task. Transferring data using scripts is inefficient and time-consuming.

    How do we efficiently move data from an external storage into HDFS or Hive or HBase? Meet Apache

    Sqoop. Sqoop allows easy import and export of data from structured data stores such as relational

    databases, enterprise data warehouses, and NoSQL systems. The dataset being transferred is sliced up

    into different partitions and a map-only job is launched with individual mappers responsible for

    transferring a slice of this dataset.

  • 10

    ZooKeeper:

    ZooKeeper is a distributed, open-source coordination service for distributed applications.

    It exposes a simple set of primitives that distributed applications can build upon to implement

    higher level services for synchronization, configuration maintenance, and groups and naming.

  • 11

    An introduction to the Hadoop Distributed File System

    HDFS is an Apache Software Foundation project and a subproject of the Apache Hadoop project Hadoop is ideal for storing large amounts of data, like terabytes and petabytes, and uses HDFS as its storage system. HDFS lets you connect nodes (commodity personal computers) contained within clusters over which data files are distributed. You can then access and store the data files as one seamless file system. Access to data files is handled in a streaming manner, meaning that applications or commands are executed directly using the MapReduce processing model.

    HDFS is fault tolerant and provides high-throughput access to large data sets. This article explores the primary features of HDFS and provides a high-level view of the HDFS architecture.

    Overview of HDFS

    HDFS has many similarities with other distributed file systems, but is different in several

    respects. One noticeable difference is HDFS's write-once-read-many model that relaxes concurrency control requirements, simplifies data coherency, and enables high-throughput access.

    Another unique attribute of HDFS is the viewpoint that it is usually better to locate processing logic near the data rather than moving the data to the application space.

    HDFS rigorously restricts data writing to one writer at a time. Bytes are always appended to the end of a stream, and byte streams are guaranteed to be stored in the order written.

    HDFS has many goals. Here are some of the most notable:

    Fault tolerance by detecting faults and applying quick, automatic recovery Data access via MapReduce streaming Simple and robust coherency model Processing logic close to the data, rather than the data close to the processing logic Portability across heterogeneous commodity hardware and operating systems Scalability to reliably store and process large amounts of data Economy by distributing data and processing across clusters of commodity personal computers Efficiency by distributing data and logic to process it in parallel on nodes where data is located Reliability by automatically maintaining multiple copies of data and automatically redeploying

    processing logic in the event of failures

    HDFS provides interfaces for applications to move them closer to where the data is located, as described in the following section.

  • 12

    Application interfaces into HDFS

    You can access HDFS in many different ways. HDFS provides a native Java application programming interface (API) and a native C-language wrapper for the Java API. In addition, you can use a web browser to browse HDFS files.

    The applications described in Table 1 are also available to interface with HDFS.

    Table 1. Applications that can interface with HDFS

    FileSystem (FS) shell: A command-line interface similar to common Linux and UNIX shells (bash, csh, etc.) that allows interaction with HDFS data.

    DFSAdmin: A command set that you can use to administer an HDFS cluster.

    fsck: A subcommand of the Hadoop command/application. You can use the fsck command to check for inconsistencies with files, such as missing blocks, but you cannot use the fsck command to correct these inconsistencies.

    Name nodes and data nodes: These have built-in web servers that let administrators check the current status of a cluster.

    HDFS has an extraordinary feature set with high expectations thanks to its simple, yet powerful, architecture.

    HDFS architecture

    HDFS is comprised of interconnected clusters of nodes where files and directories reside. An HDFS cluster consists of a single node, known as a NameNode, that manages the file system namespace and regulates client access to files. In addition, data nodes (DataNodes) store data as blocks within files.

    Name nodes and data nodes

    Within HDFS, a given name node manages file system namespace operations like opening, closing, and renaming files and directories. A name node also maps data blocks to data nodes, which handle read and write requests from HDFS clients. Data nodes also create, delete, and replicate data blocks according to instructions from the governing name node.

  • 13

    Figure 1 illustrates the high-level architecture of HDFS.

    Figure 1. The HDFS architecture

    As Figure 1 illustrates, each cluster contains one name node. This design facilitates a simplified model for managing each namespace and arbitrating data distribution.

    Relationships between name nodes and data nodes

    Name nodes and data nodes are software components designed to run in a decoupled manner on commodity machines across heterogeneous operating systems. HDFS is built using the Java programming language; therefore, any machine that supports the Java programming language can run HDFS. A typical installation cluster has a dedicated machine that runs a name node and possibly one data node. Each of the other machines in the cluster runs one data node.

    Communications protocols

    All HDFS communication protocols build on the TCP/IP protocol. HDFS clients connect to a Transmission Control Protocol (TCP) port opened on the name node, and then communicate with the name node using a proprietary Remote Procedure Call (RPC)-based protocol. Data nodes talk to the name node using a proprietary block-based protocol.

    Data nodes continuously loop, asking the name node for instructions. A name node can't connect directly to a data node; it simply returns values from functions invoked by a data node. Each data node maintains an open server socket so that client code or other data nodes can read or write data. The host or port for this server socket is known by the name node, which provides the information to interested

  • 14

    clients or other data nodes. See the Communications protocols sidebar for more about communication between data nodes, name nodes, and clients.

    The name node maintains and administers changes to the file system namespace.

    File system namespace

    HDFS supports a traditional hierarchical file organization in which a user or an application can create directories and store files inside them. The file system namespace hierarchy is similar to most other existing file systems; you can create, rename, relocate, and remove files.

    HDFS also supports third-party file systems such as CloudStore and Amazon Simple Storage Service (S3).

    Data replication

    HDFS replicates file blocks for fault tolerance. An application can specify the number of replicas of a file at the time it is created, and this number can be changed any time after that. The name node makes all decisions concerning block replication.

    Rack awareness

    Typically, large HDFS clusters are arranged across multiple installations (racks). Network traffic between different nodes within the same installation is more efficient than network traffic across installations. A name node tries to place replicas of a block on multiple installations for improved fault tolerance. However, HDFS allows administrators to decide on which installation a node belongs. Therefore, each node knows its rack ID, making it rack aware.

    HDFS uses an intelligent replica placement model for reliability and performance. Optimizing replica placement makes HDFS unique from most other distributed file systems, and is facilitated by a rack-aware replica placement policy that uses network bandwidth efficiently.

    Large HDFS environments typically operate across multiple installations of computers. Communication between two data nodes in different installations is typically slower than data nodes within the same installation. Therefore, the name node attempts to optimize communications between data nodes. The name node identifies the location of data nodes by their rack IDs.

    Data organization

    One of the main goals of HDFS is to support large files. The size of a typical HDFS block is 64MB. Therefore, each HDFS file consists of one or more 64MB blocks. HDFS tries to place each block on separate data nodes.

    File creation process

  • 15

    Manipulating files on HDFS is similar to the processes used with other file systems. However, because HDFS is a multi-machine system that appears as a single disk, all code that manipulates files on HDFS uses a subclass of the org.apache.hadoop.fs.FileSystem object.

    The code shown in Listing 1 illustrates a typical file creation process on HDFS.

    Listing 1. Typical file creation process on HDFS

    byte[] fileData = retrieveFileDataFromSomewhere();
    String filePath = retrieveFilePathStringFromSomewhere();

    // assumed to automatically load hadoop-default.xml and hadoop-site.xml
    Configuration config = new Configuration();

    org.apache.hadoop.fs.FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(config);
    org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(filePath);
    org.apache.hadoop.fs.FSDataOutputStream outputStream = hdfs.create(path);
    outputStream.write(fileData, 0, fileData.length);
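
    The listing above leaves the output stream open; a real program would also close it to flush the data. As a complementary sketch (not part of the original listing), the same FileSystem handle can be used to read the file back; FileSystem.open() and IOUtils.copyBytes() are standard Hadoop APIs, and the variable names simply continue Listing 1:

    // Flush any buffered bytes and finish writing the file.
    outputStream.close();

    // Re-open the same path for reading and copy its contents to stdout;
    // the final "true" tells copyBytes to close the stream when finished.
    org.apache.hadoop.fs.FSDataInputStream inputStream = hdfs.open(path);
    org.apache.hadoop.io.IOUtils.copyBytes(inputStream, System.out, config, true);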

    Staging to commit

    When a client creates a file in HDFS, it first caches the data into a temporary local file. It then redirects subsequent writes to the temporary file. When the temporary file accumulates enough data to fill an HDFS block, the client reports this to the name node, which converts the file to a permanent data node. The client then closes the temporary file and flushes any remaining data to the newly created data node. The name node then commits the data node to disk.

    Replication pipelining

    When a client accumulates a full block of user data, it retrieves a list of data nodes that contains a replica of that block from the name node. The client then flushes the full data block to the first data node specified in the replica list. As the node receives chunks of data, it writes them to disk and transfers copies to the next data node in the list. The next data node does the same. This pipelining process is repeated until the replication factor is satisfied.

  • 16

    Data storage reliability

    One important objective of HDFS is to store data reliably, even when failures occur within name nodes, data nodes, or network partitions.

    Detection is the first step HDFS takes to overcome failures. HDFS uses heartbeat messages to detect connectivity between name and data nodes.

    HDFS heartbeats

    Several things can cause loss of connectivity between name and data nodes. Therefore, each data node sends periodic heartbeat messages to its name node, so the latter can detect loss of connectivity if it stops receiving them. The name node marks data nodes that do not respond to heartbeats as dead and refrains from sending further requests to them. Data stored on a dead node is no longer available to an HDFS client from that node, which is effectively removed from the system. If the death of a node causes the replication factor of data blocks to drop below their minimum value, the name node initiates

    additional replication to bring the replication factor back to a normalized state.

    Figure 2 illustrates the HDFS process of sending heartbeat messages.

    Figure 2. The HDFS heartbeat process

    Data block rebalancing

    HDFS data blocks might not always be placed uniformly across data nodes, meaning that one or more data nodes can be underutilized. Therefore, HDFS supports rebalancing data blocks using various models. One model might move data blocks from one data node to another automatically

  • 17

    if the free space on a data node falls too low. Another model might dynamically create additional replicas and rebalance other data blocks in a cluster if a sudden increase in demand for a given file occurs. HDFS also provides the hadoop balancer command for manual rebalancing tasks.

    One common reason to rebalance is the addition of new data nodes to a cluster. When placing new blocks, name nodes consider various parameters before choosing the data nodes to receive them. Some of the considerations are:

    Block-replica writing policies.

    Prevention of data loss due to installation or rack failure.

    Reduction of cross-installation network I/O.

    Uniform data spread across data nodes in a cluster.

    The cluster-rebalancing feature of HDFS is just one mechanism it uses to sustain the integrity of its data. Other mechanisms are discussed next.

    Data integrity

    HDFS goes to great lengths to ensure the integrity of data across clusters. It uses checksum validation on the contents of HDFS files by storing computed checksums in separate, hidden files in the same namespace as the actual data. When a client retrieves file data, it can verify that the data received matches the checksum stored in the associated file.

    The HDFS namespace is stored using a transaction log kept by each name node. The file system namespace, along with file block mappings and file system properties, is stored in a file called FsImage. When a name node is initialized, it reads the FsImage file along with other files, and applies the transactions and state information found in these files.

    Synchronous metadata updating

    A name node uses a log file known as the EditLog to persistently record every transaction that occurs to HDFS file system metadata. If the EditLog or FsImage files become corrupted, the HDFS instance to which they belong ceases to function. Therefore, a name node supports multiple copies of the FsImage and EditLog files. With multiple copies of these files in place, any change to either file propagates synchronously to all of the copies. When a name node restarts, it uses the latest consistent version of FsImage and EditLog to initialize itself.

    HDFS permissions for users, files, and directories

    HDFS implements a permissions model for files and directories that has a lot in common with the Portable Operating System Interface (POSIX) model; for example, every file and directory is associated with an owner and a group. The HDFS permissions model supports read (r), write (w), and execute (x). Because there is no concept of file execution within HDFS, the x permission takes on a different meaning. Simply put, the x attribute indicates permission for accessing a child directory of a given parent directory. The owner of a file or directory is the identity of the client process that created it. The group is the group of the parent directory.

  • 18

  • 19

    Introduction to MapReduce

    Introduction

    MapReduce is a programming model designed for processing large volumes of data in parallel by dividing the work into a set of independent tasks.

    MapReduce programs are written in a particular style influenced by functional programming constructs, specifically idioms for processing lists of data.

    This module explains the nature of this programming model and how it can be used to write programs which run in the Hadoop environment.

    Goals for this Module:

    Understand functional programming as it applies to MapReduce Understand the MapReduce program flow Understand how to write programs for Hadoop MapReduce Learn about additional features of Hadoop designed to aid software development.

    MapReduce Basics

    Functional Programming Concepts

    MapReduce programs are designed to compute large volumes of data in a parallel fashion. This requires dividing the workload across a large number of machines.

    This model would not scale to large clusters (hundreds or thousands of nodes) if the components were allowed to share data arbitrarily.

    The communication overhead required to keep the data on the nodes synchronized at all times would prevent the system from performing reliably or efficiently at large scale.

    Instead, all data elements in MapReduce are immutable, meaning that they cannot be updated.

    If in a mapping task you change an input (key, value) pair, it does not get reflected back in the input files; communication occurs only by generating new output (key, value) pairs which are then forwarded by the Hadoop system into the next phase of execution.

    List Processing

    Conceptually, MapReduce programs transform lists of input data elements into lists of output data elements.

    A MapReduce program will do this twice, using two different list processing idioms: map, and reduce. These terms are taken from several list processing languages such as LISP, Scheme, or ML.

    Mapping Lists

    The first phase of a MapReduce program is called mapping. A list of data elements is provided, one at a time, to a function called the Mapper, which transforms each element individually into an output data element.

  • 20

    As an example of the utility of map: Suppose you had a function toUpper(str) which returns an uppercase version of its input string. You could use this function with map to turn a list of strings into a list of uppercase strings.

    Note that we are not modifying the input string here: we are returning a new string that will form part of a new output list.

    Reducing Lists

    Reducing lets you aggregate values together. A reducer function receives an iterator of input values from an input list. It then combines these values together, returning a single output value.

    Reducing is often used to produce "summary" data, turning a large volume of data into a smaller summary of itself. For example, "+" can be used as a reducing function, to return the sum of a list of input values.

    Putting Them Together in MapReduce:

    The Hadoop MapReduce framework takes these concepts and uses them to process large volumes of information. A MapReduce program has two components: one that implements the mapper, and another that implements the reducer. The Mapper and Reducer idioms described above are extended slightly to work in this environment, but the basic principles are the same.

    Keys and values: In MapReduce, no value stands on its own. Every value has a key associated with it. Keys identify related values. For example, a log of time-coded speedometer readings from multiple cars could be keyed by license-plate number; it would look like:

    AAA-123   65mph, 12:00pm
    ZZZ-789   50mph, 12:02pm
    AAA-123   40mph, 12:05pm
    CCC-456   25mph, 12:15pm
    ...

    The mapping and reducing functions receive not just values, but (key, value) pairs. The output of each of these functions is the same: both a key and a value must be emitted to the next list in the data flow.

    MapReduce is also less strict than formal functional languages about how the Mapper and Reducer work.

    In more formal functional mapping and reducing settings, a mapper must produce exactly one output element for each input element, and a reducer must produce exactly one output element for each input list.

    In MapReduce, an arbitrary number of values can be output from each phase; a mapper may map one input into zero, one, or one hundred outputs.

    A reducer may compute over an input list and emit one or a dozen different outputs.

    Keys divide the reduce space: A reducing function turns a large list of values into one (or a few) output values. In MapReduce, all of the output values are not usually reduced together. All of the values with the same key are presented to a single reducer together. This is performed independently of any reduce operations occurring on other lists of values, with different keys attached.

  • 21

    An Example Application: Word Count

    A simple MapReduce program can be written to determine how many times different words appear in a set of files. For example, if we had the files:

    foo.txt: Sweet, this is the foo file

    bar.txt: This is the bar file

    We would expect the output to be:

    sweet 1
    this 2
    is 2
    the 2
    foo 1
    bar 1
    file 2

    Naturally, we can write a program in MapReduce to compute this output. The high-level structure would look like this:

    mapper (filename, file-contents):
      for each word in file-contents:
        emit (word, 1)

    reducer (word, values):
      sum = 0
      for each value in values:
        sum = sum + value
      emit (word, sum)

    Listing 1: High-Level MapReduce Word Count

    Several instances of the mapper function are created on the different machines in our cluster. Each instance receives a different input file (it is assumed that we have many such files). The mappers output (word, 1) pairs which are then forwarded to the reducers. Several instances of the reducer method are also instantiated on the different machines. Each reducer is responsible for processing the list of values associated with a different word. The list of values will be a list of 1's; the reducer sums up those ones into a final count associated with a single word. The reducer then emits the final (word, count) output which is written to an output file.

  • 22

    We can write a very similar program to this in Hadoop MapReduce; it is included in the Hadoop distribution in src/examples/org/apache/hadoop/examples/WordCount.java. It is partially reproduced below:

    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value,
                      OutputCollector<Text, IntWritable> output,
                      Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
          word.set(itr.nextToken());
          output.collect(word, one);
        }
      }
    }

    /**
     * A reducer class that just emits the sum of the input values.
     */
    public static class Reduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

      public void reduce(Text key, Iterator<IntWritable> values,
                         OutputCollector<Text, IntWritable> output,
                         Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }

    Listing 2: Hadoop MapReduce Word Count Source

    There are some minor differences between this actual Java implementation and the pseudo-code shown above.

    First, Java has no native emit keyword; the OutputCollector object you are given as an input will receive values to emit to the next stage of execution.

  • 23

    Second, the default input format used by Hadoop presents each line of an input file as a separate input to the mapper function, not the entire file at a time. It also uses a StringTokenizer object to break up the line into words. This does not perform any normalization of the input, so "cat", "Cat" and "cat," are all regarded as different strings.
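
    If normalized counts are wanted, the map() body from Listing 2 could be adjusted along the following lines. This is an illustrative sketch, not code from the Hadoop distribution; it reuses the value, word, one and output variables from Listing 2 and simply lowercases each token and strips punctuation before emitting it:

    String line = value.toString().toLowerCase();
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      // Keep only letters and digits, so "cat", "Cat" and "cat," all count as "cat".
      String token = itr.nextToken().replaceAll("[^a-z0-9]", "");
      if (!token.isEmpty()) {
        word.set(token);
        output.collect(word, one);
      }
    }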

    Note that the class-variable word is reused each time the mapper outputs another (word, 1) pairing; this saves time by not allocating a new variable for each output.

    The output.collect() method will copy the values it receives as input, so you are free to overwrite the variables you use.

    The Driver Method

    There is one final component of a Hadoop MapReduce program, called the Driver. The driver initializes the job and instructs the Hadoop platform to execute your code on a set of input files, and controls where the output files are placed. A cleaned-up version of the driver from the example Java implementation that comes with Hadoop is presented below:

    public void run(String inputPath, String outputPath) throws Exception {
      JobConf conf = new JobConf(WordCount.class);
      conf.setJobName("wordcount");

      // the keys are words (strings)
      conf.setOutputKeyClass(Text.class);
      // the values are counts (ints)
      conf.setOutputValueClass(IntWritable.class);

      conf.setMapperClass(MapClass.class);
      conf.setReducerClass(Reduce.class);

      FileInputFormat.addInputPath(conf, new Path(inputPath));
      FileOutputFormat.setOutputPath(conf, new Path(outputPath));

      JobClient.runJob(conf);
    }

    Listing 3: Hadoop MapReduce Word Count Driver

    This method sets up a job to execute the word count program across all the files in a given input directory (the inputPath argument).

    The output from the reducers is written into files in the directory identified by outputPath.

    The configuration information to run the job is captured in the JobConf object.

    The mapping and reducing functions are identified by the setMapperClass() and setReducerClass() methods.

    The data types emitted by the reducer are identified by setOutputKeyClass() and setOutputValueClass(). By default, it is assumed that these are the output types of the mapper

  • 24

    as well. If this is not the case, the setMapOutputKeyClass() and setMapOutputValueClass() methods of the JobConf class will override these.
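
    For instance, a job whose mapper emits (Text, IntWritable) pairs while its reducer emits (Text, LongWritable) pairs might be configured as in the sketch below; this is an assumed illustration against the JobConf object from Listing 3, not code from the tutorial:

    // Reducer output types (also assumed for the mapper unless overridden)
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);

    // Override the intermediate (mapper output) types where they differ
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(IntWritable.class);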

    The input types fed to the mapper are controlled by the InputFormat used.

    The default input format, "TextInputFormat," will load data in as (LongWritable, Text) pairs.

    The long value is the byte offset of the line in the file. The Text object holds the string contents of the line of the file.

    The call to JobClient.runJob(conf) will submit the job to MapReduce. This call will block until the job completes. If the job fails, it will throw an IOException. JobClient also provides a non-blocking version called submitJob().
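
    As a minimal sketch (assumed, not taken from the tutorial) of the non-blocking style, submitJob() returns a RunningJob handle that can be polled for completion:

    public void runNonBlocking(JobConf conf) throws Exception {
      JobClient client = new JobClient(conf);
      RunningJob job = client.submitJob(conf);   // returns immediately

      // Poll for completion instead of blocking inside runJob()
      while (!job.isComplete()) {
        Thread.sleep(5000);
      }
      if (!job.isSuccessful()) {
        throw new IOException("Job failed: " + job.getJobID());
      }
    }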

    MapReduce Data Flow

    Now that we have seen the components that make up a basic MapReduce job, we can see how everything works together at a higher level:

    MapReduce inputs typically come from input files loaded onto our processing cluster in HDFS. These files are evenly distributed across all our nodes.

    Running a MapReduce program involves running mapping tasks on many or all of the nodes in our cluster. Each of these mapping tasks is equivalent: no mappers have particular "identities" associated with them. Therefore, any mapper can process any input file.

    Each mapper loads the set of files local to that machine and processes them.

    When the mapping phase has completed, the intermediate (key, value) pairs must be exchanged between machines to send all values with the same key to a single reducer.

    The reduce tasks are spread across the same nodes in the cluster as the mappers. This is the only communication step in MapReduce.

    Individual map tasks do not exchange information with one another, nor are they aware of one another's existence.

    Similarly, different reduce tasks do not communicate with one another.

    The user never explicitly marshals information from one machine to another; all data transfer is handled by the Hadoop MapReduce platform itself, guided implicitly by the different keys associated with values.

    This is a fundamental element of Hadoop MapReduce's reliability. If nodes in the cluster fail, tasks must be able to be restarted. If they have been performing side-effects, e.g., communicating with the outside world, then the shared state must be restored in a restarted task. By eliminating communication and side-effects, restarts can be handled more gracefully.

    Input files: This is where the data for a MapReduce task is initially stored. While this does not need to be the case, the input files typically reside in HDFS. The format of these files is arbitrary; while line-based log files can be used, we could also use a binary format, multi-line input records, or something else entirely. It is typical for these input files to be very large -- tens of gigabytes or more.

    InputFormat: How these input files are split up and read is defined by the InputFormat. An InputFormat is a class that provides the following functionality:

    Selects the files or other objects that should be used for input.

    Defines the InputSplits that break a file into tasks.

    Provides a factory for RecordReader objects that read the file.

  • 25

    InputFormat | Description | Key | Value

    TextInputFormat | Default format; reads lines of text files | The byte offset of the line | The line contents

    KeyValueInputFormat | Parses lines into key, val pairs | Everything up to the first tab character | The remainder of the line

    SequenceFileInputFormat | A Hadoop-specific high-performance binary format | user-defined | user-defined

    Table 1: InputFormats provided by MapReduce

    The default InputFormat is the TextInputFormat. This treats each line of each input file as a separate record, and performs no parsing. This is useful for unformatted data or line-based records like log files.

    A more interesting input format is the KeyValueInputFormat. This format also treats each line of input as a separate record. While the TextInputFormat treats the entire line as the value, the KeyValueInputFormat breaks the line itself into the key and value by searching for a tab character. This is particularly useful for reading the output of one MapReduce job as the input to another, as the default OutputFormat (described in more detail below) formats its results in this manner.

    Finally, the SequenceFileInputFormat reads special binary files that are specific to Hadoop. These files include many features designed to allow data to be rapidly read into Hadoop mappers. Sequence files are block-compressed and provide direct serialization and deserialization of several arbitrary data types (not just text). Sequence files can be generated as the output of other MapReduce tasks and are an efficient intermediate representation for data that is passing from one MapReduce job to another.
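
    As a concrete illustration (an assumed sketch, not from the original text), the old org.apache.hadoop.mapred API exposes this tab-separated format as the KeyValueTextInputFormat class, so a follow-up job can be pointed at the output directory of a previous job like this; the driver class name and path are hypothetical:

    JobConf secondJob = new JobConf(MyChainedDriver.class);
    // Interpret each line of the previous job's output as "key \t value"
    secondJob.setInputFormat(org.apache.hadoop.mapred.KeyValueTextInputFormat.class);
    FileInputFormat.addInputPath(secondJob, new Path("/output/of/first/job"));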

    InputSplits:

    An InputSplit describes a unit of work that comprises a single map task in a MapReduce program.

    A MapReduce program applied to a data set, collectively referred to as a Job, is made up of several (possibly several hundred) tasks.

    Map tasks may involve reading a whole file; they often involve reading only part of a file. By default, the FileInputFormat and its descendants break a file up into 64 MB chunks (the same size as blocks in HDFS).

    RecordReader:

    The InputSplit has defined a slice of work, but does not describe how to access it.

    The RecordReader class actually loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper.

    The RecordReader instance is defined by the InputFormat.

    The default InputFormat, TextInputFormat, provides a LineRecordReader, which treats each line of the input file as a new value.

  • 26

    The key associated with each line is its byte offset in the file.

    The RecordReader is invoked repeatedly on the input until the entire InputSplit has been consumed.

    Each invocation of the RecordReader leads to another call to the map() method of the Mapper.

    Mapper:

    The Mapper performs the interesting user-defined work of the first phase of the MapReduce program.

    Given a key and a value, the map() method emits (key, value) pair(s) which are forwarded to the Reducers.

    Partition & Shuffle:

    The process of moving map outputs to the reducers is known as shuffling.

    A different subset of the intermediate key space is assigned to each reduce node; these subsets (known as "partitions") are the inputs to the reduce tasks.

    Each map task may emit (key, value) pairs to any partition; all values for the same key are always reduced together regardless of which mapper is its origin.

    Therefore, the map nodes must all agree on where to send the different pieces of the intermediate data. The Partitioner class determines which partition a given (key, value) pair will go to. The default partitioner computes a hash value for the key and assigns the partition based on this result.
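
    The default behaviour described above can be written out explicitly. The sketch below is a hand-written equivalent of Hadoop's hash-based partitioning using the old mapred API, not code from the distribution, and would be registered with conf.setPartitionerClass(WordPartitioner.class):

    public class WordPartitioner implements Partitioner<Text, IntWritable> {

      // Choose a reduce partition from the key's hash code, which is
      // essentially what the default HashPartitioner does.
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }

      public void configure(JobConf job) {
        // This simple partitioner needs no configuration.
      }
    }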

    Sort: Each reduce task is responsible for reducing the values associated with several intermediate keys. The set of intermediate keys on a single node is automatically sorted by Hadoop before they are presented to the Reducer.

    Reduce:

    A Reducer instance is created for each reduce task.

    This is an instance of user-provided code that performs the second important phase of job-specific work.

    For each key in the partition assigned to a Reducer, the Reducer's reduce() method is called once.

    This receives a key as well as an iterator over all the values associated with the key.

    The values associated with a key are returned by the iterator in an undefined order.

    The Reducer also receives as parameters OutputCollector and Reporter objects; they are used in the same manner as in the map() method.

    OutputFormat:

    The (key, value) pairs provided to this OutputCollector are then written to output files. The way they are written is governed by the OutputFormat.

    The OutputFormat functions much like the InputFormat class described earlier.

    The instances of OutputFormat provided by Hadoop write to files on the local disk or in HDFS; they all inherit from a common FileOutputFormat.

  • 27

    Each Reducer writes a separate file in a common output directory.

    These files will typically be named part-nnnnn, where nnnnn is the partition id associated with the reduce task.

    The output directory is set by the FileOutputFormat.setOutputPath() method.

    You can control which particular OutputFormat is used by calling the setOutputFormat() method of the JobConf object that defines your MapReduce job.

    A table of provided OutputFormats is given below.

    OutputFormat | Description

    TextOutputFormat | Default; writes lines in "key \t value" form

    SequenceFileOutputFormat | Writes binary files suitable for reading into subsequent MapReduce jobs

    NullOutputFormat | Disregards its inputs

    Table 2: OutputFormats provided by Hadoop
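
    For example (an assumed snippet against the JobConf API, not from the tutorial), a job whose output will be consumed by another MapReduce job might write sequence files instead of text:

    // Write SequenceFiles so that a downstream job can read them efficiently
    conf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
    FileOutputFormat.setOutputPath(conf, new Path("/intermediate/output"));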

    RecordWriter: Much like how the InputFormat actually reads individual records through the RecordReader implementation, the OutputFormat class is a factory for RecordWriter objects; these are used to write the individual records to the files as directed by the OutputFormat.

    The output files written by the Reducers are then left in HDFS for your use, either by another MapReduce job, a separate program, or for human inspection.

    Hadoop Streaming

    Whereas Pipes is an API that provides close coupling between C++ application code and Hadoop, Streaming is a generic API that allows programs written in virtually any language to be used as Hadoop Mapper and Reducer implementations.

    Hadoop Streaming allows you to use arbitrary programs for the Mapper and Reducer phases of a MapReduce job. Both Mappers and Reducers receive their input on stdin and emit output (key, value) pairs on stdout.

    Input and output are always represented textually in Streaming. The input (key, value) pairs are written to stdin for a Mapper or Reducer, with a 'tab' character separating the key from the value. The Streaming programs should split the input on the first tab character on the line to recover the key and the value. Streaming programs write their output to stdout in the same format: key \t value \n.

    The inputs to the reducer are sorted so that while each line contains only a single (key, value) pair, all the values for the same key are adjacent to one another.

    Provided it can handle its input in the text format described above, any Linux program or tool can be used as the mapper or reducer in Streaming. You can also write your own scripts in bash, python, perl,

  • 28

    or another language of your choice, provided that the necessary interpreter is present on all nodes in your cluster.

    Running a Streaming Job: To run a job with Hadoop Streaming, use the following command:

    $ bin/hadoop jar contrib/streaming/hadoop-version-streaming.jar

    The command as shown, with no arguments, will print some usage information. An example of how to run real commands is given below:

    $ bin/hadoop jar contrib/streaming/hadoop-0.18.0-streaming.jar \
        -mapper myMapProgram -reducer myReduceProgram \
        -input /some/dfs/path -output /some/other/dfs/path

    This assumes that myMapProgram and myReduceProgram are present on all nodes in the system ahead of time. If this is not the case, but they are present on the node launching the job, then they can be "shipped" to the other nodes with the -file option:

    $ bin/hadoop jar contrib/streaming/hadoop-0.18.0-streaming.jar \
        -mapper myMapProgram -reducer myReduceProgram \
        -file myMapProgram -file myReduceProgram \
        -input some/dfs/path -output some/other/dfs/path

    Any other support files necessary to run your program can be shipped in this manner as well.

  • 29

    MapReduce API

    Package org.apache.hadoop.mapreduce

    Interface Summary

    Counter: A named counter that tracks the progress of a map/reduce job.

    CounterGroup: A group of Counters that logically belong together.

    JobContext: A read-only view of the job that is provided to the tasks while they are running.

    MapContext: The context that is given to the Mapper.

    ReduceContext: The context passed to the Reducer.

    TaskAttemptContext: The context for task attempts.

    TaskInputOutputContext: A context object that allows input and output from the task.

    Class Summary

    Cluster: Provides a way to access information about the map/reduce cluster.

    ClusterMetrics: Status information on the current state of the Map-Reduce cluster.

    Counters: Holds per job/task counters, defined either by the Map-Reduce framework or applications.

    ID: A general identifier, which internally stores the id as an integer.

    InputFormat: Describes the input-specification for a Map-Reduce job.

    InputSplit: Represents the data to be processed by an individual Mapper.

    Job: The job submitter's view of the Job.

    JobID: Represents the immutable and unique identifier for the job.

    JobStatus: Describes the current status of a job.

    Mapper: Maps input key/value pairs to a set of intermediate key/value pairs.

  • 30

    MarkableIterator: A wrapper iterator class that implements the MarkableIteratorInterface.

    OutputCommitter: Describes the commit of task output for a Map-Reduce job.

    OutputFormat: Describes the output-specification for a Map-Reduce job.

    Partitioner: Partitions the key space.

    QueueAclsInfo: Class to encapsulate Queue ACLs for a particular user.

    QueueInfo: Class that contains the information regarding the Job Queues which are maintained by the Hadoop Map/Reduce framework.

    RecordReader: The record reader breaks the data into key/value pairs for input to the Mapper.

    RecordWriter: Writes the output pairs to an output file.

    Reducer: Reduces a set of intermediate values which share a key to a smaller set of values.

    TaskAttemptID: Represents the immutable and unique identifier for a task attempt.

    TaskCompletionEvent: This is used to track task completion events on the job tracker.

    TaskID: Represents the immutable and unique identifier for a Map or Reduce Task.

    TaskTrackerInfo: Information about a TaskTracker.

    Enum Summary

    JobCounter

    JobPriority: Used to describe the priority of the running job.

    QueueState: Enum representing queue state.

    TaskCompletionEvent.Status

    TaskCounter

    TaskType: Enum for map, reduce, job-setup, job-cleanup, task-cleanup task types.

  • 31

    Mapper

    Constructor Detail

    Mapper

    public Mapper()

    Method Detail

    setup

    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException

    Called once at the beginning of the task. Throws: IOException InterruptedException

    map

    protected void map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException

    Called once for each key/value pair in the input split. Most applications should override this, but the default is the identity function. Throws: IOException InterruptedException

    cleanup

    protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException

    Called once at the end of the task. Throws: IOException InterruptedException

  • 32

    run

    public void run(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException

    Expert users can override this method for more complete control over the execution of the Mapper. Parameters: context - Throws: IOException InterruptedException
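
    To make the default behaviour concrete, run() essentially drives setup(), map() and cleanup() over the input split, as in the following paraphrased sketch (written inside a Mapper subclass; not copied verbatim from any particular Hadoop release):

    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      // Feed every (key, value) pair of the input split to map()
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
      cleanup(context);
    }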

    RecordReader

    Constructor Detail

    RecordReader

    public RecordReader()

    Method Detail

    initialize

    public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException

    Called once at initialization. Parameters: split - the split that defines the range of records to read context - the information about the task Throws: IOException InterruptedException

    nextKeyValue

    public abstract boolean nextKeyValue() throws IOException,InterruptedException

    Read the next key, value pair. Returns: true if a key/value pair was read Throws: IOException InterruptedException

  • 33

    getCurrentKey

    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException

    Get the current key. Returns: the current key or null if there is no current key. Throws: IOException InterruptedException

    getCurrentValue

    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException

    Get the current value. Returns: the object that was read Throws: IOException InterruptedException

    getProgress

    public abstract float getProgress() throws IOException, InterruptedException

    The current progress of the record reader through its data. Returns: a number between 0.0 and 1.0 that is the fraction of the data read Throws: IOException InterruptedException

    close

    public abstract void close() throws IOException

    Close the record reader. Specified by: close in interface Closeable Throws: IOException
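    As an illustration of these five abstract methods, the sketch below wraps the built-in LineRecordReader and upper-cases each line before handing it to the Mapper. It is only an assumed example of how a custom RecordReader can delegate to an existing one; the class name is hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

    public class UpperCaseLineRecordReader extends RecordReader<LongWritable, Text> {

        private final LineRecordReader delegate = new LineRecordReader();
        private final Text currentValue = new Text();

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            delegate.initialize(split, context);   // let the wrapped reader position itself on the split
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!delegate.nextKeyValue()) {
                return false;                      // no more records in this split
            }
            currentValue.set(delegate.getCurrentValue().toString().toUpperCase());
            return true;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return delegate.getCurrentKey();       // byte offset of the line, as with TextInputFormat
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return currentValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return delegate.getProgress();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }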


    Reducer

    Constructor Detail

    Reducer

    public Reducer()

    Method Detail

    setup

    protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException

    Called once at the start of the task. Throws: IOException InterruptedException

    reduce

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException

    This method is called once for each key. Most applications will define their reduce class by overriding this method. The default implementation is an identity function. Throws: IOException InterruptedException

    cleanup

    protected void cleanup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException,InterruptedException

    Called once at the end of the task. Throws: IOException InterruptedException


    run

    public void run(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException

    Advanced application writers can use the run(org.apache.hadoop.mapreduce.Reducer.Context) method to control how the reduce task works. Throws: IOException InterruptedException
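    For comparison with the Mapper sketch earlier, here is a minimal sum-style Reducer against the new API; again, the class name is illustrative.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Add up all the counts emitted for this key and write a single total.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }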

    Prior to Hadoop 0.20.x, a Map class had to extend a MapReduceBase and implement a Mapper as such:

    public static class Map extends MapReduceBase implements Mapper<K1, V1, K2, V2> {
        ...
    }

    and similarly, a map function had to use an OutputCollector and a Reporter object to emit (key, value) pairs and send progress updates to the main program. A typical map function looked like:

    public void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException {
        ...
        output.collect(key, value);
    }

    With the new Hadoop API, a mapper or reducer has to extend classes from the package org.apache.hadoop.mapreduce.* and there is no need to implement an interface anymore. Here is how a Map class is defined in the new API:

    public class MapClass extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        ...
    }

    and a map function uses Context objects to emit records and send progress updates. A typical map function is now defined as:

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        ...
        context.write(key, value);
    }

    All of the changes described above for a Mapper apply in the same way to a Reducer. Another major change concerns the way a job is configured and controlled. Previously, a map-reduce job was configured through a JobConf object and job control was done using an instance of JobClient. The main body of a driver class used to look like:

    JobConf conf = new JobConf(Driver.class);
    conf.setPropertyX(..);
    conf.setPropertyY(..);
    ...
    JobClient.runJob(conf);

    In the new Hadoop API, the same functionality is achieved as follows:

    Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setJarByClass(Driver.class);
    job.setPropertyX(..);
    job.setPropertyY(..);
    job.waitForCompletion(true);
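    Filling in the placeholder setPropertyX/setPropertyY calls, a complete new-API driver might look like the sketch below. The input/output paths come from command-line arguments, and the Mapper/Reducer class names refer to the illustrative classes sketched earlier, not to anything defined elsewhere in this tutorial.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "word count");
            job.setJarByClass(WordCountDriver.class);

            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);

            // Types of the final (reducer) output key/value pairs.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }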


    Advance Hadoop API

    Combiner: The primary goal of combiners is to minimize the number of key/value pairs that will be shuffled across the network between mappers and reducers, and thus to save as much bandwidth as possible.

    Eg. Take the word count example on a text containing the word "the" one million times. Without a combiner, the mapper will send one million key/value pairs of the form <the, 1>. With a combiner, it will potentially send far fewer key/value pairs, of the form <the, N>, with N a number potentially much bigger than 1. That is just the intuition.

    Simply speaking, a combiner can be considered a mini reducer that is applied, potentially several times, during the map phase before sending the new (hopefully reduced) set of key/value pairs to the reducer(s). This is why a combiner must implement the Reducer interface (or extend the Reducer class as of Hadoop 0.20).

    conf.setCombinerClass(Reduce.class);

    Indeed, suppose 5 key/value pairs are emitted from the mapper for a given key k, with a mean value of 20 (say, three values averaging 30 and two values averaging 5). Without a combiner, the reducer receives all five values and the mean output will be 20. But if a combiner were applied to the two sets separately beforehand, the reducer would receive only the two partial means (30 and 5) and the output would be different (17.5), which is an unexpected behavior.
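    For a job like word count, where the reduce operation (summing) is both associative and commutative, it is safe to reuse the reducer as the combiner. A minimal sketch with the new API, assuming a Job object named job and the WordCountMapper/WordCountReducer classes sketched earlier:

    // Safe because addition is associative and commutative:
    // combining partial sums on the map side does not change the final totals.
    job.setMapperClass(WordCountMapper.class);
    job.setCombinerClass(WordCountReducer.class);
    job.setReducerClass(WordCountReducer.class);

    For a non-associative operation such as the mean above, either skip the combiner or have the combiner emit (sum, count) pairs so the reducer can compute a correct weighted mean.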


    Performance Measurement:

    Local Execution Mode using LocalJobRunner from Hadoop

    Hadoop's LocalJobRunner can execute the same map-reduce physical plans locally. So we compile the logical plan into a map-reduce physical plan and create the JobControl object corresponding to the map-reduce plan. We just need to write a separate launcher which will submit the job to the LocalJobRunner instead of submitting it to an external JobTracker.

    Pros

    Code Reuse

    No need to write and maintain:

    Different operators

    Different logical-to-physical translators

    Different launchers

    The current framework does not have any progress reporting. With this approach we will have it at no extra cost.

    Cons

    Not sure how stable LocalJobRunner is.


    Found some bugs in hadoop-15 which make it practically useless for us right now; these have, however, been fixed in hadoop-16.

    Not sure how this will affect Example generator

    1) Will the invocation of LocalJobRunner have some latency?

    Definitely it does. As measured in hadoop-15, it has about a 5-second startup latency. Whether this matters depends on how and where we are using LocalJobRunner. If we strictly use it only when the user asks for local execution mode, it should not matter. Also, if the size of the data is at least in the tens of MBs, the LocalJobRunner performs better than streaming tuples through the plan of local operators.
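    As a quick sketch of how a job can be steered to the LocalJobRunner under the classic (pre-YARN) configuration, the two properties below are typically enough. The property names are from the Hadoop 1.x configuration, so treat this as an assumption to check against your version:

    Configuration conf = new Configuration();
    // Run the job in-process with LocalJobRunner instead of submitting to a JobTracker.
    conf.set("mapred.job.tracker", "local");
    // Read input from the local filesystem rather than HDFS.
    conf.set("fs.default.name", "file:///");
    Job job = new Job(conf);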

    The Configuration API

    Components in Hadoop are configured using Hadoop's own configuration API.

    An instance of the Configuration class (found in the org.apache.hadoop.conf package) represents a collection of configuration properties and their values.

    Each property is named by a String, and the type of a value may be one of several types, including Java primitives such as boolean, int, and float.

    Configurations read their properties from resources: XML files with a simple structure for defining name-value pairs.

    Example. A simple configuration file, configuration-1.xml

    <?xml version="1.0"?>
    <configuration>
      <property>
        <name>color</name>
        <value>yellow</value>
        <description>Color</description>
      </property>
      <property>
        <name>size</name>
        <value>10</value>
        <description>Size</description>
      </property>
      <property>
        <name>weight</name>
        <value>heavy</value>
        <final>true</final>
        <description>Weight</description>
      </property>
      <property>
        <name>size-weight</name>
        <value>${size},${weight}</value>
        <description>Size and weight</description>
      </property>
    </configuration>

    Assuming this configuration file is in a file called configuration-1.xml, we can access its

    properties using a piece of code like this:

    Configuration conf = new Configuration();

    conf.addResource("configuration-1.xml");

    assertThat(conf.get("color"), is("yellow"));

    assertThat(conf.getInt("size", 0), is(10));

    assertThat(conf.get("breadth", "wide"), is("wide"));

    Unless explicitly turned off, Hadoop by default specifies two resources, loaded in-order

    from the classpath:


    1. core-default.xml: Read-only defaults for Hadoop.

    2. core-site.xml: Site-specific configuration for a given Hadoop installation.

    Partitioner

    A Partitioner is responsible for performing the partitioning of the intermediate key space.

    In Hadoop, the default partitioner is HashPartitioner.

    The number of partitions is equal to the number of reduce tasks for the job.

    Why is it important?

    First, it has a direct impact on the overall performance of your job: a poorly designed partitioning function will not evenly distribute the load over the reducers, potentially losing much of the benefit of the distributed map/reduce infrastructure.

    Example

    As you can see, the tokens are correctly ordered by number of occurrences on each reducer (which is what Hadoop guarantees by default), but this is not what you need! You'd rather expect an output


    where tokens are totally ordered over the reducers, from 1 to 30 occurrences on the first reducer and from 31 to 14620 on the second. This would happen as a result of a correct partitioning function: all the tokens having a number of occurrences less than or equal to N (here 30) are sent to reducer 1 and the others are sent to reducer 2, resulting in two partitions. Since the tokens are sorted on each partition, you get the expected total order on the number of occurrences.

    Conclusion

    Partitioning in map/reduce is a fairly simple concept, but one that is important to get right. Most of the time the default partitioning, based on a hash function, is sufficient. But as illustrated in this example, you will sometimes need to modify the default behavior and customize the partitioning to suit your needs.
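    A minimal sketch of such a custom partitioner, assuming a second job whose map output key is the occurrence count (IntWritable) and whose value is the token (Text). The class name and the threshold of 30 are illustrative, mirroring the example above.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class OccurrenceCountPartitioner extends Partitioner<IntWritable, Text> {

        private static final int THRESHOLD = 30;

        @Override
        public int getPartition(IntWritable key, Text value, int numPartitions) {
            // Keys (occurrence counts) up to the threshold go to the first reducer,
            // everything else goes to the second one.
            if (numPartitions < 2) {
                return 0;
            }
            return key.get() <= THRESHOLD ? 0 : 1;
        }
    }

    It would be registered on the job with job.setPartitionerClass(OccurrenceCountPartitioner.class) and used with two reduce tasks.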

    HDFS Accessibility

    HDFS can be accessed from applications in many different ways. Natively, HDFS provides a FileSystem Java API for applications to use. A C language wrapper for this Java API is also available. In addition, an HTTP browser can also be used to browse the files of an HDFS instance. Work is in progress to expose HDFS through the WebDAV protocol.

    FS Shell


    HDFS allows user data to be organized in the form of files and directories. It provides a commandline interface called FS shell that lets a user interact with the data in HDFS. The syntax of this command set is similar to other shells (e.g. bash, csh) that users are already familiar with. Here are some sample action/command pairs:

    Action Command

    Create a directory named /tmp bin/hadoop dfs -mkdir /tmp

    Remove a directory named /tmp bin/hadoop dfs -rmr /tmp

    View the contents of a file named /tmp/myfile.txt bin/hadoop dfs -cat /tmp/myfile.txt

    List the files and directories present in HDFS bin/hadoop dfs -ls /tmp/

    To copy files into HDFS bin/hadoop dfs -copyFromLocal <localsrc> <dst>

    To copy files from HDFS bin/hadoop dfs -copyToLocal <src> <localdst>

    FS shell is targeted for applications that need a scripting language to interact with the stored data.

    DFSAdmin

    The DFSAdmin command set is used for administering an HDFS cluster. These are commands that are used only by an HDFS administrator. Here are some sample action/command pairs:

    Action Command

    Put the cluster in Safemode bin/hadoop dfsadmin -safemode enter

    Generate a list of DataNodes bin/hadoop dfsadmin -report

    Recommission or decommission DataNode(s) bin/hadoop dfsadmin -refreshNodes

    Browser Interface


    A typical HDFS install configures a web server to expose the HDFS namespace through a configurable TCP port. This allows a user to navigate the HDFS namespace and view the contents of its files using a web browser.

    Using Hadoop's DistributedCache

    While working with MapReduce applications, there are times when we need to share files globally with all nodes on the cluster. This can be, for example, a shared library to be accessed by each task. Hadoop's MapReduce project provides this functionality through a distributed cache. This distributed cache can be configured with the job, and provides read-only data to the application across all machines.

    This provides a service for copying files and archives to the task nodes in time for the

    tasks to use them when they run.

    To save network bandwidth, files are normally copied to any particular node once per

    job.

    Distributing files is pretty straightforward. To cache a file addToCache.txt on HDFS, one can set up the job as follows:

    Job job = new Job(conf);

    job.addCacheFile(new URI("/user/local/hadoop/addToCache.txt"));

    Other URI schemes can also be specified.

    Now, in the Mapper/Reducer, one can access the file as:

    Path[] cacheFiles = context.getLocalCacheFiles();

    FileInputStream fileStream = new FileInputStream(cacheFiles[0].toString());
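    Putting the two snippets together, a hedged sketch of a Mapper setup() method that loads the cached file into memory might look like the following; the class name, field name, and the idea of a lookup set are all illustrative.

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CacheAwareMapper extends Mapper<LongWritable, Text, Text, Text> {

        private final Set<String> lookup = new HashSet<String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Local paths of the files that were added with job.addCacheFile(...).
            Path[] cacheFiles = context.getLocalCacheFiles();
            if (cacheFiles != null && cacheFiles.length > 0) {
                BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
                try {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        lookup.add(line.trim());   // keep each line of the cached file for lookups in map()
                    }
                } finally {
                    reader.close();
                }
            }
        }
    }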

    HIVE Basics

    Apache Hive is a data warehouse infrastructure built on top of Hadoop for providing data

    summarization, query, and analysis


    Features of Hive

    Hive supports indexing to provide acceleration.

    Support for different storage types.

    Hive stores metadata in an RDBMS, which significantly reduces the time needed to perform semantic checks during query execution.

    Hive can operate on compressed data stored in the Hadoop ecosystem.

    Built-in user-defined functions (UDFs) to manipulate dates, strings, and other data-mining tools. If none serves our need, we can create our own UDFs.

    Hive supports SQL-like queries (HiveQL), which are implicitly converted into map-reduce jobs.

    HiveQL

    While based on SQL, HiveQL does not strictly follow the full SQL-92 standard. HiveQL offers extensions not found in SQL.

    **Details will be provided later.

    PIG Basics

    Pig is a high-level platform for creating MapReduce programs used with Hadoop. The language

    for this platform is called Pig Latin

    Apache Pig is a platform for analyzing large data sets that consists of a high-level language for

    expressing data analysis programs, coupled with infrastructure for evaluating these programs.

    The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turn enables them to handle very large data sets.

    At the present time, Pig's infrastructure layer consists of a compiler that produces sequences of

    Map-Reduce programs, for which large-scale parallel implementations already exist (e.g., the

    Hadoop subproject). Pig's language layer currently consists of a textual language called Pig

    Latin, which has the following key properties:


    Ease of programming: It is trivial to achieve parallel execution of simple, "embarrassingly parallel" data analysis tasks. Complex tasks comprised of multiple interrelated data transformations are explicitly encoded as data flow sequences, making them easy to write, understand, and maintain.

    Optimization opportunities: The way in which tasks are encoded permits the system to optimize their execution automatically, allowing the user to focus on semantics rather than efficiency.

    Extensibility: Users can create their own functions to do special-purpose processing.

    Practical Development

    Counters

    A named counter that tracks the progress of a map/reduce job.

    Counters represent global counters, defined either by the Map-Reduce framework or

    applications. Each Counter is named by an Enum and has a long for the value.

    Counters are a useful channel for gathering statistics about the job. Counter values are much easier to retrieve than log output for large distributed jobs, and they give you a record of the number of times a given condition occurred, which would be more work to obtain from a set of logfiles.

    Types of Counter

    Built-in Counters Hadoop maintains some built-in counters for every job, which report various metrics

    for your job.

    Eg. MapReduce Task Counters , Filesystem Counters

    Task Counters Task counters gather information about tasks over the course of their execution, and


    the results are aggregated over all the tasks in a job. Task counters are maintained by

    each task attempt, and periodically sent to the Task tracker and then to the jobtracker,

    so they can be globally aggregated.

    Eg. Map input records, Map skipped records

    Job counters

    Job counters are maintained by the jobtracker. They measure job-level statistics, not

    values that change while a task is running. For example, TOTAL_LAUNCHED_MAPS

    counts the number of map tasks that were launched over the course of a job (including

    ones that failed).

    Eg. Launched map tasks, Launched reduce tasks

    User-Defined Java Counters

    MapReduce allows user code to define a set of counters, which are then incremented as

    desired in the mapper or reducer. Counters are defined by a Java enum, which serves to

    group related counters.
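    A small sketch of a user-defined counter, with an assumed enum name, incremented from inside a map() method; the "empty line is malformed" rule exists purely for the sake of the example.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class RecordQualityMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // Enum that groups the application's custom counters (names are illustrative).
        enum RecordQuality { MALFORMED, WELL_FORMED }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value.toString().trim().isEmpty()) {
                context.getCounter(RecordQuality.MALFORMED).increment(1);
            } else {
                context.getCounter(RecordQuality.WELL_FORMED).increment(1);
                context.write(value, NullWritable.get());
            }
        }
    }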

    Determining the Optimal Number of Reducers

    The optimal number of reducers is related to the total number of available reducer slots in your cluster. The total number of slots is found by multiplying the number of nodes in the cluster by the number of slots per node (which is determined by the value of the mapred.tasktracker.reduce.tasks.maximum property; by default it is 2).
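    The number of reduce tasks itself is set on the job; a one-line sketch using the new API (assuming a Job object named job as in the driver example earlier, with an illustrative figure):

    // A common rule of thumb is slightly fewer reduce tasks than total reduce slots,
    // so all reducers can run in a single wave; the number here is illustrative.
    job.setNumReduceTasks(14);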

    ChainMapper

    The ChainMapper class allows the use of multiple Mapper classes within a single map task.

    The Mapper classes are invoked in a chained (or piped) fashion: the output of the first becomes the input of the second, and so on until the last Mapper; the output of the last Mapper is written to the task's output.


    The key functionality of this feature is that the Mappers in the chain do not need to be aware

    that they are executed in a chain. This enables having reusable specialized Mappers that can be

    combined to perform composite operations within a single task.
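    The following is a hedged sketch of chaining two mappers with the classic (org.apache.hadoop.mapred) API, modeled on the pattern shown in the ChainMapper Javadoc. The AMap and BMap classes, and the enclosing Driver class, are hypothetical old-API mappers used only for illustration.

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.ChainMapper;

    ...

    JobConf job = new JobConf(Driver.class);

    // First mapper in the chain: (LongWritable, Text) -> (Text, Text).
    JobConf aMapConf = new JobConf(false);
    ChainMapper.addMapper(job, AMap.class,
            LongWritable.class, Text.class, Text.class, Text.class,
            true, aMapConf);

    // Second mapper consumes the first mapper's output: (Text, Text) -> (LongWritable, Text).
    JobConf bMapConf = new JobConf(false);
    ChainMapper.addMapper(job, BMap.class,
            Text.class, Text.class, LongWritable.class, Text.class,
            false, bMapConf);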


    More Advanced Map-Reduce Programming

    The Writable Interface

    Any key or value type in the Hadoop Map-Reduce framework implements this interface.

    The Writable interface defines two methods: one for writing its state to a DataOutput binary stream, and one for reading its state from a DataInput binary stream.

    package org.apache.hadoop.io;

    import java.io.DataOutput;
    import java.io.DataInput;
    import java.io.IOException;

    public interface Writable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    Let's look at a particular Writable to see what we can do with it. We will use IntWritable, a wrapper for a Java int. We can create one and set its value using the set() method:

    IntWritable writable = new IntWritable();
    writable.set(163);

    Equivalently, we can use the constructor that takes the integer value:

    IntWritable writable = new IntWritable(163);

    Hadoop comes with a large selection of Writable classes in the org.apache.hadoop.io package.

    They form the class hierarchy shown in the accompanying figure.


    Custom Writable and WritableComparable

    Implementing a Custom Writable

    Hadoop comes with a useful set of Writable implementations that serve most purposes; however, on occasion, you may need to write your own custom implementation. With a custom Writable, you have full control over the binary representation and the sort order. Because Writables are at the heart of the MapReduce data path, tuning the binary representation can have a significant effect on performance. The stock Writable implementations that come with Hadoop are well-tuned, but for more elaborate structures, it is often better to create a new Writable type, rather than compose the stock types. To demonstrate how to create a custom Writable, we shall write an implementation that represents a pair of strings, called TextPair.

    Example. A Writable implementation that stores a pair of Text objects

    import java.io.*;

    import org.apache.hadoop.io.*;

    public class TextPair implements WritableComparable<TextPair> {

        private Text first;
        private Text second;

        public TextPair() {
            set(new Text(), new Text());
        }

        public TextPair(String first, String second) {
            set(new Text(first), new Text(second));
        }

        public TextPair(Text first, Text second) {
            set(first, second);
        }

        public void set(Text first, Text second) {
            this.first = first;
            this.second = second;
        }

        public Text getFirst() {
            return first;
        }

        public Text getSecond() {
            return second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof TextPair) {
                TextPair tp = (TextPair) o;
                return first.equals(tp.first) && second.equals(tp.second);
            }
            return false;
        }

        @Override
        public String toString() {
            return first + "\t" + second;
        }

        @Override
        public int compareTo(TextPair tp) {
            int cmp = first.compareTo(tp.first);
            if (cmp != 0) {
                return cmp;
            }
            return second.compareTo(tp.second);
        }
    }

    The first part of the implementation is straightforward: there are two Text instance variables, first and second, and associated constructors, getters, and setters. All Writable implementations must have a default constructor so that the MapReduce framework can instantiate them, then populate their fields by calling readFields().

    TextPair's write() method serializes each Text object in turn to the output stream, by delegating to the Text objects themselves. Similarly, readFields() deserializes the bytes from the input stream by delegating to each Text object. The DataOutput and DataInput interfaces have a rich set of methods for serializing and deserializing Java primitives, so, in general, you have complete control over the wire format of your Writable object.


    TextPair is an implementation of WritableComparable, so it provides an implementation of the compareTo() method that imposes the ordering you would expect: it sorts by the first string followed by the second.

    WritableComparable and comparators

    IntWritable implements the WritableComparable interface, which is just a subinterface of the Writable and java.lang.Comparable interfaces:

    package org.apache.hadoop.io;

    public interface WritableComparable<T> extends Writable, Comparable<T> {
    }

    Comparison of types is crucial for MapReduce, where there is a sorting phase during which keys are compared with one another. One optimization that Hadoop provides is the RawComparator extension of Java's Comparator:

    package org.apache.hadoop.io;

    import java.util.Comparator;

    public interface RawComparator<T> extends Comparator<T> {
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
    }

    This interface permits implementors to compare records read from a stream without deserializing them into objects, thereby avoiding any overhead of object creation. For example, the comparator for IntWritables implements the raw compare() method by reading an integer from each of the byte arrays b1 and b2 and comparing them directly, from the given start positions (s1 and s2) and lengths (l1 and l2).
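    As a short, hedged illustration of using a registered comparator (rather than writing one), the WritableComparator.get() factory returns the comparator that Hadoop has registered for a WritableComparable class; the fragment below is only a sketch, not a complete program.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;

    ...

    // Obtain the comparator registered for IntWritable.
    RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

    IntWritable w1 = new IntWritable(163);
    IntWritable w2 = new IntWritable(67);

    // Object-based comparison: positive because 163 > 67.
    int byObject = comparator.compare(w1, w2);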

    Avro

    Apache Avro is a language-neutral data serialization system. Avro data is described using a language-independent schema. Avro schemas are usually written in JSON, and data is usually encoded using a binary format, but there are other options, too. There is a higher-level language called Avro IDL for writing schemas in a C-like syntax that is more familiar to developers. There is also a JSON-based data encoder, which, being human-readable, is useful for prototyping and debugging Avro data.

    Avro specifies an object container format for sequences of objects, similar to Hadoop's sequence file. An Avro data file has a metadata section where the schema is stored, which


    makes the file self-describing. Avro data files support compression and are splittable, which is crucial for a MapReduce data input format.

    Avro provides APIs for serialization and deserialization, which are useful when you want to integrate Avro with an existing system, such as a messaging system where the framing format is already defined. In other cases, consider using Avro's data file format.

    Let's write a Java program to read and write Avro data to and from streams. We'll start with a simple Avro schema for representing a pair of strings as a record:

    {
      "type": "record",
      "name": "StringPair",
      "doc": "A pair of strings.",
      "fields": [
        {"name": "left", "type": "string"},
        {"name": "right", "type": "string"}
      ]
    }

    If this schema is saved in a file on the classpath called StringPair.avsc (.avsc is the conventional extension for an Avro schema), then we can load it using the following two lines of code:

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(getClass().getResourceAsStream("StringPair.avsc"));

    We can create an instance of an Avro record using the generic API as follows:

    GenericRecord datum = new GenericData.Record(schema);
    datum.put("left", "L");
    datum.put("right", "R");

    Next, we serialize the record to an output stream:

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(datum, encoder);
    encoder.flush();
    out.close();

    There are two important objects here: the DatumWriter and the Encoder. A DatumWriter translates data objects into the types understood by an Encoder, which the latter writes to the output stream. Here we are using a GenericDatumWriter, which passes the fields of GenericRecord to the Encoder. We pass a null to the encoder factory since we are not reusing a previously constructed encoder here.
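    To read the record back, the mirror-image classes are used. A brief sketch, assuming the schema and the out stream from the snippet above:

    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord result = reader.read(null, decoder);
    // result.get("left") is "L" and result.get("right") is "R" (returned as CharSequence instances).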

    Avro data files


    Avro's object container file format is for storing sequences of Avro objects. It is very similar in design to Hadoop's sequence files. A data file has a header containing metadata, including the Avro schema and a sync marker, followed by a series of (optionally compressed) blocks containing the serialized Avro objects. Writing Avro objects to a data file is similar to writing to a stream. We use a DatumWriter as before, but instead of using an Encoder, we create a DataFileWriter instance with the DatumWriter. Then we can create a new data file (which, by convention, has a .avro extension) and append objects to it:

    File file = new File("data.avro");
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
    dataFileWriter.create(schema, file);
    dataFileWriter.append(datum);
    dataFileWriter.close();

    The objects that we write to the data file must conform to the file's schema, otherwise an exception will be thrown when we call append().
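    Reading the data file back is symmetric. A short sketch using DataFileReader, which iterates over the objects in the file (reusing the file variable from above):

    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, reader);
    try {
        // The schema does not need to be supplied: it is read from the file's metadata.
        while (dataFileReader.hasNext()) {
            GenericRecord record = dataFileReader.next();
            System.out.println(record);
        }
    } finally {
        dataFileReader.close();
    }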

    Writing a SequenceFile

    SequenceFile provides Writer, Reader and SequenceFile.Sorter classes for writing, reading and sorting respectively.

    Hadoop has ways of splitting sequence files for doing jobs in parallel, even if they are compressed, making them a convenient way of storing your data without making your own format.

    Hadoop provides two file formats for grouping multiple entries in a single file:

    SequenceFile: A flat file which stores binary key/value pairs. The output of Map/Reduce tasks is usually written into a SequenceFile.

    MapFile: Consists of two SequenceFiles. The data file is identical to the SequenceFile and contains the data stored as binary key/value pairs. The second file is an index file, which contains a key/value map with seek positions inside the data file to quickly access the data.

    We started using the SequenceFile format to store log messages. It turned out that, while this format seems to be well suited for storing log messages and processing them with Map/Reduce jobs, the direct access to specific log messages is very slow. The API to read data from a


    SequenceFile is iterator based, so that it is necessary to jump from entry to entry until the target entry is reached.

    Since one of our most important use cases is searching for log messages in real time, slow random access performance is a show stopper.

    MapFiles use two files: the index file stores seek positions for every n-th key in the data file, and the data file stores the data as binary key/value pairs.

    Therefore we moved to MapFiles. MapFiles have the disadvantage that a random access needs to read from 2 separate files. This seems to be slow, but the indexes which store the seek positions for our log entries are small enough to be cached in memory. Once the seek position is identified, only relevant portions of the data file are read. Overall this leads to a nice performance gain.

    To create a SequenceFile, use one of its createWriter() static methods, which returns a SequenceFile.Writer instance.

    Write Sequence File in Hadoop

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SequenceFileCreator {

        public static void main(String[] args) throws Exception {
            System.out.println("Sequence File Creator");
            String uri = args[0];       // destination sequence file
            String filePath = args[1];  // local text file to read from

            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path path = new Path(uri);

            Text key = new Text();
            Text value = new Text();

            SequenceFile.Writer writer = null;
            BufferedReader buffer = new BufferedReader(new FileReader(filePath));
            String line = null;
            try {
                writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
                while ((line = buffer.readLine()) != null) {
                    // Use the line as both key and value, as in the original example.
                    key.set(line);
                    value.set(line);
                    writer.append(key, value);
                }
            } finally {
                IOUtils.closeStream(writer);
                buffer.close();
            }
        }
    }

    Read Sequence File in Hadoop

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;

    public class SequenceFileReader {

        public static void main(String[] args) throws Exception {
            System.out.println("Sequence File Reader");
            String uri = args[0]; // Input should be a sequence file

            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path path = new Path(uri);

            SequenceFile.Reader reader = null;
            try {
                reader = new SequenceFile.Reader(fs, path, conf);
                // Instantiate key and value objects of whatever types the file was written with.
                Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
                long position = reader.getPosition();
                while (reader.next(key, value)) {
                    // Mark records that immediately follow a sync point with a '*'.
                    String syncSeen = reader.syncSeen() ? "*" : "";
                    System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                    position = reader.getPosition(); // start position of the next record
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                IOUtils.closeStream(reader);
            }
        }
    }

    Creating InputFormats and OutputFormats