Cloudera Administrator Training
TRANSCRIPT
01-1 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cloudera Administrator Training for Apache Hadoop
201201
Chapter 1 Introduction
Introduction
About This Course
About Cloudera
Course Logistics
Course Objectives
During this course, you will learn:
The core technologies of Hadoop
How to plan your Hadoop cluster hardware and software
How to deploy a Hadoop cluster
How to schedule jobs on the cluster
How to maintain your cluster
How to monitor, troubleshoot, and optimize the cluster
What system administrator issues to consider when installing Hive, HBase and Pig
How to populate HDFS from external sources
Course Contents
Chapter 1: Introduction
Chapter 2: An Introduction to Hadoop and HDFS
Chapter 3: Planning Your Hadoop Cluster
Chapter 4: Deploying Your Cluster
Chapter 5: Scheduling Jobs
Chapter 6: Cluster Maintenance
Chapter 7: Cluster Monitoring, Troubleshooting, and Optimizing
Chapter 8: Installing and Managing Hadoop Ecosystem Projects
Chapter 9: Populating HDFS From External Sources
Chapter 10: Conclusion
Cloudera Certified Administrator for Apache Hadoop
At the end of the course, you will take the Cloudera Certified Administrator for Apache Hadoop exam
Passing the exam earns you the CCAH credential
Your instructor will tell you more about the exam during the week
Introduction
About This Course
About Cloudera
Course Logistics
About Cloudera
Cloudera is “The commercial Hadoop company”
Founded by leading experts on Hadoop from Facebook, Google, Oracle and Yahoo
Staff includes several committers to Hadoop projects
Cloudera Products
Cloudera’s Distribution of Hadoop (CDH)
– A single, easy-to-install package from the Apache Hadoop core repository
– Includes a stable version of Hadoop, plus critical bug fixes and solid new features from the development version
– Open-source
– No ‘vendor lock-in’
Cloudera Manager
– Easy, wizard-based creation and management of Hadoop clusters
– Central monitoring and management point for the cluster
– Free version supports up to 50 nodes
Cloudera Enterprise
Cloudera Enterprise
– Complete package of software and support
– Built on top of CDH
– Includes full version of Cloudera Manager
– Install, manage, and maintain a cluster of any size
– LDAP integration
– Includes powerful cluster monitoring and auditing tools
– Resource consumption tracking
– Proactive health checks
– Alerting
– Configuration change audit trails
– And more
– 24 x 7 support
Cloudera Services
Provides consultancy services to many key users of Hadoop
– Including Adconion, AOL Advertising, comScore, Groupon, NAVTEQ, Samsung, Trend Micro, Trulia…
Solutions Architects and engineers are experts in Hadoop and related technologies
– Several are committers to Apache Hadoop and related projects
Provides training in key areas of Hadoop administration and development
– Courses include Developer Training for Apache Hadoop, Analyzing Data with Hive and Pig, HBase Training, Cloudera Essentials
– Custom course development available
– Both public and on-site training available
Introduction
About This Course
About Cloudera
Course Logistics
Logistics
Course start and end times
Lunch
Breaks
Restrooms
Can I come in early/stay late?
Certification
Introductions
About your instructor
About you
– Experience with Hadoop?
– Experience as a System Administrator?
– What platform(s) do you use?
– Expectations from the course?
Chapter 2 An Introduction to Hadoop
An Introduction to Hadoop
In this chapter, you will learn:
What Hadoop is
Why Hadoop is important
What features the Hadoop Distributed File System (HDFS) provides
How MapReduce works
What other Apache Hadoop ‘ecosystem’ projects exist, and what they do
An Introduction to Hadoop
Why Hadoop?
What is HDFS?
What is MapReduce?
Hive, Pig, HBase and other Ecosystem projects
Hands-On Exercise: Installing Hadoop
Conclusion
Some Numbers…
Max data in memory (RAM): 64GB
Max data per computer (disk): 24TB
Data processed by Google every month: 400PB… in 2007
Average job size: 180GB
Time 180GB of data would take to read sequentially off a single disk drive: approximately 45 minutes
Data Access Speed is the Bottleneck
We can process data very quickly, but we can only read/write it very slowly
Solution: parallel reads
– 1 HDD = 75MB/sec
– 1,000 HDDs = 75GB/sec
– Far more acceptable
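The arithmetic behind these figures can be sketched in a few lines (the 75MB/sec per-drive rate is the slide's assumption; the model ignores seek time and coordination overhead):

```python
# Time to scan a dataset sequentially on one drive vs. striped across many.
# All throughput figures are illustrative assumptions, not measurements.

def scan_time_seconds(data_gb, disks, mb_per_sec_per_disk=75):
    """Seconds to read data_gb gigabytes spread evenly across `disks` drives."""
    total_mb = data_gb * 1024
    aggregate_throughput = disks * mb_per_sec_per_disk  # MB/sec
    return total_mb / aggregate_throughput

# A 180GB job on a single 75MB/sec drive: roughly 40 minutes of pure I/O
single = scan_time_seconds(180, disks=1)

# The same data spread across 1,000 drives: under three seconds
parallel = scan_time_seconds(180, disks=1000)
```

At the assumed rate the single-drive figure works out to about 41 minutes, consistent with the "approximately 45 minutes" quoted earlier.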
Sharing is Slow
Grid computing is not new – MPI, PVM, Condor, …
Grid focus is on distributing the workload
– Uses a NetApp filer or other SAN-based solution for many compute nodes
Fine for relatively limited amounts of data
– Reading large amounts of data from a single SAN device can leave nodes ‘starved’
Sharing is Tricky
Exchanging data requires synchronization – Deadlocks become a problem
Finite bandwidth is available – Distributed systems can “drown themselves” – Failovers can cause cascading failure of the system
Temporal dependencies are complicated – Difficult to make decisions regarding partial restarts
Reliability
“Failure is the defining difference between distributed and local programming”
– Ken Arnold, CORBA designer
Moving to a Cluster of Machines
In the late 1990s, Google decided to design its architecture using clusters of low-cost machines – Rather than fewer, more powerful machines
Creating an architecture around low-cost, unreliable hardware presents a number of challenges
System Requirements
System should support partial failure
– Failure of one part of the system should result in a graceful decline in performance
– Not a full halt
System should support data recoverability
– If components fail, their workload should be picked up by still-functioning units
System should support individual recoverability
– Nodes that fail and restart should be able to rejoin the group activity without a full group restart
System Requirements (cont’d)
System should be consistent
– Concurrent operations or partial internal failures should not cause the results of the job to change
System should be scalable
– Adding increased load to a system should not cause outright failure
– Instead, should result in a graceful decline
– Increasing resources should support a proportional increase in load capacity
Hadoop’s Origins
Google created an architecture which answers these (and other) requirements
Released two White Papers
– 2003: Description of the Google File System (GFS)
– A method for storing data in a distributed, reliable fashion
– 2004: Description of distributed MapReduce
– A method for processing data in a parallel fashion
Hadoop was based on these White Papers
All of Hadoop is written in Java
– Developers typically write their MapReduce code in Java
– Higher-level abstractions on top of MapReduce have also been developed
Hadoop: A Radical Way Out
The Google architecture, and hence Hadoop, provides a radical approach to these issues:
– Nodes talk to each other as little as possible
– Probably never!
– This is known as a “shared nothing” architecture
– The programmer should not explicitly write code which communicates between nodes
Data is spread throughout machines in the cluster – Data distribution happens when data is loaded on to the cluster – Computation happens where the data is stored
Instead of bringing data to the processors, Hadoop brings the processing to the data
An Introduction to Hadoop
Why Hadoop?
What is HDFS?
What is MapReduce?
Hive, Pig, HBase and other Ecosystem projects
Hands-On Exercise: Installing Hadoop
Conclusion
HDFS: Hadoop Distributed File System
Based on Google’s GFS (Google File System)
Provides redundant storage of massive amounts of data – Using cheap, unreliable computers
At load time, data is distributed across all nodes – Provides for efficient MapReduce processing (see later)
HDFS Assumptions
High component failure rates – Inexpensive components fail all the time
“Modest” number of HUGE files – Just a few million – Each file likely to be 100MB or larger
– Multi-Gigabyte files typical
Files are write-once
– Append support is available in CDH3 to support HBase reliability
– Should not be used by developers!
Large streaming reads – Not random access
High sustained throughput should be favored over low latency
HDFS Features
Operates ‘on top of’ an existing filesystem
Files are stored as ‘blocks’ – Much larger than for most filesystems
– Default is 64MB
Provides reliability through replication – Each block is replicated across multiple DataNodes – Default replication factor is 3
Single NameNode daemon stores metadata and co-ordinates access – Provides simple, centralized management
Blocks are stored on slave nodes – Running the DataNode daemon
HDFS: Block Diagram
[Diagram: the NameNode stores metadata only – e.g. /user/diana/foo -> blocks 1, 2, 4 and /user/diana/bar -> blocks 3, 5 – while the DataNodes store the blocks from the files, with each block replicated on multiple DataNodes]
The NameNode
The NameNode stores all metadata – Information about file locations in HDFS – Information about file ownership and permissions – Names of the individual blocks – Locations of the blocks
Metadata is stored on disk and read when the NameNode daemon starts up – Filename is fsimage – Note: block locations are not stored in fsimage
When changes to the metadata are required, these are made in RAM – Changes are also written to a log file on disk called edits – Full details later
The NameNode: Memory Allocation
When the NameNode is running, all metadata is held in RAM for fast response
Each ‘item’ consumes 150-200 bytes of RAM
Items: – Filename, permissions, etc. – Block information for each block
The NameNode: Memory Allocation (cont’d)
Why HDFS prefers fewer, larger files:
– Consider 1GB of data, HDFS block size 128MB
– Stored as 1 x 1GB file:
– Name: 1 item
– Blocks: 8 x 3 = 24 items
– Total items: 25
– Stored as 1000 x 1MB files:
– Names: 1000 items
– Blocks: 1000 x 3 = 3000 items
– Total items: 4000
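The item counts above can be reproduced with a small sketch (`metadata_items` is an illustrative helper, not a Hadoop API; the 128MB block size and 3x replication come from the example):

```python
import math

# "Items" are NameNode metadata entries: one per filename, plus one per
# block replica (blocks per file x replication factor).

def metadata_items(num_files, file_size_mb, block_size_mb=128, replication=3):
    blocks_per_file = math.ceil(file_size_mb / block_size_mb)
    return num_files * (1 + blocks_per_file * replication)

one_big = metadata_items(num_files=1, file_size_mb=1024)     # 1 x 1GB file -> 25
many_small = metadata_items(num_files=1000, file_size_mb=1)  # 1000 x 1MB files -> 4000
```

The same gigabyte of data costs the NameNode 160 times as much RAM when stored as many small files, which is why HDFS favors large files.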
The Slave Nodes
Actual contents of the files are stored as blocks on the slave nodes
Blocks are simply files on the slave nodes’ underlying filesystem
– Named blk_xxxxxxx
– Nothing on the slave node provides information about what underlying file the block is a part of
– That information is only stored in the NameNode’s metadata
Each block is stored on multiple different nodes for redundancy
– Default is three replicas
Each slave node runs a DataNode daemon
– Controls access to the blocks
– Communicates with the NameNode
Anatomy of a File Write
Anatomy of a File Write (cont’d)
1. Client connects to the NameNode
2. NameNode places an entry for the file in its metadata, returns the block name and list of DataNodes to the client
3. Client connects to the first DataNode and starts sending data
4. As data is received by the first DataNode, it connects to the second and starts sending data
5. Second DataNode similarly connects to the third
6. ack packets from the pipeline are sent back to the client
7. Client reports to the NameNode when the block is written
Anatomy of a File Write (cont’d)
If a DataNode in the pipeline fails
– The pipeline is closed
– The data continues to be written to the two good nodes in the pipeline
– The NameNode will realize that the block is under-replicated, and will re-replicate it to another DataNode
As the blocks are written, a checksum is also calculated and written
– Used to ensure the integrity of the data when it is later read
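As an illustration of the write-time/read-time checksum comparison, here is a toy model using CRC32 over fixed-size chunks (HDFS does store a CRC per fixed-size chunk of each block, 512 bytes by default, but everything else in this sketch is simplified):

```python
import zlib

CHUNK = 512  # bytes per checksum, mirroring the HDFS default

def checksums(block: bytes):
    """One CRC32 per CHUNK-sized slice of the block."""
    return [zlib.crc32(block[i:i + CHUNK]) for i in range(0, len(block), CHUNK)]

# On write: store the data together with its checksums
block = b"some block data" * 100
stored_sums = checksums(block)

# On read: recompute and compare; a match means the data is intact
assert checksums(block) == stored_sums

# Simulate bit rot: flip one byte and the comparison fails
corrupted = block[:10] + bytes([block[10] ^ 0xFF]) + block[11:]
assert checksums(corrupted) != stored_sums
```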
Hadoop is ‘Rack-aware’
Hadoop understands the concept of ‘rack awareness’ – The idea of where nodes are located, relative to one another – Helps the JobTracker to assign tasks to nodes closest to the data – Helps the NameNode determine the ‘closest’ block to a client
during reads – In reality, this should perhaps be described as being ‘switch-
aware’
HDFS replicates data blocks on nodes on different racks – Provides extra data security in case of catastrophic hardware
failure
Rack-awareness is determined by a user-defined script – See later
HDFS Block Replication Strategy
First copy of the block is placed on the same node as the client – If the client is not part of the cluster, the first block is placed on a
random node – System tries to find one which is not too busy
Second copy of the block is placed on a node residing on a different rack
Third copy of the block is placed on different node in the same rack as the second copy
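The placement rules above can be modelled as a toy function (the real NameNode policy also weighs node load and available space; the topology and node names here are invented for illustration):

```python
import random

def place_replicas(client_node, topology):
    """topology: dict mapping rack name -> list of node names."""
    rack_of = {n: r for r, nodes in topology.items() for n in nodes}
    all_nodes = [n for nodes in topology.values() for n in nodes]

    # First replica: on the client's own node if it is part of the cluster,
    # otherwise on a random node
    first = client_node if client_node in rack_of else random.choice(all_nodes)

    # Second replica: a node on a different rack
    other_racks = [r for r in topology if r != rack_of[first]]
    second = random.choice(topology[random.choice(other_racks)])

    # Third replica: a different node on the same rack as the second
    third = random.choice([n for n in topology[rack_of[second]] if n != second])
    return [first, second, third]

topo = {"rack1": ["n1", "n2", "n3"], "rack2": ["n4", "n5", "n6"]}
replicas = place_replicas("n1", topo)  # e.g. ["n1", <rack2 node>, <other rack2 node>]
```

Note that two of the three replicas end up on one rack, so a whole-rack failure still leaves at least one copy.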
Anatomy of a File Read
Anatomy of a File Read (cont’d)
1. Client connects to the NameNode
2. NameNode returns the name and locations of the first few blocks of the file – Block locations are returned closest-first
3. Client connects to the first of the DataNodes, and reads the block
If the DataNode fails during the read, the client will seamlessly connect to the next one in the list to read the block
Dealing With Data Corruption
As the DataNode is reading the block, it also calculates the checksum
‘Live’ checksum is compared to the checksum created when the block was stored
If they differ, the client reads from the next DataNode in the list
– The NameNode is informed that a corrupted version of the block has been found
– The NameNode will then re-replicate that block elsewhere
The DataNode verifies the checksums for blocks on a regular basis to avoid ‘bit rot’
– Default is every three weeks after the block was created
Data Reliability and Recovery
DataNodes send heartbeats to the NameNode – Every three seconds
After a period without any heartbeats, a DataNode is assumed to be lost – NameNode determines which blocks were on the lost node – NameNode finds other DataNodes with copies of these blocks
– These DataNodes are instructed to copy the blocks to other nodes
– Three-fold replication is actively maintained
The NameNode Is Not A Bottleneck
Note: the data never travels via the NameNode – For writes – For reads – During re-replication
HDFS File Permissions
Files in HDFS have an owner, a group, and permissions – Very similar to Unix file permissions
File permissions are read (r), write (w) and execute (x) for each of owner, group, and other – x is ignored for files – For directories, x means that its children can be accessed
HDFS permissions are designed to stop good people doing foolish things – Not to stop bad people doing bad things! – HDFS believes you are who you tell it you are
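The permission check itself follows the familiar Unix logic: pick the owner, group, or ‘other’ triplet, then test the requested access bit. A short sketch of that rule (a model for illustration, not HDFS code):

```python
def may_access(perms, owner, group, user, user_groups, want):
    """perms: e.g. 'rwxr-x---'; want: 'r', 'w' or 'x'.

    Picks exactly one triplet (owner, group, or other) based on who is
    asking, then checks whether the requested bit is set in it.
    """
    if user == owner:
        triplet = perms[0:3]
    elif group in user_groups:
        triplet = perms[3:6]
    else:
        triplet = perms[6:9]
    return want in triplet

# diana owns the file: she may write; bob is neither owner nor in the
# file's group, so he may not even read
assert may_access("rw-r-----", "diana", "staff", "diana", ["staff"], "w")
assert not may_access("rw-r-----", "diana", "staff", "bob", ["dev"], "r")
```

And, as the slide notes, HDFS simply trusts the identity the client presents, so this check keeps honest users from making mistakes rather than keeping attackers out.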
Stronger Security in Hadoop
Hadoop has always had authorization – The ability to allow people to do some things but not others – Example: file permissions
CDH supports Kerberos-based authentication – Making people prove they are who they say they are
Disabled by default
Complex to configure and administer – Requires a good knowledge of Kerberos
In practice, few people use this
– Most rely on firewalls and other systems to restrict access to clusters
The Secondary NameNode: Caution!
The Secondary NameNode is not a failover NameNode!
– It performs memory-intensive administrative functions for the NameNode
– NameNode keeps information about files and blocks (the metadata) in memory
– NameNode writes metadata changes to an editlog
– Secondary NameNode periodically combines a prior filesystem snapshot and editlog into a new snapshot
– New snapshot is transmitted back to the NameNode
– More on the detail of this process later in the course
Secondary NameNode should run on a separate machine in a large installation
– It requires as much RAM as the NameNode
An Introduction to Hadoop
Why Hadoop?
What is HDFS?
What is MapReduce?
Hive, Pig, HBase and other Ecosystem projects
Hands-On Exercise: Installing Hadoop
Conclusion
What Is MapReduce?
MapReduce is a method for distributing a task across multiple nodes
Each node processes data stored on that node – Where possible
Consists of two developer-created phases – Map – Reduce
In between Map and Reduce is the shuffle and sort – Sends data from the Mappers to the Reducers
MapReduce: The Big Picture
What Is MapReduce? (cont’d)
Process can be considered as being similar to a Unix pipeline
cat /my/log | grep '\.html' | sort | uniq -c > /my/outfile
– Here, grep corresponds to the Map phase, sort to the shuffle and sort, and uniq -c to the Reduce phase
What Is MapReduce? (cont’d)
Key concepts to keep in mind with MapReduce:
– The Mapper works on an individual record at a time
– The Reducer aggregates results from the Mappers
– The intermediate keys produced by the Mapper are the keys on which the aggregation will be based
Features of MapReduce
Automatic parallelization and distribution
Fault-tolerance
Status and monitoring tools
A clean abstraction for programmers
– MapReduce programs are usually written in Java
– Can be written in any scripting language using Hadoop Streaming
– All of Hadoop is written in Java
MapReduce abstracts all the ‘housekeeping’ away from the developer
– Developer can concentrate simply on writing the Map and Reduce functions
MapReduce: Basic Concepts
Each Mapper processes a single input split from HDFS – Often a single HDFS block
Hadoop passes the developer’s Map code one record at a time
Each record has a key and a value
Intermediate data is written by the Mapper to local disk
During the shuffle and sort phase, all the values associated with the same intermediate key are transferred to the same Reducer – The developer specifies the number of Reducers
Reducer is passed each key and a list of all its values – Keys are passed in sorted order
Output from the Reducers is written to HDFS
MapReduce: A Simple Example
WordCount is the ‘Hello, World!’ of Hadoop
Map // assume input is a set of text files
// k is a byte offset
// v is the line for that offset
let map(k, v) =
foreach word in v:
emit(word, 1)
MapReduce: A Simple Example (cont’d)
Sample input to the Mapper:
1202 the cat sat on the mat
1225 the aardvark sat on the sofa
Intermediate data produced:
(the, 1), (cat, 1), (sat, 1), (on, 1), (the, 1), (mat, 1), (the, 1), (aardvark, 1), (sat, 1), (on, 1), (the, 1), (sofa, 1)
MapReduce: A Simple Example (cont’d)
Input to the Reducer:
(aardvark, [1])
(cat, [1])
(mat, [1])
(on, [1, 1])
(sat, [1, 1])
(sofa, [1])
(the, [1, 1, 1, 1])
MapReduce: A Simple Example (cont’d)
Reduce // k is a word, vals is a list of 1s
let reduce(k, vals) =
sum = 0
foreach (v in vals):
sum = sum + v
emit (k, sum)
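The Map and Reduce pseudocode above can be run end-to-end as a plain Python simulation of the three phases (a sketch only; real Hadoop distributes this work across many nodes):

```python
from collections import defaultdict

def map_fn(key, value):
    # key: byte offset (unused here); value: one line of text
    for word in value.split():
        yield (word, 1)

def reduce_fn(key, values):
    yield (key, sum(values))

def run_job(records):
    # Map phase: one call per input record
    intermediate = [pair for k, v in records for pair in map_fn(k, v)]
    # Shuffle and sort: group values by intermediate key, keys in sorted order
    groups = defaultdict(list)
    for k, v in intermediate:
        groups[k].append(v)
    # Reduce phase: each key with the full list of its values
    return [out for k in sorted(groups) for out in reduce_fn(k, groups[k])]

records = [(1202, "the cat sat on the mat"),
           (1225, "the aardvark sat on the sofa")]
result = run_job(records)
# [('aardvark', 1), ('cat', 1), ('mat', 1), ('on', 2), ('sat', 2), ('sofa', 1), ('the', 4)]
```

Running this on the sample lines from the previous slides produces exactly the Reducer output shown next.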
MapReduce: A Simple Example (cont’d)
Output from the Reducer, written to HDFS:
(aardvark, 1)
(cat, 1)
(mat, 1)
(on, 2)
(sat, 2)
(sofa, 1)
(the, 4)
Some MapReduce Terminology
A user runs a client program on a client computer
The client program submits a job to Hadoop – The job consists of a mapper, a reducer, and a list of inputs
The job is sent to the JobTracker
Each Slave Node runs a process called the TaskTracker
The JobTracker instructs TaskTrackers to run and monitor tasks – A Map or Reduce over a piece of data is a single task
A task attempt is an instance of a task running on a slave node
– Task attempts can fail, in which case they will be restarted (more later)
– There will be at least as many task attempts as there are tasks which need to be performed
Aside: The Job Submission Process
When a job is submitted, the following happens:
– The job configuration information is turned into an XML file
– The client places the XML file and the job Jar in a temporary directory in HDFS
– The client calculates the input splits for the job
– How the input data will be split up between Mappers
– The client contacts the JobTracker with information on the location of the XML and Jar files, and the list of input splits
MapReduce: High Level
MapReduce Failure Recovery
Task processes send heartbeats to the TaskTracker
TaskTrackers send heartbeats to the JobTracker
Any task that fails to report in 10 minutes is assumed to have failed – Its JVM is killed by the TaskTracker
Any task that throws an exception is said to have failed
Failed tasks are reported to the JobTracker by the TaskTracker
The JobTracker reschedules any failed tasks
– It tries to avoid rescheduling the task on the same TaskTracker where it previously failed
If a task fails four times, the whole job fails
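The retry behaviour can be modelled in a few lines (the four-attempt limit and the preference for a different TaskTracker come from the slides; the scheduling details below are invented purely for illustration):

```python
def run_task(task, trackers, attempt_outcomes, max_attempts=4):
    """Toy scheduler: retry a task up to max_attempts times.

    attempt_outcomes maps tracker name -> True if the task would succeed
    there. Prefers trackers where the task has not already failed.
    """
    tried = []
    for attempt in range(max_attempts):
        candidates = [t for t in trackers if t not in tried] or trackers
        tracker = candidates[0]
        tried.append(tracker)
        if attempt_outcomes.get(tracker, False):
            return ("succeeded", attempt + 1)
    # Four failed attempts: the whole job fails
    return ("job failed", max_attempts)

# Fails on tt1, succeeds on the second attempt on a different tracker
assert run_task("t1", ["tt1", "tt2"], {"tt1": False, "tt2": True}) == ("succeeded", 2)

# Fails everywhere: after four attempts the job is declared failed
assert run_task("t1", ["tt1", "tt2"], {}) == ("job failed", 4)
```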
MapReduce Failure Recovery (Cont’d)
Any TaskTracker that fails to report in 10 minutes is assumed to have crashed
– All tasks on the node are restarted elsewhere
– Any TaskTracker reporting a high number of failed tasks is blacklisted, to prevent the node from blocking the entire job
– There is also a ‘global blacklist’, for TaskTrackers which fail on multiple jobs
The JobTracker manages the state of each job
– Partial results of failed tasks are ignored
An Introduction to Hadoop
Why Hadoop?
What is HDFS?
What is MapReduce?
Hive, Pig, HBase and other Ecosystem projects
Hands-On Exercise: Installing Hadoop
Conclusion
The Apache Hadoop Project
Hadoop is a ‘top-level’ Apache project
– Created and managed under the auspices of the Apache Software Foundation
Several other projects exist that rely on some or all of Hadoop
– Typically either both HDFS and MapReduce, or just HDFS
Ecosystem projects are often also top-level Apache projects
– Some are ‘Apache incubator’ projects
– Some are not managed by the Apache Software Foundation
Ecosystem projects include Hive, Pig, Sqoop, Flume, HBase, Oozie, …
Hive
Hive is a high-level abstraction on top of MapReduce
– Initially created by a team at Facebook
– Avoids having to write Java MapReduce code
– Data in HDFS is queried using a language very similar to SQL
– Known as HiveQL
HiveQL queries are turned into MapReduce jobs by the Hive interpreter
– ‘Tables’ are just directories of files stored in HDFS
– A Hive Metastore contains information on how to map a file to a table structure
Hive (cont’d)
Example Hive query:
SELECT stock.product, SUM(orders.purchases)
FROM stock INNER JOIN orders
ON (stock.id = orders.stock_id)
WHERE orders.quarter = 'Q1'
GROUP BY stock.product;
We will discuss how to install Hive later in the course
Pig
Pig is another high-level abstraction on top of MapReduce – Originally created at Yahoo! – Uses a dataflow scripting language known as PigLatin
PigLatin scripts are converted to MapReduce jobs by the Pig interpreter
Pig (cont’d)
Sample PigLatin script:
stock = LOAD '/user/fred/stock' AS (id, item);
orders = LOAD '/user/fred/orders' AS (id, cost);
grpd = GROUP orders BY id;
totals = FOREACH grpd GENERATE group, SUM(orders.cost) AS t;
result = JOIN stock BY id, totals BY group;
DUMP result;
We will discuss how to install Pig later in the course
HBase
HBase is described as “The Hadoop database”
– A column-oriented data store
– Provides random, real-time read/write access to large amounts of data
– Allows you to manage tables consisting of billions of rows, with potentially millions of columns
HBase stores its data in HDFS for reliability and availability
We will discuss issues related to HBase installation and maintenance later in the course
An Introduction to Hadoop
Why Hadoop?
What is HDFS?
What is MapReduce?
Hive, Pig, HBase and other Ecosystem projects
Hands-On Exercise: Installing Hadoop
Conclusion
Hands-On Exercise: Installing Hadoop
Please refer to the Exercise Manual
An Introduction to Hadoop
Why Hadoop?
What is HDFS?
What is MapReduce?
Hive, Pig, HBase and other Ecosystem projects
Hands-On Exercise: Installing Hadoop
Conclusion
Conclusion
In this chapter, you have learned:
What Hadoop is
Why Hadoop is important
What features the Hadoop Distributed File System (HDFS) provides
How MapReduce works
What other Apache Hadoop ‘ecosystem’ projects exist, and what they do
Chapter 3 Planning Your Hadoop Cluster
Planning Your Hadoop Cluster
In this chapter, you will learn:
What issues to consider when planning your Hadoop cluster
What types of hardware are typically used for Hadoop nodes
How to optimally configure your network topology
How to select the right operating system and Hadoop distribution
03-3 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Planning Your Hadoop Cluster
General Planning Considerations
Choosing The Right Hardware
Network Considerations
Configuring Nodes
Conclusion
03-4 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Thinking About the Problem
Hadoop can run on a single machine – Great for testing, developing – Obviously not practical for large amounts of data
Many people start with a small cluster and grow it as required – Perhaps initially just four or six nodes – As the volume of data grows, more nodes can easily be added
Ways of deciding when the cluster needs to grow – Increasing amount of computation power needed – Increasing amount of data which needs to be stored – Increasing amount of memory needed to process tasks
03-5 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Growth Based on Storage Capacity
Basing your cluster growth on storage capacity is often a good method to use
Example: – Data grows by approximately 1TB per week – HDFS set up to replicate each block three times – Therefore, 3TB of extra storage space required per week
– Plus some overhead – say, 30% – Assuming machines with 4 x 1TB hard drives, this equates to a new machine required each week – Alternatively: Two years of data – 100TB – will require approximately 100 machines
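The sizing arithmetic above can be sketched as a quick shell calculation. The figures are the example's own assumptions (1TB/week growth, 3x replication, 30% overhead, 4TB of usable disk per node), not universal constants:

```shell
# Back-of-envelope cluster growth estimate (assumed example figures)
RAW_TB_PER_WEEK=1      # incoming data per week
REPLICATION=3          # HDFS replication factor
OVERHEAD_PCT=30        # extra space for overhead
USABLE_TB_PER_NODE=4   # 4 x 1TB drives per slave node

# Weekly storage requirement = raw x replication x (1 + overhead)
NEEDED_TB=$(( RAW_TB_PER_WEEK * REPLICATION * (100 + OVERHEAD_PCT) / 100 ))

# New nodes needed per week, rounded up with integer arithmetic
NODES_PER_WEEK=$(( (NEEDED_TB + USABLE_TB_PER_NODE - 1) / USABLE_TB_PER_NODE ))

echo "Extra storage per week: ${NEEDED_TB}TB; new nodes per week: ${NODES_PER_WEEK}"
```

Plugging in the slide's numbers yields roughly 3TB and one new machine per week, matching the example.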
03-6 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Planning Your Hadoop Cluster
General Planning Considerations
Choosing The Right Hardware
Network Considerations
Configuring Nodes
Conclusion
03-7 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Classifying Nodes
Nodes can be classified as either ‘slave nodes’ or ‘master nodes’
Slave node runs DataNode plus TaskTracker daemons
Master node runs either a NameNode daemon, a Secondary NameNode daemon, or a JobTracker daemon – On smaller clusters, NameNode and JobTracker are often run on the same machine – Sometimes even the Secondary NameNode is on the same machine as the NameNode and JobTracker – Important that at least one copy of the NameNode’s metadata is stored on a separate machine (see later)
03-8 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Slave Nodes: Recommended Configuration
Typical ‘base’ configuration for a slave node – 4 x 1TB or 2TB hard drives, in a JBOD* configuration – Do not use RAID! (See later) – 2 x Quad-core CPUs – 24-32GB RAM – Gigabit Ethernet
Multiples of (1 hard drive + 2 cores + 6-8GB RAM) tend to work well for many types of applications – Especially those that are I/O bound
* JBOD: Just a Bunch Of Disks
03-9 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Slave Nodes: More Details
In general, when considering higher-performance vs lower-performance components:
“Save the money, buy more nodes!”
Typically, a cluster with more nodes will perform better than one with fewer, slightly faster nodes
03-10 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Slave Nodes: More Details (CPU)
Quad-core CPUs are now standard
Hex-core CPUs are becoming more prevalent – But are more expensive
Hyper-threading should be enabled
Hadoop nodes are seldom CPU-bound – They are typically disk- and network-I/O bound – Therefore, top-of-the-range CPUs are usually not necessary
03-11 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Slave Nodes: More Details (RAM)
Slave node configuration specifies the maximum number of Map and Reduce tasks that can run simultaneously on that node
Each Map or Reduce task will take 1GB to 2GB of RAM
Slave nodes should not be using virtual memory
Ensure you have enough RAM to run all tasks, plus overhead for the DataNode and TaskTracker daemons, plus the operating system
Rule of thumb: Total number of tasks = 1.5 x number of processor cores – This is a starting point, and should not be taken as a definitive setting for all clusters
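As an illustration of the rule of thumb above (the core count and per-task RAM figure are example values, not prescriptions):

```shell
# Rule of thumb: total task slots ≈ 1.5 x processor cores
CORES=8                              # e.g. 2 x quad-core CPUs
TOTAL_TASKS=$(( CORES * 3 / 2 ))     # 1.5x, using integer arithmetic
RAM_PER_TASK_GB=2                    # upper end of the 1-2GB per-task range
TASK_RAM_GB=$(( TOTAL_TASKS * RAM_PER_TASK_GB ))
echo "Suggested task slots: ${TOTAL_TASKS}, needing ~${TASK_RAM_GB}GB RAM plus daemon and OS overhead"
```

With 12 slots at 2GB each, the tasks alone need about 24GB, which is consistent with the 24-32GB RAM recommendation for slave nodes once daemon and OS overhead is added.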
03-12 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Slave Nodes: More Details (Disk)
In general, more spindles (disks) is better
In practice, we see anywhere from four to 12 disks per node
Use 3.5" disks – Faster, cheaper, higher capacity than 2.5" disks
7,200 RPM SATA drives are fine – No need to buy 15,000 RPM drives
8 x 1.5TB drives is likely to be better than 6 x 2TB drives – Different tasks are more likely to be accessing different disks
A good practical maximum is 24TB per slave node – More than that will result in massive network traffic if a node dies and block re-replication must take place
03-13 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Slave Nodes: Why Not RAID?
Slave nodes do not benefit from using RAID* storage – HDFS provides built-in redundancy by replicating blocks across multiple nodes – RAID striping (RAID 0) is actually slower than the JBOD configuration used by HDFS – RAID 0 read and write operations are limited by the speed of the slowest disk in the RAID array – Disk operations on JBOD are independent, so the average speed is greater than that of the slowest disk – One test by Yahoo showed JBOD performing between 10% and 30% faster than RAID 0, depending on the operations being performed
* RAID: Redundant Array of Inexpensive Disks
03-14 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
What About Virtualization?
Virtualization is usually not worth considering – Multiple virtual nodes per machine hurts performance – Hadoop runs optimally when it can use all the disks at once
03-15 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
What About Blade Servers?
Blade servers are not recommended – Failure of a blade chassis results in many nodes being unavailable – Individual blades usually have very limited hard disk capacity – Network interconnection between the chassis and top-of-rack switch can become a bottleneck
03-16 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Master Nodes: Single Points of Failure
Slave nodes are expected to fail at some point – This is an assumption built into Hadoop – NameNode will automatically re-replicate blocks that were on the failed node to other nodes in the cluster, retaining the 3x replication requirement – JobTracker will automatically re-assign tasks that were running on failed nodes
Master nodes are single points of failure – If the NameNode goes down, the cluster is inaccessible – If the JobTracker goes down, no jobs can run on the cluster – All currently running jobs will fail
Spend more money on your master nodes!
03-17 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Master Node Hardware Recommendations
Carrier-class hardware – Not commodity hardware
Dual power supplies
Dual Ethernet cards – Bonded to provide failover
RAIDed hard drives
At least 32GB of RAM
03-18 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Planning Your Hadoop Cluster
General Planning Considerations
Choosing The Right Hardware
Network Considerations
Configuring Nodes
Conclusion
03-19 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
General Network Considerations
Hadoop is very bandwidth-intensive! – Often, all nodes are communicating with each other at the same time
Use dedicated switches for your Hadoop cluster
Nodes are connected to a top-of-rack switch
Nodes should be connected at a minimum speed of 1Gb/sec
For clusters where large amounts of intermediate data is generated, consider 10Gb/sec connections – Expensive – Alternative: bond two 1Gb/sec connections to each node
03-20 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
General Network Considerations (cont’d)
Racks are interconnected via core switches
Core switches should connect to top-of-rack switches at 10Gb/sec or faster
Beware of oversubscription in top-of-rack and core switches
Consider bonded Ethernet to mitigate against failure
Consider redundant top-of-rack and core switches
03-21 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Planning Your Hadoop Cluster
General Planning Considerations
Choosing The Right Hardware
Network Considerations
Configuring Nodes
Conclusion
03-22 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Operating System Recommendations
Choose an OS you’re comfortable administering
CentOS: geared towards servers rather than individual workstations – Conservative about package versions – Very widely used in production
RedHat Enterprise Linux (RHEL): RedHat-supported analog to CentOS – Includes support contracts, for a price
In production, we often see a mixture of RHEL and CentOS machines – Often RHEL on master nodes, CentOS on slaves
03-23 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Operating System Recommendations (cont’d)
Fedora Core: geared towards individual workstations – Includes newer versions of software, at the expense of some stability – We recommend server-based, rather than workstation-based, Linux distributions
Ubuntu: Very popular distribution, based on Debian – Both desktop and server versions available – Try to use an LTS (Long Term Support) version
SuSE: popular distribution, especially in Europe – Cloudera provides CDH packages for SuSE
Solaris, OpenSolaris: not commonly seen in production clusters
03-24 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Configuring The System
Do not use Linux’s LVM (Logical Volume Manager) to make all your disks appear as a single volume – As with RAID 0, this limits speed to that of the slowest disk
Check the machines’ BIOS* settings – BIOS settings may not be configured for optimal performance – For example, if you have SATA drives make sure IDE emulation is not enabled
Test disk I/O speed with hdparm -t – Example: hdparm -t /dev/sda1 – You should see speeds of 70MB/sec or more – Anything less is an indication of possible problems
* BIOS: Basic Input/Output System
03-25 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Configuring The System (cont’d)
Hadoop has no specific disk partitioning requirements – Use whatever partitioning system makes sense to you
Mount disks with the noatime option
Common directory structure for data mount points:
/data/<n>/dfs/nn
/data/<n>/dfs/dn
/data/<n>/dfs/snn
/data/<n>/mapred/local
Reduce the swappiness of the system – Set vm.swappiness to 0 or 5 in /etc/sysctl.conf
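As a sketch of what the noatime and swappiness settings above might look like in practice (device names and mount points are hypothetical; adjust to your own disk layout):

```
# /etc/fstab -- mount Hadoop data disks with noatime
/dev/sdb1  /data/1  ext3  defaults,noatime  0 0
/dev/sdc1  /data/2  ext3  defaults,noatime  0 0

# /etc/sysctl.conf -- reduce swappiness
vm.swappiness = 0
```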
03-26 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Filesystem Considerations
Cloudera recommends the ext3 and ext4 filesystems – ext4 is now becoming more commonly used
XFS provides some performance benefit during kickstart – It formats in 0 seconds, vs several minutes for each disk with ext3
XFS has some performance issues – Slow deletes in some versions – Some performance improvements are available; see e.g., http://everything2.com/index.pl?node_id=1479435 – Some versions had problems when a machine runs out of memory
03-27 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Operating System Parameters
Increase the nofile ulimit for the mapred and hdfs users to at least 32K – Setting is in /etc/security/limits.conf
Disable IPv6
Disable SELinux
Install and configure the ntp daemon – Ensures the time on all nodes is synchronized – Important for HBase – Useful when using logs to debug problems
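A minimal sketch of the nofile limit described above, as it might appear in /etc/security/limits.conf (32768 is one way of expressing "at least 32K"):

```
# /etc/security/limits.conf -- raise open-file limits for Hadoop users
hdfs    -    nofile    32768
mapred  -    nofile    32768
```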
03-28 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Java Virtual Machine (JVM) Recommendations
Always use the official Oracle JDK (http://java.com/) – Hadoop is complex software, and often exposes bugs in other JDK implementations
Version 1.6 is required – Avoid 1.6.0u18 – This version had significant bugs
Hadoop is not yet production-tested with Java 7 (1.7)
Recommendation: don’t upgrade to a new version as soon as it is released – Wait until it has been tested for some time
03-29 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Which Version of Hadoop?
‘Standard’ Apache version of Hadoop is available at http://hadoop.apache.org/
Cloudera’s Distribution including Apache Hadoop (CDH) starts with the latest stable Hadoop distribution – Includes useful patches and bugfixes backported from future releases – Includes improvements developed by Cloudera for our Support customers – Includes additional tools for ease of installation, configuration and use – Ensures interoperability between different Ecosystem projects – Provided in RPM, Ubuntu and SuSE package, and tarball formats – Available from http://www.cloudera.com/
03-30 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Planning Your Hadoop Cluster
General Planning Considerations
Choosing The Right Hardware
Network Considerations
Configuring Nodes
Conclusion
03-31 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Conclusion
In this chapter, you have learned:
What issues to consider when planning your Hadoop cluster
What types of hardware are typically used for Hadoop nodes
How to optimally configure your network topology
How to select the right operating system and Hadoop distribution
04-1 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Chapter 4 Configuring and Deploying Your Cluster
04-2 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying Your Cluster
In this chapter, you will learn:
The different installation configurations available in Hadoop
How to install Hadoop
How SCM can make installation and configuration easier
How to launch the Hadoop daemons
How to configure Hadoop
How to specify your rack topology script
04-3 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying Your Cluster
Deployment Types
Installing Hadoop
Using SCM for Easy Installation
Typical Configuration Parameters
Configuring Rack Awareness
Using Configuration Management Tools
Hands-On Exercise: Install A Hadoop Cluster
Conclusion
04-4 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop’s Different Deployment Modes
Hadoop can be configured to run in three different modes – LocalJobRunner – Pseudo-distributed – Fully distributed
04-5 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
LocalJobRunner Mode
In LocalJobRunner mode, no daemons run
Everything runs in a single Java Virtual Machine (JVM)
Hadoop uses the machine’s standard filesystem for data storage – Not HDFS
Suitable for testing MapReduce programs during development
04-6 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Pseudo-Distributed Mode
In pseudo-distributed mode, all daemons run on the local machine – Each runs in its own JVM (Java Virtual Machine)
Hadoop uses HDFS to store data (by default)
Useful to simulate a cluster on a single machine
Convenient for debugging programs before launching them on the ‘real’ cluster
04-7 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Fully-Distributed Mode
In fully-distributed mode, Hadoop daemons run on a cluster of machines
HDFS used to distribute data amongst the nodes
Unless you are running a small cluster (less than 10 or 20 nodes), the NameNode and JobTracker should each be running on dedicated nodes – For small clusters, it’s acceptable for both to run on the same physical node
04-8 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying Your Cluster
Deployment Types
Installing Hadoop
Using SCM for Easy Installation
Typical Configuration Parameters
Configuring Rack Awareness
Using Configuration Management Tools
Hands-On Exercise: Install A Hadoop Cluster
Conclusion
04-9 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying on Multiple Machines
If you are installing multiple machines, use some kind of automated deployment – Red Hat’s Kickstart – Debian Fully Automatic Installation – Solaris JumpStart – Dell Crowbar – …
04-10 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
RPM/Package vs Tarballs
Cloudera’s Distribution including Apache Hadoop (CDH) is available in multiple formats – RPMs for Red Hat-style Linux distributions (RHEL, CentOS) – Packages for Ubuntu and SuSE Linux – As a tarball
RPMs/Packages include some features not in the tarball – Automatic creation of mapred and hdfs users – init scripts to automatically start the Hadoop daemons – Although these are not activated by default – Configures the ‘alternatives’ system to allow multiple configurations on the same machine
Strong recommendation: use the RPMs/packages whenever possible
04-11 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Installation From RPM or Package
Install Hadoop – Add the Cloudera repository – Full installation details at http://archive.cloudera.com/docs/ – yum install hadoop-0.20 (RPM-based systems) – apt-get -y install hadoop-0.20 (Debian-based systems)
Install the init scripts for the daemons which should run on each machine – Example:
sudo yum install hadoop-0.20-datanode
sudo yum install hadoop-0.20-tasktracker
04-12 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Installation From the Tarball
Install Java 6
Create mapred and hdfs system users and groups
Download and unpack the Hadoop tarball – Place this somewhere sensible, such as /usr/local
Edit the configuration files
Create the relevant directories
Format the HDFS filesystem
04-13 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Starting the Hadoop Daemons
CDH installed from package or RPM includes init scripts to start the daemons
If you have installed Hadoop manually, or from the CDH tarball, you will have to start the daemons manually – Not all daemons run on each machine – DataNode, TaskTracker: on each data node in the cluster – NameNode, JobTracker: one per cluster – Secondary NameNode: one per cluster
04-14 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Avoid Using start-all.sh, stop-all.sh
Hadoop includes scripts called start-all.sh and stop-all.sh – These connect to, and start, all the DataNode and TaskTracker daemons
Cloudera recommends not using these scripts – They require all DataNodes to allow passwordless SSH login, which most environments will not allow
04-15 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
An Aside: SSH
Note that most tutorials tell you to create a passwordless SSH login on each machine
This is not necessary for the operation of Hadoop – Hadoop does not use SSH in any of its internal communications
SSH is only required if you intend to use the start-all.sh and stop-all.sh scripts
04-16 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Verify the Installation
To verify that everything has started correctly, check by running an example job:
– Copy files into Hadoop for input:
hadoop fs -put /etc/hadoop-0.20/conf/*.xml input
– Run an example job:
hadoop jar /usr/lib/hadoop-0.20/hadoop-*-examples.jar \
grep input output 'dfs[a-z.]+'
– View the output:
hadoop fs -cat output/part-00000 | head
04-17 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying Your Cluster
Deployment Types
Installing Hadoop
Using SCM for Easy Installation
Typical Configuration Parameters
Configuring Rack Awareness
Using Configuration Management Tools
Hands-On Exercise: Install A Hadoop Cluster
Conclusion
04-18 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cloudera’s SCM For Easy Cluster Installation
Cloudera has released Service and Configuration Manager (SCM), a tool for easy deployment and configuration of Hadoop clusters
The free version, SCM Express, can manage up to 50 nodes – The version supplied with Cloudera Enterprise supports an unlimited number of nodes
04-19 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Installing SCM Express
1. Download SCM Express to a management machine
2. Make the binary executable with chmod, and run it
3. Follow the on-screen instructions
This process installs the SCM server
Once installed, you can access the server via its Web interface – http://scm_manager_host:7180/
04-20 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using SCM Express
The first time you connect to the SCM Express server via the Web interface, a Wizard guides you through initial setup of your cluster – You are asked for the names or IP addresses of the machines in the cluster – SCM then connects to each machine and installs CDH, plus the SCM agent which controls the Hadoop daemons
04-21 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using SCM Express (cont’d)
04-22 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using SCM Express (cont’d)
Once CDH is installed, the Wizard allows you to set up any of HDFS, MapReduce, and HBase on the cluster
By default, it will choose the most appropriate machine(s) to act as the master nodes – Based on the hardware specifications of the nodes and the number of nodes in the cluster – To specify machines manually, once the Wizard has completed you may remove the services and re-create them manually from the main configuration screen
04-23 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using SCM Express: Main Configuration Screen
04-24 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using SCM Express: Changing Configurations
SCM Express provides a central point from which to manage machine configurations
Configurations are changed via the Web interface – They are then pushed out to the relevant machines on the cluster – You can restart daemons centrally, from the Web interface
04-25 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using SCM Express: Changing Configurations (cont’d)
04-26 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying Your Cluster
Deployment Types
Installing Hadoop
Using SCM for Easy Installation
Typical Configuration Parameters
Configuring Rack Awareness
Using Configuration Management Tools
Hands-On Exercise: Install A Hadoop Cluster
Conclusion
04-27 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop’s Configuration Files
Each machine in the Hadoop cluster has its own set of configuration files
Configuration files all reside in Hadoop’s conf directory – Typically /etc/hadoop/conf
Primary configuration files are written in XML
04-28 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop’s Configuration Files (cont’d)
Earlier versions of Hadoop stored all configuration in hadoop-site.xml
From 0.20 onwards, configurations have been separated out based on functionality – Core properties: core-site.xml – HDFS properties: hdfs-site.xml – MapReduce properties: mapred-site.xml
hadoop-env.sh sets some environment variables used by Hadoop – Such as location of log files and pid files
04-29 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Sample Configuration File
Sample configuration file (mapred-site.xml):
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
</configuration>
04-30 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Configuration Value Precedence
Configuration parameters can be specified more than once
Highest-precedence value takes priority
Precedence order (lowest to highest): – *-site.xml on the slave node – *-site.xml on the client machine – Values set explicitly in the JobConf object for a MapReduce job
If a value in a configuration file is marked as final, it overrides all others:
<property>
  <name>some.property.name</name>
  <value>somevalue</value>
  <final>true</final>
</property>
04-31 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Recommended Parameter Values
There are many different parameters which can be set
Defaults are documented at – http://archive.cloudera.com/cdh/3/hadoop/core-default.html – http://archive.cloudera.com/cdh/3/hadoop/hdfs-default.html – http://archive.cloudera.com/cdh/3/hadoop/mapred-default.html
Hadoop is still a young system – ‘Best practices’ and optimal values change as more and more organizations deploy Hadoop in production
Here we present some of the key parameters, and suggest recommended values – Based on our experiences working with clusters ranging from a few nodes up to 1,000+
04-32 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
hdfs-site.xml
The single most important configuration value on your entire cluster, set on the NameNode:
Loss of the NameNode’s metadata will result in the effective loss of all the data on the cluster – Although the blocks will remain, there is no way of reconstructing the original files without the metadata
This must be at least two disks (or a RAID volume) on the NameNode, plus an NFS mount elsewhere on the network – Failure to set this correctly will result in eventual loss of your cluster’s data
dfs.name.dir Where on the local filesystem the NameNode stores its metadata. A comma-separated list. Default is ${hadoop.tmp.dir}/dfs/name.
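A sketch of a dfs.name.dir setting following the recommendation above. The local paths and NFS mount point are hypothetical; note that there are no spaces after the commas:

```xml
<!-- hdfs-site.xml on the NameNode -->
<property>
  <name>dfs.name.dir</name>
  <!-- two local disks plus an NFS mount; all paths hypothetical -->
  <value>/data/1/dfs/nn,/data/2/dfs/nn,/mnt/namenode-nfs/dfs/nn</value>
</property>
```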
04-33 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
hdfs-site.xml (cont’d)
The NameNode will write to the edit log in all directories in dfs.name.dir synchronously
If a directory in the list disappears, the NameNode will continue to function – It will ignore that directory until it is restarted
Recommendation for the NFS mount point: tcp,soft,intr,timeo=10,retrans=10 – Soft mount so the NameNode will not hang if the mount point disappears – Will retry transactions 10 times, at 1-10 second intervals, before being deemed to have failed
Note: no space between the comma and next directory name in the list!
04-34 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
hdfs-site.xml (cont’d)
dfs.block.size The block size for new files, in bytes. Default is 67108864 (64MB). Recommended: 134217728 (128MB). Specified on each node, including clients.
dfs.data.dir Where on the local filesystem a DataNode stores its blocks. Can be a comma-separated list of directories (no spaces between the comma and the path); round-robin writes to the directories (no redundancy). Specified on each DataNode; can be different on different DataNodes.
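For illustration, the two properties above might be set as follows on a hypothetical four-disk DataNode (paths are examples, not requirements):

```xml
<!-- hdfs-site.xml on each DataNode -->
<property>
  <name>dfs.block.size</name>
  <value>134217728</value>  <!-- 128MB, as recommended -->
</property>
<property>
  <name>dfs.data.dir</name>
  <value>/data/1/dfs/dn,/data/2/dfs/dn,/data/3/dfs/dn,/data/4/dfs/dn</value>
</property>
```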
04-35 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
hdfs-site.xml (cont’d)
dfs.namenode.handler.count The number of threads the NameNode uses to handle RPC requests from DataNodes. Default: 10. Recommended: 10% of the number of nodes, with a floor of 10 and a ceiling of 200. Symptoms of this being set too low: ‘connection refused’ messages in DataNode logs as they try to transmit block reports to the NameNode. Specified on the NameNode.
dfs.permissions If true (the default), checks file permissions. If false, permission checking is disabled (everyone can access every file). Specified on the NameNode.
04-36 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
hdfs-site.xml (cont’d)
dfs.datanode.du.reserved The amount of space on each volume which cannot be used for HDFS block storage. Recommended: at least 10GB (See later.) Specified on each DataNode.
dfs.datanode.failed.volumes.tolerated The number of volumes allowed to fail before the DataNode takes itself offline, ultimately resulting in all of its blocks being re-replicated. Default: 0, but often increased on machines with several disks. Specified on each DataNode.
dfs.replication The number of times each block should be replicated when a file is written. Default: 3. Recommended: 3. Specified on each node, including clients.
04-37 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
core-site.xml
fs.default.name The name of the default filesystem. Usually the name and port for the NameNode. Example: hdfs://<your_namenode>:8020/ Specified on every machine which needs access to the cluster, including all nodes.
fs.checkpoint.dir Comma-separated list of directories in which the Secondary NameNode will store its checkpoint images. If more than one directory is specified, all are written to. Specified on the Secondary NameNode.
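A sketch of the corresponding core-site.xml entries (the hostname and checkpoint paths are hypothetical):

```xml
<!-- core-site.xml -->
<property>
  <name>fs.default.name</name>
  <value>hdfs://namenode.example.com:8020/</value>
</property>
<!-- on the Secondary NameNode -->
<property>
  <name>fs.checkpoint.dir</name>
  <value>/data/1/dfs/snn,/data/2/dfs/snn</value>
</property>
```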
04-38 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
core-site.xml (cont’d)
fs.trash.interval When a file is deleted, it is placed in a .Trash directory in the user’s home directory, rather than being immediately deleted. It is purged from HDFS after the number of minutes specified. Default: 0 (disabled). Recommended: 1440 (one day). Specified on clients and on the NameNode.
04-39 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
core-site.xml (cont’d)
hadoop.tmp.dir Base temporary directory, both on the local disk and in HDFS. Default is /tmp/hadoop-${user.name}. Specified on all nodes.
io.file.buffer.size Determines how much data is buffered during read and write operations. Should be a power of two, and a multiple of the hardware page size. Default: 4096. Recommendation: 65536 (64KB). Specified on all nodes.
io.compression.codecs List of compression codecs that Hadoop can use for file compression. Specified on all nodes. Default is org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec
04-40 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
mapred-site.xml
mapred.job.tracker Hostname and port of the JobTracker. Example: my_job_tracker:8021. Specified on all nodes and clients.
mapred.child.java.opts Java options passed to the TaskTracker child processes. Default is -Xmx200m (200MB of heap space). Recommendation: increase to 512MB or 1GB, depending on the requirements from your developers. Specified on each TaskTracker node.
mapred.child.ulimit Maximum virtual memory in KB allocated to any child process of the TaskTracker. If specified, set to at least 2x the value of mapred.child.java.opts or the child JVM may not start.
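As an illustrative sketch (the 512MB figure is one of the starting points suggested above, not a mandated value), the two child-task properties might appear together in mapred-site.xml as:

```xml
<!-- 512MB of heap per child task -->
<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx512m</value>
</property>
<!-- At least 2x the child heap, expressed in KB: 2 x 512MB = 1048576 KB -->
<property>
  <name>mapred.child.ulimit</name>
  <value>1048576</value>
</property>
```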
04-41 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
mapred-site.xml (cont’d)
mapred.local.dir The local directory where MapReduce stores its intermediate data files. May be a comma-separated list of directories on different devices. Recommendation: list directories on all disks, and set dfs.du.reserved (in hdfs-site.xml) such that approximately 25% of the total disk capacity cannot be used by HDFS. Example: for a node with 4 x 1TB disks, set dfs.du.reserved to 250GB. Specified on each TaskTracker node.
04-42 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
mapred-site.xml (cont’d)
mapred.job.tracker.handler.count Number of threads used by the JobTracker to respond to heartbeats from the TaskTrackers. Default: 10. Recommendation: approx. 4% of the number of nodes with a floor of 10 and a ceiling of 200. Specified on the JobTracker.
mapred.reduce.parallel.copies Number of TaskTrackers a Reducer can connect to in parallel to transfer its data. Default: 5. Recommendation: SQRT(number_of_nodes) with a floor of 10. Specified on all TaskTracker nodes.
tasktracker.http.threads The number of HTTP threads in the TaskTracker which the Reducers use to retrieve data. Default: 40. Recommendation: 80. Specified on all TaskTracker nodes.
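The sizing heuristics above can be sketched in a few lines of Python (the helper names are hypothetical; the formulas are the ones quoted on these slides):

```python
import math

def jobtracker_handler_count(num_nodes):
    """~4% of the node count, with a floor of 10 and a ceiling of 200."""
    return min(max(10, round(num_nodes * 0.04)), 200)

def reduce_parallel_copies(num_nodes):
    """SQRT(number_of_nodes), with a floor of 10."""
    return max(10, round(math.sqrt(num_nodes)))

print(jobtracker_handler_count(1000))  # 40
print(reduce_parallel_copies(400))     # 20
```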
04-43 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
mapred-site.xml (cont’d)
mapred.reduce.slowstart.completed.maps
The percentage of Map tasks which must be completed before the JobTracker will schedule Reducers on the cluster. Default: 0.05. Recommendation: 0.5 to 0.8. Specified on the JobTracker.
mapred.jobtracker.taskScheduler The class used by the JobTracker to determine how to schedule tasks on the cluster. Default: org.apache.hadoop.mapred.JobQueueTaskScheduler. Recommendation: org.apache.hadoop.mapred.FairScheduler. (Job and task scheduling is discussed later in the course.) Specified on the JobTracker.
04-44 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
mapred-site.xml (cont’d)
Rule of thumb: the total number of Map + Reduce tasks on a node should be approximately 1.5 x the number of processor cores on that node
– Assuming there is enough RAM on the node
– This should be monitored
– If the node is not processor or I/O bound, increase the total number of tasks
– Typical distribution: 60% Map tasks, 40% Reduce tasks, or 70% Map tasks, 30% Reduce tasks
mapred.tasktracker.map.tasks.maximum
Number of Map tasks which can be run simultaneously by the TaskTracker. Specified on each TaskTracker node.
mapred.tasktracker.reduce.tasks.maximum
Number of Reduce tasks which can be run simultaneously by the TaskTracker. Specified on each TaskTracker node.
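The rule of thumb above can be sketched as a small helper (the function name is hypothetical; the 1.5x multiplier and 60/40 split are the figures from this slide):

```python
def task_slot_counts(cores, map_fraction=0.6):
    """Return (map_slots, reduce_slots) using the ~1.5 x cores rule of thumb."""
    total = int(cores * 1.5)
    map_slots = round(total * map_fraction)
    return map_slots, total - map_slots

# An 8-core node: 12 total slots, split roughly 60/40 between Map and Reduce
print(task_slot_counts(8))
```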
04-45 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
mapred-site.xml (cont’d)
If a task is running significantly more slowly than the average speed of tasks for that job, speculative execution may occur
– Another attempt to run the same task is instantiated on a different node
– The results from the first completed task are used
– The slower task is killed
mapred.map.tasks.speculative.execution
Whether to allow speculative execution for Map tasks. Default: true. Recommendation: true. Specified on the JobTracker.
mapred.reduce.tasks.speculative.execution
Whether to allow speculative execution for Reduce tasks. Default: true. Recommendation: false. Specified on the JobTracker.
04-46 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
mapred-site.xml (cont’d)
mapred.compress.map.output Determines whether intermediate data from Mappers should be compressed before transfer across the network. Default: false. Recommendation: true. Specified on all TaskTracker nodes.
mapred.output.compression.type If the output from the Reducers is written as SequenceFiles, determines how those SequenceFiles are compressed. Default: RECORD. Options: NONE, RECORD, BLOCK. Recommendation: BLOCK. Specified on all TaskTracker nodes.
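In mapred-site.xml, the two recommendations above would look something like this (a sketch, not a complete configuration file):

```xml
<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
</property>
```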
04-47 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
mapred-site.xml (cont’d)
More discussion of these parameters later in the course
io.sort.mb The size of the buffer on the Mapper to which the Mapper writes its Key/Value pairs. Default: 100MB. Recommendation: 256MB. This allocation comes out of the task’s JVM heap space. Specified on each TaskTracker node.
io.sort.factor The number of streams to merge at once when sorting files. Specified on each TaskTracker node.
04-48 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Additional Configuration Files
There are several more configuration files in /etc/hadoop/conf
– hadoop-env.sh: environment variables for Hadoop daemons
– HDFS and MapReduce include/exclude files
– Controls who can connect to the NameNode and JobTracker
– masters, slaves: hostname lists for ssh control
– hadoop-policy.xml: access control policies
– log4j.properties: logging (covered later in the course)
– fair-scheduler.xml: scheduler (covered later in the course)
– hadoop-metrics.properties: monitoring (covered later in the course)
04-49 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Environment Setup: hadoop-env.sh
hadoop-env.sh sets environment variables necessary for Hadoop to run – HADOOP_CLASSPATH – HADOOP_HEAPSIZE – HADOOP_LOG_DIR – HADOOP_PID_DIR – JAVA_HOME
Values are sourced into all Hadoop control scripts and therefore the Hadoop daemons
If you need to set environment variables, do it here to ensure that they are passed through to the control scripts
04-50 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Environment Setup: hadoop-env.sh (cont’d)
HADOOP_HEAPSIZE – Controls the heap size for Hadoop daemons – Default 1GB – Comment this out, and set the heap for individual daemons
HADOOP_NAMENODE_OPTS – Java options for the NameNode – At least 4GB: -Xmx4g
HADOOP_JOBTRACKER_OPTS – Java options for the JobTracker – At least 4GB: -Xmx4g
HADOOP_DATANODE_OPTS, HADOOP_TASKTRACKER_OPTS – Set to 1GB each: -Xmx1g
04-51 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
masters and slaves
masters and slaves list the master and slave nodes in the cluster
Only used by start-all.sh, stop-all.sh scripts
Recommended that these scripts are not used – Therefore the masters and slaves files are not necessary
04-52 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Host ‘include’ and ‘exclude’ Files
Optionally, specify dfs.hosts in hdfs-site.xml to point to a file listing hosts which are allowed to connect to the NameNode and act as DataNodes
– Similarly, mapred.hosts points to a file which lists hosts allowed to connect as TaskTrackers
Both files are optional
– If omitted, any host may connect and act as a DataNode/TaskTracker
– This is a possible security/data integrity issue
The NameNode can be forced to re-read the dfs.hosts file with hadoop dfsadmin -refreshNodes
– There is no such command for the JobTracker, which has to be restarted to re-read the mapred.hosts file, so many System Administrators only create a dfs.hosts file
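A sketch of wiring up an include file (the path /etc/hadoop/conf/dfs.hosts is a hypothetical example; any readable path works):

```xml
<!-- hdfs-site.xml -->
<property>
  <name>dfs.hosts</name>
  <value>/etc/hadoop/conf/dfs.hosts</value>
</property>
```

After editing the listed file itself, hadoop dfsadmin -refreshNodes makes the NameNode pick up the change without a restart.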
04-53 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Host ‘include’ and ‘exclude’ Files (cont’d)
It is possible to explicitly prevent one or more hosts from acting as DataNodes
– Create a dfs.hosts.exclude property, and specify a filename
– List the names of all the hosts to exclude in that file
– These hosts will then not be allowed to connect to the NameNode
– This is often used if you intend to decommission nodes (see later)
– Run hadoop dfsadmin -refreshNodes to make the NameNode re-read the file
Similarly, mapred.hosts.exclude can be used to specify a file listing hosts which may not connect to the JobTracker
– Not as commonly used, since the JobTracker must be restarted in order to re-read the file
04-54 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying Your Cluster
Deployment Types
Installing Hadoop
Using SCM for Easy Installation
Typical Configuration Parameters
Configuring Rack Awareness
Using Configuration Management Tools
Hands-On Exercise: Install A Hadoop Cluster
Conclusion
04-55 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Rack Topology Awareness
Recall that HDFS is ‘rack aware’
– Distributes blocks based on hosts’ locations
– The administrator supplies a script which tells Hadoop which rack a node is in
– The script should return a hierarchical ‘rack ID’ for each argument it is passed
– The rack ID is of the form /datacenter/rack
– Example: /datactr1/rack40
– The script can use a flat file, a database, etc.
– The script name is set in topology.script.file.name in core-site.xml
– If this is blank (the default), Hadoop returns a value of /default-rack for all nodes
04-56 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Sample Rack Topology Script
A sample rack topology script:
#!/usr/bin/env python
import sys

DEFAULT_RACK = "/default-rack"
HOST_RACK_FILE = "/etc/hadoop/conf/host-rack.map"

host_rack = {}
for line in open(HOST_RACK_FILE):
    (host, rack) = line.split()
    host_rack[host] = rack

for host in sys.argv[1:]:
    if host in host_rack:
        print host_rack[host]
    else:
        print DEFAULT_RACK
04-57 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Sample Rack Topology Script (cont’d)
The /etc/hadoop/conf/host-rack.map file:
host1 /datacenter1/rack1
host2 /datacenter1/rack1
host3 /datacenter1/rack1
host4 /datacenter1/rack1
host5 /datacenter1/rack2
host6 /datacenter1/rack2
host7 /datacenter1/rack2
host8 /datacenter1/rack2
...
04-58 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Naming Machines to Aid Rack Awareness
A common scenario is to name your hosts in such a way that the Rack Topology Script can easily determine their location
– Example: a host called r1m32 is the 32nd machine in Rack 1
– The Rack Topology Script can simply deconstruct the machine name and then return the rack awareness information
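A sketch of a topology script for that naming scheme (the r1m32 pattern is the one from this slide; the /datacenter1 prefix is an assumption for illustration):

```python
import re
import sys

def rack_for(host):
    # Hostnames like 'r1m32' encode rack 1, machine 32
    m = re.match(r"r(\d+)m\d+$", host)
    if m:
        return "/datacenter1/rack" + m.group(1)
    return "/default-rack"

# Hadoop passes one or more hostnames as arguments; print one rack ID per host
for h in sys.argv[1:]:
    print(rack_for(h))
```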
04-59 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
A Note on DNS vs IP Addresses
You can use machine names or IP addresses to identify nodes in Hadoop’s configuration files
You should use one or the other, but not a combination!
– Hadoop performs both forward and reverse lookups on IP addresses in different situations; if the results don’t match, it could cause major problems
Most people use names rather than IP addresses
– This means you must ensure DNS is configured correctly on your cluster
– Just using the /etc/hosts file on each node will cause configuration headaches as the cluster grows
04-60 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Reading Configuration Changes
Cluster daemons generally need to be restarted to read in changes to their configuration files
DataNodes do not need to be restarted if only NameNode parameters were changed
If you need to restart everything: – Put HDFS in Safe Mode – Take the DataNodes down – Stop and start the NameNode – Start the DataNodes
04-61 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying Your Cluster
Deployment Types
Installing Hadoop
Using SCM for Easy Installation
Typical Configuration Parameters
Configuring Rack Awareness
Using Configuration Management Tools
Hands-On Exercise: Install A Hadoop Cluster
Conclusion
04-62 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Managing Large Clusters
Each node in the cluster requires its own configuration files
Managing a small cluster is relatively easy – Log in to each machine to make changes – Manually change configuration files
As the cluster grows larger, management becomes more complex
Many administrators use ‘cluster shell’-type utilities to log in to multiple machines simultaneously – Potentially dangerous!
04-63 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Shell: Example (csshX)
04-64 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Configuration Management Tools
A much better solution: use configuration management software
Popular open source tools: Puppet, Chef – Many others exist – Many commercial tools also exist
These tools allow you to manage the configuration of multiple machines at once
– They can update files, restart daemons, or even reboot machines automatically where necessary
04-65 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Configuration Management Tools (cont’d)
Recommendation: start using such tools when the cluster is small!
Retrofitting configuration management software to an existing cluster can be difficult
– Machines tend not to be set up identically
– Configuration scripts end up containing many exceptions for different machines
Alternative: Use Cloudera’s Service and Configuration Manager (SCM) – Free for clusters of up to 50 nodes
04-66 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying Your Cluster
Deployment Types
Installing Hadoop
Using SCM for Easy Installation
Typical Configuration Parameters
Configuring Rack Awareness
Using Configuration Management Tools
Hands-On Exercise: Install A Hadoop Cluster
Conclusion
04-67 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hands-On Exercise
In this exercise, you will collaborate with other students to create a ‘real’ Hadoop cluster in the classroom
Please refer to the hands-on exercise manual
04-68 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Deploying Your Cluster
Deployment Types
Installing Hadoop
Using SCM for Easy Installation
Typical Configuration Parameters
Configuring Rack Awareness
Using Configuration Management Tools
Hands-On Exercise: Install A Hadoop Cluster
Conclusion
04-69 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Conclusion
In this chapter, you have learned:
The different installation configurations available in Hadoop
How to install Hadoop
How to launch the Hadoop daemons
How to configure Hadoop
How to specify your rack topology
05-1 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Chapter 5 Managing and
Scheduling Jobs
05-2 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Managing and Scheduling Jobs
In this chapter, you will learn:
How to stop jobs running on the cluster
The options available for scheduling multiple jobs on the same cluster
The downsides of the default FIFO Scheduler
How to configure the Fair Scheduler
05-3 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Managing and Scheduling Jobs
Managing Running Jobs
Hands-On Exercise: Managing Jobs
The FIFO Scheduler
The Fair Scheduler
Configuring the Fair Scheduler
Hands-On Exercise: Using the Fair Scheduler
Conclusion
05-4 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Displaying Running Jobs
To view all jobs running on the cluster, use hadoop job -list
[training@localhost ~]$ hadoop job -list
1 jobs currently running
JobId State StartTime UserName Priority SchedulingInfo
job_201110311158_0008 1 1320210148487 training NORMAL NA
05-5 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Displaying All Jobs
To display all jobs including completed jobs, use hadoop job -list all
[training@localhost ~]$ hadoop job -list all
7 jobs submitted
States are: Running : 1 Succeded : 2 Failed : 3 Prep : 4
JobId State StartTime UserName Priority SchedulingInfo
job_201110311158_0004 2 1320177624627 training NORMAL NA
job_201110311158_0005 2 1320177864702 training NORMAL NA
job_201110311158_0006 2 1320209627260 training NORMAL NA
job_201110311158_0007 2 1320210018614 training NORMAL NA
job_201110311158_0008 2 1320210148487 training NORMAL NA
job_201110311158_0001 2 1320097902546 training NORMAL NA
job_201110311158_0003 2 1320099376966 training NORMAL NA
05-6 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Displaying All Jobs (cont’d)
Note that states are displayed as numeric values – 1: Running – 2: Succeeded – 3: Failed – 4: In preparation – 5: (undocumented) Killed
Easy to write a cron job that periodically lists (for example) all failed jobs, running a command such as – hadoop job -list all | grep '<tab>3<tab>'
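The grep idea above can also be written as an awk pipeline; this sketch runs against simulated listing rows (fabricated from the sample output earlier in this chapter) rather than a live cluster, where a real cron job would pipe from hadoop job -list all instead:

```shell
# Field 2 of the tab-separated listing is the numeric state; 3 = Failed
printf 'job_201110311158_0003\t3\t1320099376966\ttraining\tNORMAL\tNA\njob_201110311158_0004\t2\t1320177624627\ttraining\tNORMAL\tNA\n' \
  | awk -F'\t' '$2 == 3 { print $1 }'
```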
05-7 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Displaying the Status of an Individual Job
hadoop job -status <job_id> provides status about an individual job – Completion percentage – Values of counters
– System counters and user-defined counters
Note: the job name is not displayed!
– The Web user interface is the most convenient way to view more details about an individual job
– More details later
05-8 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Killing a Job
It is important to note that once a user has submitted a job, they cannot stop it just by hitting CTRL-C on their terminal – This stops job output appearing on the user’s console – The job is still running on the cluster!
05-9 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Killing a Job (cont’d)
To kill a job use hadoop job -kill <job_id>
[training@localhost ~]$ hadoop job -list 1 jobs currently running JobId State StartTime UserName Priority SchedulingInfo job_201110311158_0009 1 1320210791739 training NORMAL NA
[training@localhost ~]$ hadoop job -kill job_201110311158_0009 Killed job job_201110311158_0009
[training@localhost ~]$ hadoop job -list 0 jobs currently running JobId State StartTime UserName Priority SchedulingInfo
05-10 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Managing and Scheduling Jobs
Managing Running Jobs
Hands-On Exercise: Managing Jobs
The FIFO Scheduler
The Fair Scheduler
Configuring the Fair Scheduler
Hands-On Exercise: Using the Fair Scheduler
Conclusion
05-11 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hands-On Exercise: Managing Jobs
In this Hands-On Exercise, you will start and kill jobs from the command line
Please refer to the Hands-On Exercise Manual
05-12 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Managing and Scheduling Jobs
Managing Running Jobs
Hands-On Exercise: Managing Jobs
The FIFO Scheduler
The Fair Scheduler
Configuring the Fair Scheduler
Hands-On Exercise: Using the Fair Scheduler
Conclusion
05-13 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Job Scheduling Basics
A Hadoop job is composed of – An unordered set of Map tasks which have locality preferences – An unordered set of Reduce tasks
Tasks are scheduled by the JobTracker
– They are then executed by TaskTrackers
– One TaskTracker per node
– Each TaskTracker has a fixed number of slots for Map and Reduce tasks
– This may differ per node – a node with a powerful processor may have more slots than one with a slower CPU
– TaskTrackers report the availability of free task slots to the JobTracker on the Master node
Scheduling a job requires assigning Map and Reduce tasks to available Map and Reduce task slots
05-14 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
The FIFO Scheduler
Default Hadoop job scheduler is FIFO – First In, First Out
Given two jobs A and B, submitted in that order, all Map tasks from job A are scheduled before any Map tasks from job B are considered – Similarly for Reduce tasks
Order of task execution within a job may be shuffled around
[Diagram: tasks A1–A4 from job A are all scheduled before tasks B1–B3 from job B]
05-15 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Priorities in the FIFO Scheduler
The FIFO Scheduler supports assigning priorities to jobs
– Priorities are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
– Set with the mapred.job.priority property
– May be changed from the command line as the job is running: hadoop job -set-priority <job_id> <priority>
– All work in each queue is processed before moving on to the next
All higher-priority tasks are run first, if they exist
[Diagram: high-priority tasks C1–C3 run before any normal-priority tasks from jobs A and B, regardless of submission order]
05-16 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Priorities in the FIFO Scheduler: Problems
Problem: Job A may have 2,000 tasks; Job B may have 20 – Job B will not make any progress until Job A has nearly finished – Completion time should be proportional to job size
Users with poor understanding of the system may flag all their jobs as HIGH_PRIORITY – Thus starving other jobs of processing time
‘All or nothing’ nature of the scheduler makes sharing a cluster between production jobs with SLAs and interactive users challenging
05-17 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Managing and Scheduling Jobs
Managing Running Jobs
Hands-On Exercise: Managing Jobs
The FIFO Scheduler
The Fair Scheduler
Configuring the Fair Scheduler
Hands-On Exercise: Using the Fair Scheduler
Conclusion
05-18 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Goals of the Fair Scheduler
Fair Scheduler is designed to allow multiple users to share the cluster simultaneously
Should allow short interactive jobs to coexist with long production jobs
Should allow resources to be controlled proportionally
Should ensure that the cluster is efficiently utilized
05-19 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
The Fair Scheduler: Basic Concepts
Each job is assigned to a pool – Default assignment is one pool per username
Jobs may be assigned to arbitrarily-named pools – Such as “production”
Physical slots are not bound to any specific pool
Each pool gets an even share of the available task slots
05-20 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Pool Creation
By default, pools are created dynamically based on the username submitting the job – No configuration necessary
Jobs can be sent to designated pools (e.g., “production”)
– Pools can be defined in a configuration file (see later)
– Pools may have a minimum number of mappers and reducers defined
05-21 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Adding Pools Readjusts the Share of Slots
If Charlie now submits a job in a new pool, shares of slots are adjusted
05-22 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Determining the Fair Share
The fair share of task slots assigned to a pool is based on:
– The actual number of task slots available across the cluster
– The demand from the pool – the number of tasks eligible to run
– The minimum share, if any, configured for the pool
– The fair share of each other active pool
The fair share for a pool will never be higher than the actual demand
Pools are filled up to their minimum share, assuming cluster capacity
Excess cluster capacity is spread across all pools – Aim is to maintain the most even loading possible
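The allocation rules above amount to max-min fairness (the ‘water filling’ idea behind the even-glass analogy later in this chapter). A sketch that ignores minimum shares and weights, with a hypothetical function name:

```python
def fair_shares(capacity, demands):
    """Progressively fill pools evenly; no pool gets more than its demand."""
    shares = dict.fromkeys(demands, 0.0)
    remaining = float(capacity)
    active = {p for p, d in demands.items() if d > 0}
    while active and remaining > 1e-9:
        per_pool = remaining / len(active)  # even split of what's left
        for p in list(active):
            grant = min(per_pool, demands[p] - shares[p])
            shares[p] += grant
            remaining -= grant
            if shares[p] >= demands[p] - 1e-9:
                active.discard(p)  # satisfied pools drop out
    return shares

# 30 slots: A only wants 5; B and C split the rest evenly
print(fair_shares(30, {"A": 5, "B": 100, "C": 100}))
```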
05-23 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example Minimum Share Allocation
First, fill Production up to its 20-slot minimum guarantee
Then distribute remaining 10 slots evenly across Alice and Bob
05-24 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example Allocation 2: Production Queue Empty
Production has no demand, so no slots reserved
All slots allocated evenly across Alice and Bob
05-25 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example Allocation 3: MinShares Exceed Slots
minShare of Production, Research exceeds available capacity
minShares are scaled down pro rata to match actual slots
No slots remain for users without minShare (i.e., Bob)
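The pro-rata scaling in this example can be sketched as follows (the helper name is hypothetical; the behavior is the one described above):

```python
def scale_min_shares(capacity, min_shares):
    """Scale configured minShares down pro rata when they exceed capacity."""
    total = sum(min_shares.values())
    if total <= capacity:
        return dict(min_shares)  # everything fits; no scaling needed
    return {pool: capacity * share / total for pool, share in min_shares.items()}

# 60 slots of minShare requested but only 30 available: each pool gets half
print(scale_min_shares(30, {"production": 40, "research": 20}))
```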
05-26 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example 4: minShare < Fair Share
Production filled to minShare
Remaining 25 slots distributed across all pools
Production pool gets more than minShare, to maintain fairness
05-27 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Pools With Weights
Instead of (or in addition to) setting minShare, pools can be assigned a weight
Pools with higher weight get more slots during free slot allocation
‘Even water glass height’ analogy: – Think of the weight as controlling the ‘width’ of the glass
05-28 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example: Pool With Double Weight
Production filled to minShare (5)
Remaining 25 slots distributed across pools
Bob’s pool gets two slots instead of one during each round
05-29 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Multiple Jobs Within A Pool
A pool exists if it has one or more jobs in it
So far, we’ve only described how slots are assigned to pools
We need to determine how jobs are scheduled within a given pool
05-30 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Job Scheduling Within a Pool
Within a pool, resources are fair-scheduled across all jobs – This is achieved via another instance of Fair Scheduler
It is possible to enforce FIFO scheduling within a pool
– May be appropriate for jobs that would compete for external bandwidth, for example
Pools can have a maximum number of concurrent jobs configured
The weight of a job within a pool is determined by its priority (NORMAL, HIGH etc)
05-31 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Preemption in the Fair Scheduler
If shares are imbalanced, pools which are over their fair share may not be assigned new tasks when their old ones complete
– Eventually, as tasks complete, free slots will become available
– Those free slots will be used by pools which were under their fair share
– This may not be acceptable in a production environment, where tasks take a long time to complete
Two types of preemption are supported – minShare preemption – Fair Share preemption
05-32 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
minShare Preemption
Pools with a minimum share configured are operating on an SLA (Service Level Agreement)
– Waiting for tasks from other pools to finish may not be appropriate
Pools which are below their minimum guaranteed share can kill the newest tasks from other pools to reap slots
– They can then use those slots for their own tasks
– This ensures that the minimum share will be delivered within a timeout window
05-33 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Fair Share Preemption
Pools not receiving their fair share can kill tasks from other pools
– A pool will kill the newest task(s) in an over-share pool to forcibly make room for starved pools
Fair share preemption is used conservatively
– A pool must be operating at less than 50% of its fair share for 10 minutes before it can preempt tasks from other pools
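The conservative trigger described above can be sketched as a predicate (the function name and the seconds-based time units are assumptions for illustration; the 50% and 10-minute figures are from this slide):

```python
FAIR_SHARE_PREEMPTION_TIMEOUT = 600  # 10 minutes, in seconds

def may_preempt(current_share, fair_share, starved_since, now):
    """True once a pool has run below 50% of its fair share for the full timeout."""
    starved = current_share < 0.5 * fair_share
    return starved and (now - starved_since) >= FAIR_SHARE_PREEMPTION_TIMEOUT

print(may_preempt(4, 10, starved_since=0, now=600))  # True
print(may_preempt(6, 10, starved_since=0, now=600))  # False
```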
05-34 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Managing and Scheduling Jobs
Managing Running Jobs
Hands-On Exercise: Managing Jobs
The FIFO Scheduler
The Fair Scheduler
Configuring the Fair Scheduler
Hands-On Exercise: Using the Fair Scheduler
Conclusion
05-35 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Steps to Configure the Fair Scheduler
1. Enable the Fair Scheduler
2. Configure Scheduler parameters
3. Configure pools
05-36 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
1. Enable the Fair Scheduler
In mapred-site.xml on the JobTracker, specify the scheduler to use:
<property>
  <name>mapred.jobtracker.taskScheduler</name>
  <value>org.apache.hadoop.mapred.FairScheduler</value>
</property>
Identify the pool configuration file:
<property>
  <name>mapred.fairscheduler.allocation.file</name>
  <value>/etc/hadoop/conf/allocations.xml</value>
</property>
05-37 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Scheduler Parameters in mapred-site.xml
mapred.fairscheduler.poolnameproperty Specifies which JobConf property is used to determine the pool that a job belongs in. Default is user.name (i.e., one pool per user). Other options include group.name, mapred.job.queue.name
mapred.fairscheduler.sizebasedweight Makes a pool’s weight proportional to log(demand) of the pool. Default: false.
mapred.fairscheduler.weightadjuster Specifies a WeightAdjuster implementation that tunes job weights dynamically. Default is blank; can be set to org.apache.hadoop.mapred.NewJobWeightBooster.
05-38 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Configuring Pools
The allocations configuration file must exist, and contain an <allocations> entity
<pool> entities can contain minMaps, minReduces, maxRunningJobs, weight
<user> entities (optional) can contain maxRunningJobs – Limits the number of simultaneous jobs a user can run
userMaxJobsDefault entity (optional) – Maximum number of jobs for any user without a specified limit
System-wide and per-pool timeouts can be set
05-39 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Very Basic Pool Configuration
The allocations configuration file must exist, and contain at least this:
<?xml version="1.0"?>
<allocations>
</allocations>
05-40 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example: Limit Users to Three Jobs Each
Limit max jobs for any user: specify userMaxJobsDefault
<?xml version="1.0"?>
<allocations>
<userMaxJobsDefault>3</userMaxJobsDefault>
</allocations>
05-41 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example: Allow One User More Jobs
If a user needs more than the standard maximum number of jobs, create a <user> entity
<?xml version="1.0"?>
<allocations>
<userMaxJobsDefault>3</userMaxJobsDefault>
<user name="bob">
<maxRunningJobs>6</maxRunningJobs>
</user>
</allocations>
05-42 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example: Add a Fair Share Timeout
Set a Preemption timeout
<?xml version="1.0"?>
<allocations>
<userMaxJobsDefault>3</userMaxJobsDefault>
<user name="bob">
<maxRunningJobs>6</maxRunningJobs>
</user>
<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>
</allocations>
05-43 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example: Create a ‘production’ Pool
Pools are created by adding <pool> entities
<?xml version="1.0"?>
<allocations>
<userMaxJobsDefault>3</userMaxJobsDefault>
<pool name="production">
<minMaps>20</minMaps>
<minReduces>5</minReduces>
<weight>2.0</weight>
</pool>
</allocations>
05-44 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example: Add an SLA to the Pool
<?xml version="1.0"?>
<allocations>
<userMaxJobsDefault>3</userMaxJobsDefault>
<pool name="production">
<minMaps>20</minMaps>
<minReduces>5</minReduces>
<weight>2.0</weight>
<minSharePreemptionTimeout>60</minSharePreemptionTimeout>
</pool>
</allocations>
05-45 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example: Create a FIFO Pool
FIFO pools are useful for jobs which are, for example, bandwidth-intensive
Note: <schedulingMode>FAIR</schedulingMode> would use the Fair Scheduler
<?xml version="1.0"?>
<allocations>
<pool name="bandwidth_intensive">
<minMaps>10</minMaps>
<minReduces>5</minReduces>
<schedulingMode>FIFO</schedulingMode>
</pool>
</allocations>
05-46 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Monitoring Pools and Allocations
The Fair Scheduler exposes a status page in the JobTracker Web user interface at
http://<job_tracker_host>:50030/scheduler – Allows you to inspect pools and allocations
Any changes to the pool configuration file (e.g., allocations.xml) will automatically be reloaded by the running scheduler – Scheduler detects a timestamp change on the file
– Waits five seconds after the change was detected, then reloads the file
– If the scheduler cannot parse the XML in the configuration file, it will log a warning and continue to use the previous configuration
05-47 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Managing and Scheduling Jobs
Managing Running Jobs
Hands-On Exercise: Managing Jobs
The FIFO Scheduler
The Fair Scheduler
Configuring the Fair Scheduler
Hands-On Exercise: Using the Fair Scheduler
Conclusion
05-48 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hands-On Exercise: Using The Fair Scheduler
In this Hands-On Exercise, you will run jobs in different pools
Please refer to the Hands-On Exercise manual
05-49 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Managing and Scheduling Jobs
Managing Running Jobs
Hands-On Exercise: Managing Jobs
The FIFO Scheduler
The Fair Scheduler
Configuring the Fair Scheduler
Hands-On Exercise: Using the Fair Scheduler
Conclusion
05-50 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Conclusion
In this chapter, you have learned:
How to stop jobs running on the cluster
The options available for scheduling multiple jobs on the same cluster
The downsides of the default FIFO Scheduler
How to configure the Fair Scheduler
06-1 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Chapter 6 Cluster Maintenance
06-2 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
In this chapter, you will learn:
How to check the status of HDFS
How to copy data between clusters
How to add and remove nodes
How to rebalance the cluster
The purpose of the Secondary NameNode
What strategies to employ for NameNode Metadata backup
How to upgrade your cluster
06-3 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
Checking HDFS status
Hands-On Exercise: Breaking The Cluster
Copying Data Between Clusters
Adding and Removing Cluster Nodes
Rebalancing The Cluster
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
NameNode Metadata Backup
Cluster Upgrading
Conclusion
06-4 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Checking for Corruption in HDFS
fsck checks for missing or corrupt data blocks – Unlike system fsck, does not attempt to repair errors
Can be configured to list all files – Also all blocks for each file, all block locations, all racks
Examples:
hadoop fsck /
hadoop fsck / -files
hadoop fsck / -files -blocks
hadoop fsck / -files -blocks -locations
hadoop fsck / -files -blocks -locations -racks
06-5 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Checking for Corruption in HDFS (cont’d)
Good idea to run fsck as a regular cron job that e-mails the results to administrators – Choose a low-usage time to run the check
-move option moves corrupted files to /lost+found – A corrupt file is one in which all replicas of at least one block are missing
-delete option deletes corrupted files
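Taken together, the points above amount to a small cron job. The sketch below is one way to write it; HADOOP defaults to echoing the command so the script can be dry-run without a live cluster, and MAIL_CMD (and the mailx example in the comment) are placeholders to adapt:

```shell
#!/bin/sh
# Sketch of a nightly fsck report run from cron at a low-usage time, e.g.:
#   0 3 * * * /usr/local/bin/hdfs-fsck-report.sh
# HADOOP defaults to 'echo hadoop' so the script can be inspected without a
# live cluster; set HADOOP=hadoop and MAIL_CMD (for example
# 'mailx -s "fsck report" admin@example.com') to run it for real.
HADOOP="${HADOOP:-echo hadoop}"
MAIL_CMD="${MAIL_CMD:-cat}"

run_fsck_report() {
  # Full listing: files, blocks, block locations, and racks
  $HADOOP fsck / -files -blocks -locations -racks
}

run_fsck_report | $MAIL_CMD
```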
06-6 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using dfsadmin
dfsadmin provides a number of administrative features including:
List information about HDFS on a per-datanode basis
Re-read the dfs.hosts and dfs.hosts.exclude files
$ hadoop dfsadmin -report
$ hadoop dfsadmin -refreshNodes
06-7 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using dfsadmin (cont’d)
Manually set the filesystem to ‘safe mode’
– NameNode starts up in safe mode
– Read-only: no changes can be made to the metadata
– Does not replicate or delete blocks
– Leaves safe mode when the (configured) minimum percentage of blocks satisfies the minimum replication condition
– Can also block until safe mode is exited
 – Useful for shell scripts
 – hadoop dfsadmin -safemode wait
$ hadoop dfsadmin -safemode enter
$ hadoop dfsadmin -safemode leave
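The wait option above lends itself to shell scripts that must not write to HDFS until safe mode ends. A sketch, with HADOOP hedged behind an echo default and a hypothetical fs -put as the follow-on work:

```shell
#!/bin/sh
# Block until the NameNode leaves safe mode, then continue with HDFS writes.
# HADOOP defaults to 'echo hadoop' for a dry run without a cluster; set
# HADOOP=hadoop to execute for real. The fs -put step is a hypothetical
# example of work that must wait for safe mode to end.
HADOOP="${HADOOP:-echo hadoop}"

wait_for_hdfs() {
  $HADOOP dfsadmin -safemode wait   # returns once safe mode is OFF
}

wait_for_hdfs
$HADOOP fs -put /data/incoming.log /incoming/
```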
06-8 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using dfsadmin (cont’d)
Saves the NameNode metadata to disk and resets the edit log – Must be in safe mode
$ hadoop dfsadmin -saveNamespace
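Since -saveNamespace requires safe mode, the commands are typically run as a short sequence. A sketch (HADOOP defaults to echoing, so this prints the commands rather than needing a cluster):

```shell
#!/bin/sh
# Checkpoint the NameNode metadata by hand: enter safe mode, save, leave.
# HADOOP defaults to 'echo hadoop' for a dry run; set HADOOP=hadoop to execute.
HADOOP="${HADOOP:-echo hadoop}"

save_namespace() {
  $HADOOP dfsadmin -safemode enter &&
  $HADOOP dfsadmin -saveNamespace &&
  $HADOOP dfsadmin -safemode leave
}

save_namespace
```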
06-9 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
Checking HDFS status
Hands-On Exercise: Breaking The Cluster
Copying Data Between Clusters
Adding and Removing Cluster Nodes
Rebalancing The Cluster
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
NameNode Metadata Backup
Cluster Upgrading
Conclusion
06-10 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hands-On Exercise: Breaking the Cluster
In this hands-on exercise, you will introduce some problems into the cluster
Please refer to the Hands-On Exercise Manual
06-11 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
Checking HDFS status
Hands-On Exercise: Breaking The Cluster
Copying Data Between Clusters
Adding and Removing Cluster Nodes
Rebalancing The Cluster
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
NameNode Metadata Backup
Cluster Upgrading
Conclusion
06-12 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Copying Data
Hadoop clusters can hold massive amounts of data
A frequent requirement is to back up the cluster for disaster recovery
Ultimately, this is not a Hadoop problem! – It’s a ‘managing huge amounts of data’ problem
Cluster could be backed up to tape etc if necessary – Custom software may be needed
06-13 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Copying Data with distcp
distcp copies data within a cluster, or between clusters – Used to copy large amounts of data – Turns the copy procedure into a MapReduce job
Syntax:
– hadoop distcp hdfs://nn1:8020/path/to/src \
      hdfs://nn2:8020/path/to/dest
hdfs:// and port portions are optional if source and destination are on the local cluster
Copies files or entire directories – Files previously copied will be skipped
– Note that the only check for duplicate files is that the file’s name and size are identical
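A typical inter-cluster invocation might look like the sketch below; the NameNode hostnames and paths are invented, and HADOOP defaults to echoing so the sketch can be dry-run:

```shell
#!/bin/sh
# Copy a directory tree from the local cluster to a backup cluster.
# nn1 and nn2 are invented NameNode hostnames; HADOOP defaults to
# 'echo hadoop' so this prints the command instead of needing a cluster.
HADOOP="${HADOOP:-echo hadoop}"

backup_logs() {
  $HADOOP distcp \
    hdfs://nn1:8020/user/logs \
    hdfs://nn2:8020/backup/logs
}

backup_logs
```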
06-14 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Copying Data: Best Practices
In practice, many organizations do not copy data between clusters
Instead, they write their data to two clusters as it is being imported
– This is often more efficient
– Not necessary to run all MapReduce jobs on the backup cluster
– As long as the source data is available, all derived data can be regenerated later
06-15 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
Checking HDFS status
Hands-On Exercise: Breaking The Cluster
Copying Data Between Clusters
Adding and Removing Cluster Nodes
Rebalancing The Cluster
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
NameNode Metadata Backup
Cluster Upgrading
Conclusion
06-16 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Adding Cluster Nodes
To add nodes to the cluster:
1. Add the names of the nodes to the ‘include’ file(s), if you are using this method to explicitly list allowed nodes
 – The file(s) referred to by dfs.hosts (and mapred.hosts if that has been used)
2. Update your rack awareness script with the new information
3. Update the NameNode with this new information
 – hadoop dfsadmin -refreshNodes
4. Start the new DataNode and TaskTracker instances
5. Restart the JobTracker (if you have changed mapred.hosts)
 – There is currently no way to refresh a running JobTracker
6. Check that the new DataNodes and TaskTrackers appear in the Web UI
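Steps 1 and 3 above can be sketched as a short script; the include-file path and hostname are illustrative, and HADOOP defaults to echoing for a dry run:

```shell
#!/bin/sh
# Add a new DataNode to the include file and refresh the NameNode (steps 1
# and 3). INCLUDE_FILE and the hostname are illustrative; point INCLUDE_FILE
# at whatever dfs.hosts names. HADOOP defaults to 'echo hadoop' for dry runs.
HADOOP="${HADOOP:-echo hadoop}"
INCLUDE_FILE="${INCLUDE_FILE:-/tmp/dfs.hosts.example}"
NEW_NODE="datanode15.example.com"

add_node() {
  echo "$NEW_NODE" >> "$INCLUDE_FILE"   # step 1: allow the node
  $HADOOP dfsadmin -refreshNodes        # step 3: re-read dfs.hosts
}

add_node
```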
06-17 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Adding Nodes: Points to Note
The NameNode will not ‘favor’ a new node added to the cluster
– It will not prefer to write blocks to the new node rather than to other nodes
This is by design
– The assumption is that new data is more likely to be processed by MapReduce jobs
– If all new blocks were written to the new node, this would impact data locality for MapReduce jobs
06-18 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Removing Cluster Nodes
To remove nodes from the cluster:
1. Add the names of the nodes to the ‘exclude’ file(s)
 – The file(s) referred to by dfs.hosts.exclude (and mapred.hosts.exclude if that has been used)
2. Update the NameNode with the new set of DataNodes
 – hadoop dfsadmin -refreshNodes
 – The NameNode UI will show the admin state change to ‘Decommission In Progress’ for affected DataNodes
 – When all DataNodes report their state as ‘Decommissioned’, all the blocks will have been replicated elsewhere
3. Shut down the decommissioned nodes
4. Remove the nodes from the ‘include’ and ‘exclude’ files and update the NameNode as above
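The refresh in step 2, plus a helper that checks the report for nodes still decommissioning, might be sketched as follows (the grep pattern follows the admin states shown above; HADOOP defaults to echoing so the sketch runs without a cluster):

```shell
#!/bin/sh
# Start decommissioning (after the exclude file has been edited) and count
# how many DataNodes are still mid-decommission. HADOOP defaults to
# 'echo hadoop' so this can be read without a cluster.
HADOOP="${HADOOP:-echo hadoop}"

start_decommission() {
  $HADOOP dfsadmin -refreshNodes
}

nodes_in_progress() {
  # On a live cluster, greps the report for the in-progress admin state
  $HADOOP dfsadmin -report | grep -c "Decommission In Progress"
}

start_decommission
```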
06-19 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
Checking HDFS status
Hands-On Exercise: Breaking The Cluster
Copying Data Between Clusters
Adding and Removing Cluster Nodes
Rebalancing The Cluster
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
NameNode Metadata Backup
Cluster Upgrading
Conclusion
06-20 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Rebalancing
An HDFS cluster can become ‘unbalanced’ – Some Nodes have much more data on them than others – Example: add a new Node to the cluster
– Even after adding some files to HDFS, this Node will have far less data than the others
– During MapReduce processing, this Node will use much more network bandwidth as it retrieves data from other Nodes
Clusters can be rebalanced using the balancer utility
06-21 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using balancer
balancer reviews data block placement on nodes and adjusts blocks to ensure all nodes are within x% utilization of each other
Utilization is defined as amount of data storage used
x is known as the threshold
A node is under-utilized if its utilization is less than (average utilization - threshold)
A node is over-utilized if its utilization is more than (average utilization + threshold)
Note: balancer does not consider block placement on individual disks on a node – Only the utilization of the node as a whole
06-22 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using balancer (cont’d)
Syntax: – hadoop balancer -threshold x
Threshold is optional – Defaults to 10 (i.e., 10% difference in utilization between nodes)
Rebalancing can be canceled at any time – Interrupt the command with Ctrl-C
Bandwidth usage can be controlled by setting the property dfs.balance.bandwidthPerSec in hdfs-site.xml
– Specifies a bandwidth in bytes/sec that each DataNode can use for rebalancing
– Default is 1048576 (1MB/sec)
– Recommendation: approx. 0.1 x network speed
 – e.g., for a 1Gbps network, 10MB/sec
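Putting the threshold and the bandwidth cap together, a rebalancing run might be sketched like this (the 5% threshold is an arbitrary example; HADOOP defaults to echoing for a dry run):

```shell
#!/bin/sh
# Rebalance to within 5% utilization. Bandwidth is capped beforehand in
# hdfs-site.xml: dfs.balance.bandwidthPerSec = 10485760 (~10 MB/sec for a
# 1 Gbps network). HADOOP defaults to 'echo hadoop' for a dry run.
HADOOP="${HADOOP:-echo hadoop}"

rebalance() {
  $HADOOP balancer -threshold 5
}

rebalance
```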
06-23 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
When To Rebalance
A cluster can become unbalanced during regular usage
Rebalance immediately after adding new nodes to the cluster
Rebalancing does not interfere with any existing MapReduce jobs – However, it does use bandwidth – Not a good idea to rebalance during peak usage times
06-24 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
Checking HDFS status
Hands-On Exercise: Breaking The Cluster
Copying Data Between Clusters
Adding and Removing Cluster Nodes
Rebalancing The Cluster
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
NameNode Metadata Backup
Cluster Upgrading
Conclusion
06-25 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
In this Hands-On Exercise, you will verify that the cluster has recovered from the problems you introduced in the last exercise
You will also cause the cluster some other problems, and observe what happens
Please refer to the Hands-On Exercise Manual
06-26 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
Checking HDFS status
Hands-On Exercise: Breaking The Cluster
Copying Data Between Clusters
Adding and Removing Cluster Nodes
Rebalancing The Cluster
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
NameNode Metadata Backup
Cluster Upgrading
Conclusion
06-27 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
HDFS Replicates Data By Default
Recall that HDFS replicates blocks of data on a cluster – Default is three-fold replication
This means it is highly unlikely that data will be lost as a result of an individual node failing
However, metadata from the NameNode must be backed up to avoid disaster should the NameNode fail
06-28 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
The NameNode’s Filesystem
The NameNode’s directory structure looks like this:
${dfs.name.dir}/current/VERSION
                       /edits
                       /fsimage
                       /fstime
VERSION is a Java properties file containing information about the version of HDFS that is running
fstime is a record of the time the last checkpoint was taken (covered later) – Stored in Hadoop’s Writable serializable format
06-29 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Backing Up the NameNode
Recall that dfs.name.dir is a comma-separated list of directories
– Writes go to all directories in the list
Recommendation: write to two local directories on different physical volumes, and to an NFS-mounted directory
– Data will be preserved even in the event of a total failure of the NameNode machine
Recommendation: soft-mount the NFS directory
– If the NFS mount goes offline, this will not cause the NameNode to fail
06-30 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
fsimage and the Secondary NameNode
The fsimage file also contains the filesystem metadata – It is not updated at every write
– This would be very slow
When an HDFS client performs a write operation, it is recorded in the Primary NameNode’s edit log – the edits file
– The NameNode’s in-memory representation of the filesystem metadata is also updated
System resilience is not compromised, since recovery can be performed by loading the fsimage file and applying all the changes in the edits file to that information
– fsimage does not record the DataNodes on which blocks are stored
– Block locations are reported to the NameNode by the DataNodes when they join the cluster
06-31 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
fsimage and the Secondary NameNode (cont’d)
Applying all changes in the edits file could take a long time – The file would also grow to be huge
The Secondary NameNode periodically checkpoints the NameNode’s in-memory filesystem data
1. Tells the NameNode to roll its edits file
2. Retrieves fsimage and edits from the NameNode
3. Loads fsimage into memory and applies the changes from the edits file
4. Creates a new, consolidated fsimage file
5. Sends the new fsimage file back to the primary NameNode
6. The NameNode replaces the old fsimage file with the new one, replaces the old edits file with the new one it created in step 1, and updates the fstime file to record the checkpoint time
06-32 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
fsimage and the Secondary NameNode (cont’d)
This checkpointing operation is performed every hour – Configured by fs.checkpoint.period
Checkpointing will also occur if the edit log reaches 64MB – Configured by fs.checkpoint.size, in bytes – Secondary NameNode checks this size every five minutes
This determines the worst-case amount of data loss should the primary NameNode crash
Note: the Secondary NameNode is not a live backup of the primary NameNode!
– Hadoop 0.21 renamed the Secondary NameNode the Checkpoint Node
06-33 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Manually Backing Up fsimage and edits
You can retrieve copies of the fsimage and edits files from the NameNode at any time via HTTP – fsimage: http://<namenode>:50070/getimage?getimage=1 – edits: http://<namenode>:50070/getimage?getedit=1
Note: fsimage is a copy of the NameNode’s fsimage file, not the in-memory version of the metadata
It is good practice to regularly retrieve these files for offsite backup – Typically done using a shell script and the curl utility or similar
The command hadoop dfsadmin -saveNamespace will force the NameNode to write its in-memory Metadata as a new fsimage file – Replaces the old fsimage and edits files – NameNode must be in safe mode to do this
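The periodic retrieval might be scripted as below, following the URLs shown above; the hostname and backup directory are invented, and CURL defaults to echoing so the sketch runs without a live NameNode:

```shell
#!/bin/sh
# Fetch fsimage and edits from the NameNode's HTTP interface for offsite
# backup. NAMENODE and BACKUP_DIR are invented; CURL defaults to 'echo curl'
# so the script can be dry-run without a live NameNode.
CURL="${CURL:-echo curl}"
NAMENODE="${NAMENODE:-namenode.example.com}"
BACKUP_DIR="${BACKUP_DIR:-/tmp/namenode-backup}"
STAMP=$(date +%Y%m%d-%H%M)

fetch_metadata() {
  $CURL -o "$BACKUP_DIR/fsimage.$STAMP" \
    "http://$NAMENODE:50070/getimage?getimage=1"
  $CURL -o "$BACKUP_DIR/edits.$STAMP" \
    "http://$NAMENODE:50070/getimage?getedit=1"
}

fetch_metadata
```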
06-34 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Recovering the NameNode
To recover from a NameNode failure, or to restore from a backup:
1. Stop the NameNode
2. Remove existing copies of fsimage and edits from all directories in the dfs.name.dir list
3. Place the recovery versions of fsimage and edits in the appropriate directory
4. Ensure the recovery versions are owned by hdfs:hdfs
5. Start the NameNode
Note: Step 2 is crucial. Otherwise, the NameNode will copy the first valid fsimage and edits files it finds into the other directories in the list
– Potentially overwriting your recovery versions
06-35 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
Checking HDFS status
Hands-On Exercise: Breaking The Cluster
Copying Data Between Clusters
Adding and Removing Cluster Nodes
Rebalancing The Cluster
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
NameNode Metadata Backup
Cluster Upgrading
Conclusion
06-36 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Upgrading Software: When to Upgrade?
Cloudera provides ‘production’ and ‘beta’ releases
‘Production’: – Ready for production clusters – Passed all unit tests, functional tests – Has been tested on large clusters over a significant period
‘Beta’:
– Recommended for people who want more features
– Passed unit tests, functional tests
– Does not have the same ‘soak time’ as the Production packages
– A work in progress that will eventually be promoted to Production
Cloudera supports a Production release for at least one year after a subsequent release is available
06-37 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Upgrading Software: Procedures
Software upgrade procedure is fully documented on the Cloudera Web site
General steps:
1. Stop the MapReduce cluster
2. Stop the HDFS cluster
3. Install the new version of Hadoop
4. Start the NameNode with the -upgrade option
5. Monitor the HDFS cluster until it reports that the upgrade is complete
6. Start the MapReduce cluster
Time taken to upgrade HDFS depends primarily on the number of blocks per DataNode
– In general, 20-30 blocks per second on each node, depending on hardware
06-38 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Upgrading Software: Procedures (cont’d)
Once the upgraded cluster has been running for a few days with no problems, finalize the upgrade by running hadoop dfsadmin -finalizeUpgrade
– DataNodes delete their previous-version working directories, then the NameNode does the same
If you encounter problems, you can roll back an (unfinalized) upgrade by stopping the cluster, then starting the old version of HDFS with the -rollback option
Note that this upgrade procedure is required when HDFS data structures or RPC communication format change – For example, from CDH3 to CDH4
Probably not required for minor version changes – But see the documentation for definitive information!
06-39 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Maintenance
Checking HDFS status
Hands-On Exercise: Breaking The Cluster
Copying Data Between Clusters
Adding and Removing Cluster Nodes
Rebalancing The Cluster
Hands-On Exercise: Verifying The Cluster’s Self-Healing Features
NameNode Metadata Backup
Cluster Upgrading
Conclusion
06-40 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Conclusion
In this chapter, you have learned:
How to check the status of HDFS
How to copy data between clusters
How to add and remove nodes
How to rebalance the cluster
The purpose of the Secondary NameNode
What strategies to employ for NameNode Metadata backup
How to upgrade your cluster
07-1 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Chapter 7 Cluster Monitoring and
Troubleshooting
07-2 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Monitoring and Troubleshooting
In this chapter, you will learn:
What general system conditions to monitor
How to use the NameNode and JobTracker Web UIs
How to view and manage Hadoop’s log files
How the Ganglia monitoring tool works
Some common cluster problems, and their resolutions
How to benchmark your cluster’s performance
07-3 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Monitoring and Troubleshooting
General System Monitoring
Managing Hadoop’s Log Files
Using the NameNode and JobTracker Web UI
Hands-On Exercise: Examining the Web UI
Cluster Monitoring with Ganglia
Common Troubleshooting Issues
Benchmarking Your Cluster
Conclusion
07-4 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
General System Monitoring
Later in this chapter you will see how to use Ganglia to monitor your cluster
You should also use a general system monitoring tool to warn you of potential or actual problems on individual machines in the cluster
Many such tools exist, including – Nagios – Cacti – Hyperic – Zabbix
We do not have a specific recommendation – Use the tools with which you are most familiar
Here we present a list of items to monitor
07-5 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Items to Monitor
Monitor the Hadoop daemons – Alert an operator if a daemon goes down – Check can be done with service hadoop-0.20-daemon_name status
Monitor disks and disk partitions – Alert immediately if a disk fails – Send a warning when a disk reaches 80% capacity – Send a critical alert when a disk reaches 90% capacity
Monitor CPU usage on master nodes – Send an alert on excessive CPU usage – Slave nodes will often reach 100% usage
– This is not a problem
07-6 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Items to Monitor (cont’d)
Monitor swap on all nodes – Alert if the swap partition starts to be used
– Memory allocation is overcommitted
Monitor network transfer speeds
Ensure that the Secondary NameNode checkpoints regularly – Check the age of the fsimage file and/or check the size of the edits file
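The last check can be as simple as comparing the fsimage modification time against a limit. A sketch, assuming a two-hour limit and an illustrative fsimage path:

```shell
#!/bin/sh
# Warn if fsimage has not been rewritten recently, i.e. the Secondary
# NameNode has probably stopped checkpointing. The FSIMAGE path and the
# two-hour limit are assumptions; point FSIMAGE at
# ${dfs.name.dir}/current/fsimage on the NameNode.
FSIMAGE="${FSIMAGE:-/tmp/fsimage.example}"
MAX_AGE_MIN=120

checkpoint_ok() {
  # find prints the file only if modified within the last MAX_AGE_MIN minutes
  [ -n "$(find "$FSIMAGE" -mmin "-$MAX_AGE_MIN" 2>/dev/null)" ]
}

if checkpoint_ok; then
  echo "OK: recent checkpoint"
else
  echo "WARNING: fsimage older than $MAX_AGE_MIN minutes (or missing)"
fi
```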
07-7 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Monitoring and Troubleshooting
General System Monitoring
Managing Hadoop’s Log Files
Using the NameNode and JobTracker Web UI
Hands-On Exercise: Examining the Web UI
Cluster Monitoring with Ganglia
Common Troubleshooting Issues
Benchmarking Your Cluster
Conclusion
07-8 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop Logging Basics
Hadoop logs are stored in $HADOOP_INSTALL/logs by default
– Can be changed by setting the HADOOP_LOG_DIR environment variable in hadoop-env.sh
– Typically set to /var/log/hadoop/
Each Hadoop daemon writes two logfiles
– *.log file, written via log4j
 – Standard log4j configuration rotates logfiles daily
 – Old logfiles are not deleted (or gzipped) by default
 – First port of call when diagnosing problems
– *.out file
 – Combination of stdout and stderr during daemon startup
 – Doesn’t usually contain much output
 – Rotated when the daemon restarts; five files are retained
07-9 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop Logging Basics (cont’d)
Log file names: hadoop-<user-running-hadoop>-<daemon>-<hostname>.{log|out}
– Example: hadoop-hadoop-datanode-r2n13.log
Configuration for log4j is at conf/log4j.properties
Log file growth:
– Slow when cluster is idle
– Can be very rapid when jobs are running
– Monitor the log directory to avoid out-of-space errors, since old .log files are not deleted by default
07-10 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
log4j Configuration
Log4j configuration is controlled by conf/log4j.properties
Default log level configured by hadoop.root.logger – Default is INFO
Log level can be set for any specific class with log4j.logger.class.name = LEVEL – Example:
log4j.logger.org.apache.hadoop.mapred.JobTracker=INFO
Valid log levels: – FATAL, ERROR, WARN, INFO, DEBUG, TRACE
07-11 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
The DailyRollingFileAppender
An Appender is the destination for log messages
Hadoop’s default for daemon logs is the DailyRollingFileAppender (DRFA) – Rotates logfiles daily
– Frequency is configurable – Cannot limit filesize – Cannot limit the number of files kept
You must provide your own scripts to compress, archive, delete logs
DRFA is the most popular choice for Hadoop logs
– It is the default, and many system administrators are not familiar with Java logging
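A compress-and-delete script of the kind just mentioned might look like the sketch below; the log directory and the 7-day/30-day windows are assumptions to adapt:

```shell
#!/bin/sh
# Compress daemon logs older than 7 days, delete archives older than 30.
# LOG_DIR is an assumption; point it at HADOOP_LOG_DIR (e.g. /var/log/hadoop).
LOG_DIR="${LOG_DIR:-/tmp/hadoop-logs-example}"
mkdir -p "$LOG_DIR"

clean_logs() {
  # Rotated DRFA files look like hadoop-user-daemon-host.log.2012-01-15
  find "$LOG_DIR" -name '*.log.*' ! -name '*.gz' -mtime +7 -exec gzip {} \;
  find "$LOG_DIR" -name '*.log.*.gz' -mtime +30 -exec rm {} \;
}

clean_logs
```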
07-12 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
An Alternative Appender: RollingFileAppender
RollingFileAppender (RFA) is also available in CDH – Despite its name, not a superclass of DRFA – Lets you specify the maximum size of generated log files – Lets you set the number of files retained
To use RFA:
– Edit $HADOOP_HOME/bin/hadoop-daemon.sh
– Change export HADOOP_ROOT_LOGGER="INFO,DRFA" to export HADOOP_ROOT_LOGGER="INFO,RFA"
07-13 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
An Alternative Appender: RollingFileAppender (cont’d)
To configure, edit /etc/hadoop/conf/log4j.properties
Uncomment the lines in the section headed “# Rolling File Appender” (leaving the comment lines themselves, such as “# Log file size”, commented out)
Edit to suit; in particular: log4j.appender.RFA.MaxFileSize=100MB log4j.appender.RFA.MaxBackupIndex=30
07-14 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Job Logs: Created By Hadoop
When a job runs, two files are created: – The job configuration XML file
– Contains job configuration settings specified by the developer – The job status file
– Constantly updated as the job runs – Includes counters, task status information etc
These files are stored in multiple places:
07-15 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Locations of Job Log Files
${hadoop.log.dir}/<job_id>_conf.xml (job configuration XML only)
– Retained for mapred.jobtracker.retirejob.interval milliseconds
– Default is one day (24 * 60 * 60 * 1000)
hadoop.job.history.location – Default is ${hadoop.log.dir}/history – Retained for 30 days
hadoop.job.history.user.location – Default is <job_output_dir_in_hdfs>/_logs/history – Retained forever
In addition, the JobTracker keeps the data in memory for a limited time
07-16 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Developer Log Files
Caution: inexperienced developers will often create large log files from their jobs – Data written to stdout/stderr – Data written using log4j from within the code
Large developer logfiles can run your slave nodes out of disk space
Developer logs are stored in ${hadoop.log.dir}/userlogs – This is hardcoded – Ensure you have enough room on the partition for logs
Developer logs are deleted according to mapred.userlog.retain.hours – Default is 24 hours
07-17 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Monitoring and Troubleshooting
General System Monitoring
Managing Hadoop’s Log Files
Using the NameNode and JobTracker Web UI
Hands-On Exercise: Examining the Web UI
Cluster Monitoring with Ganglia
Common Troubleshooting Issues
Benchmarking Your Cluster
Conclusion
07-18 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Common Hadoop Ports
Each Hadoop daemon provides a Web-based User Interface
– Useful for both users and system administrators
These UIs expose information on a variety of different ports
– Port numbers are configurable, although there are defaults for most
Hadoop also uses various ports for components of the system to communicate with each other
07-19 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop Ports for Administrators
Daemon | Default Port | Configuration Parameter | Used for
NameNode | 8020 | fs.default.name | Filesystem metadata operations
DataNode | 50010 | dfs.datanode.address | DFS data transfer
DataNode | 50020 | dfs.datanode.ipc.address | Block metadata operations and recovery
BackupNode | 50100 | dfs.backup.address | HDFS metadata operations (from Hadoop 0.21)
JobTracker | Usually 8021, 9001, or 8012 | mapred.job.tracker | Job submission, TaskTracker heartbeats
TaskTracker | Usually 8021, 9001, or 8012 | mapred.task.tracker.report.address | Communicating with child tasks
07-20 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Web UI Ports for Users
Daemon | Default Port | Configuration parameter
HDFS:
NameNode | 50070 | dfs.http.address
DataNode | 50075 | dfs.datanode.http.address
Secondary NameNode | 50090 | dfs.secondary.http.address
Backup/Checkpoint Node* | 50105 | dfs.backup.http.address
MapReduce:
JobTracker | 50030 | mapred.job.tracker.http.address
TaskTracker | 50060 | mapred.task.tracker.http.address
* From Hadoop 0.21 onwards
07-21 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
The JobTracker Web UI
JobTracker exposes its Web UI on port 50030
07-22 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Drilling Down to Individual Jobs
Clicking on an individual job name will reveal more information about that job
07-23 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Stopping MapReduce Jobs From the Web UI
By default, the JobTracker Web UI is read-only
– Job information is displayed, but the job cannot be controlled in any way
It is possible to set the UI to allow jobs, or individual Map or Reduce tasks, to be killed
– Add the following property to core-site.xml
– Restart the JobTracker
<property>
<name>webinterface.private.actions</name>
<value>true</value>
</property>
07-24 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Stopping Jobs From the Web UI (cont’d)
The Web UI will now include an ‘actions’ column for each task – And an overall option to kill entire jobs
07-25 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Stopping Jobs From the Web UI (cont’d)
Caution: anyone with access to the Web UI can now manipulate running jobs!
– Best practice: make this available only to administrative users
It is better to use the command line to stop jobs
– Discussed earlier in the course
07-26 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Monitoring and Troubleshooting
General System Monitoring
Managing Hadoop’s Log Files
Using the NameNode and JobTracker Web UI
Hands-On Exercise: Examining the Web UI
Cluster Monitoring with Ganglia
Common Troubleshooting Issues
Benchmarking Your Cluster
Conclusion
07-27 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hands-On Exercise: Examining the Web UI
In this brief Hands-On Exercise, you will examine the NameNode Web UI (at http://<namenode_location>:50070/) and JobTracker Web UI (at http://<jobtracker_location>:50030/)
From the command line, run a Hadoop job
– Example:
hadoop jar /usr/lib/hadoop/hadoop-examples.jar \
  sleep -m 10 -r 10 -mt 10000 -rt 10000
Open the JobTracker Web UI and view the progress of the Mappers and Reducers
Investigate the NameNode’s Web UI
07-28 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Monitoring and Troubleshooting
General System Monitoring
Managing Hadoop’s Log Files
Using the NameNode and JobTracker Web UI
Hands-On Exercise: Examining the Web UI
Cluster Monitoring with Ganglia
Common Troubleshooting Issues
Benchmarking Your Cluster
Conclusion
07-29 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop Metrics
Hadoop can be configured to log many different metrics
Metrics are grouped into ‘contexts’
– jvm
– Statistics from the JVM, including memory usage, thread counts, garbage collection information
– All Hadoop daemons use this context
– dfs
– NameNode capacity, number of files, under-replicated blocks
– mapred
– JobTracker information, similar to that found on the JobTracker’s Web status page
– rpc
– For Remote Procedure Calls
07-30 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop Metrics (cont’d)
Configure in conf/hadoop-metrics.properties
Example to log metrics to files:
# Configuration of the dfs context for file
dfs.class=org.apache.hadoop.metrics.file.FileContext
dfs.period=10
# You’ll want to change the path
dfs.fileName=/tmp/dfsmetrics.log

# Configuration of the mapred context for file
mapred.class=org.apache.hadoop.metrics.file.FileContext
mapred.period=10
mapred.fileName=/tmp/mrmetrics.log

# Configuration of the jvm context for file
jvm.class=org.apache.hadoop.metrics.file.FileContext
jvm.period=10
jvm.fileName=/tmp/jvmmetrics.log

# Configuration of the rpc context for file
rpc.class=org.apache.hadoop.metrics.file.FileContext
rpc.period=10
rpc.fileName=/tmp/rpcmetrics.log
07-31 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop Metrics (cont’d)
Example dfs metrics:
dfs.datanode: hostName=doorstop.local, sessionId=, blockReports_avg_time=0, blockReports_num_ops=1, block_verification_failures=0, blocks_read=0, blocks_removed=0, blocks_replicated=0, blocks_verified=0, blocks_written=44, bytes_written=64223, copyBlockOp_avg_time=0, copyBlockOp_num_ops=0, heartBeats_avg_time=1, heartBeats_num_ops=7, readBlockOp_avg_time=0, readBlockOp_num_ops=0, readMetadataOp_avg_time=0, readMetadataOp_num_ops=0, reads_from_local_client=0, reads_from_remote_client=0, replaceBlockOp_avg_time=0, replaceBlockOp_num_ops=0, writeBlockOp_avg_time=5, writeBlockOp_num_ops=44, writes_from_local_client=44, writes_from_remote_client=0

dfs.namenode: hostName=doorstop.local, sessionId=, AddBlockOps=44, CreateFileOps=44, DeleteFileOps=0, FilesCreated=59, FilesRenamed=0, GetBlockLocations=0, GetListingOps=1, SafemodeTime=102, Syncs_avg_time=0, Syncs_num_ops=100, Transactions_avg_time=0, Transactions_num_ops=148, blockReport_avg_time=0, blockReport_num_ops=1, fsImageLoadTime=98

dfs.FSNamesystem: hostName=doorstop.local, sessionId=, BlocksTotal=44, CapacityRemainingGB=78, CapacityTotalGB=201, CapacityUsedGB=0, FilesTotal=60, PendingReplicationBlocks=0, ScheduledReplicationBlocks=0, TotalLoad=1, UnderReplicatedBlocks=44
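Records in this format are simple to post-process. As an illustration (not part of Hadoop itself), a small parser that turns one record line into a record name and a dict of fields:

```python
def parse_metrics_record(line):
    """Parse a Hadoop file-context metrics record such as
    'dfs.FSNamesystem: hostName=x, sessionId=, BlocksTotal=44, ...'
    into (record_name, dict_of_fields). Integer values become ints.
    Illustrative only -- real log lines may need more robust handling.
    """
    record_name, _, rest = line.partition(":")
    fields = {}
    for pair in rest.split(","):
        key, _, value = pair.strip().partition("=")
        if not key:
            continue
        # Convert purely numeric values (possibly negative) to int
        fields[key] = int(value) if value.lstrip("-").isdigit() else value
    return record_name.strip(), fields
```

A parser like this makes it easy to, say, alert when UnderReplicatedBlocks climbs above a threshold.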
07-32 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hadoop Metrics (cont’d)
Metrics are also exposed via the Web interface at http://<namenode_address>:50070/metrics
07-33 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Monitoring Challenges
System monitoring becomes a challenge when dealing with large numbers of systems
Multiple solutions exist, such as – Nagios – Hyperic – Zabbix
Many of these are very ‘general purpose’ – Fine for monitoring the machines themselves – Not so useful for integrating with Hadoop
07-34 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Monitoring Cluster Metrics with Ganglia
Ganglia is an open-source, scalable, distributed monitoring product for high-performance computing systems – Specifically designed for clusters of machines
Collects, aggregates, and provides time-series views of metrics
Integrates with Hadoop’s metrics-collection system
Note: Ganglia doesn’t provide alerts
07-35 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Ganglia Network Architecture
[Diagram: Ganglia network architecture]
– A Data Collector (GMOND) runs on each cluster node
– GMOND daemons report to a Data Consolidator (GMETAD)
– GMETAD stores time-series data in an rrdtool database
– A Web server running Apache with PHP scripts presents the data
07-36 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Example Ganglia Web App Output
07-37 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Ganglia Configuration
Install the GMOND daemon on every cluster node
Make sure port 8649 is open for both UDP and TCP connections
Install GMETAD on a Web server
Configure Hadoop to publish metrics to Ganglia in conf/hadoop-metrics.properties
Example:
# Configuration of the "dfs" context for ganglia dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext dfs.period=10 dfs.servers=127.0.0.1:8649
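When rolling GMOND out across many nodes, a quick sanity check that each daemon is reachable on the TCP side can save debugging time. A minimal socket probe (port 8649 is the default from the slide above; note that this only tests TCP, while Ganglia also exchanges metric traffic over UDP):

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds.

    Useful for verifying gmond is listening (default port 8649);
    this checks TCP only -- Ganglia metric traffic also uses UDP.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Run it across your node list (e.g. `all(port_open(h, 8649) for h in nodes)`) before checking the GMETAD side.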
07-38 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Ganglia Versions
Ganglia 3.0.x and 3.1 both work well with Hadoop out-of-the-box – Ganglia 3.1 also works out-of-the-box with CDH
Ganglia 3.1 uses org.apache.hadoop.metrics.ganglia.GangliaContext31
07-39 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Monitoring and Troubleshooting
General System Monitoring
Managing Hadoop’s Log Files
Using the NameNode and JobTracker Web UI
Hands-On Exercise: Examining the Web UI
Cluster Monitoring with Ganglia
Common Troubleshooting Issues
Benchmarking Your Cluster
Conclusion
07-40 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
General Troubleshooting Issues: Introduction
The next few slides describe some common problems exhibited by clusters, along with suggested solutions
Note that these are just some of the issues you could run into on a cluster
Also note that these are possible causes and resolutions
– The problems could equally be caused by many other issues
07-41 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Map/Reduce Task Out Of Memory Error
Possible causes
– A Map or Reduce task has run out of memory
– Possibly due to a memory leak in the job code
Possible resolutions
– Increase the size of the RAM allocated in mapred.child.java.opts
– Ensure io.sort.mb is smaller than the RAM allocated in mapred.child.java.opts
FATAL org.apache.hadoop.mapred.TaskTracker: Error running child : java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:781) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:350) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) at org.apache.hadoop.mapred.Child.main(Child.java:170)
07-42 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
JobTracker Out Of Memory Error
Cause: the JobTracker has exceeded its allocated memory
Possible resolutions
– Increase the JobTracker’s memory allocation
– Reduce mapred.jobtracker.completeuserjobs.maximum
– Controls the amount of job history held in the JobTracker’s RAM
ERROR org.apache.hadoop.mapred.JobTracker: Job initialization failed: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.mapred.TaskInProgress.<init>(TaskInProgress.java:122) at org.apache.hadoop.mapred.JobInProgress.initTasks(JobInProgress.java:653) at org.apache.hadoop.mapred.JobTracker.initJob(JobTracker.java:3965) at org.apache.hadoop.mapred.EagerTaskInitializationListener$InitJob.run(EagerTaskInitializationListener.java:79) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662)
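The second resolution translates to a mapred-site.xml entry like the one below. The value 25 is an illustrative choice, not a recommendation; the right number depends on how many users submit jobs on your cluster:

```xml
<property>
  <name>mapred.jobtracker.completeuserjobs.maximum</name>
  <!-- Default is 100 completed jobs retained per user;
       lowering it reduces JobTracker heap usage -->
  <value>25</value>
</property>
```

The JobTracker must be restarted for the change to take effect.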
07-43 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Too Many Fetch Failures
Cause
– Reducers are failing to fetch intermediate data from a TaskTracker where a Map task ran
– Too many of these failures will cause a TaskTracker to be blacklisted
Possible resolutions
– Increase tasktracker.http.threads
– Decrease mapred.reduce.parallel.copies
– Upgrade to CDH3u2
– The version of Jetty (the Web server) in earlier versions of the TaskTracker was prone to fetch failures
INFO org.apache.hadoop.mapred.JobInProgress: Too many fetch-failures for output of task:
07-44 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Not Able To Place Enough Replicas
Possible causes
– Fewer DataNodes are available than the replication factor of the blocks
– DataNodes do not have enough xciever threads
– Default is 256 threads to manage connections
– Note: yes, the configuration option is misspelled!
Possible resolutions
– Increase dfs.datanode.max.xcievers to 4096
– Check the replication factor
WARN org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Not able to place enough replicas
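The first resolution is an hdfs-site.xml entry like the following; DataNodes must be restarted for it to take effect:

```xml
<property>
  <!-- Note the deliberate misspelling of 'xcievers' -->
  <name>dfs.datanode.max.xcievers</name>
  <value>4096</value>
</property>
```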
07-45 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Monitoring and Troubleshooting
General System Monitoring
Managing Hadoop’s Log Files
Using the NameNode and JobTracker Web UI
Hands-On Exercise: Examining the Web UI
Cluster Monitoring with Ganglia
Common Troubleshooting Issues
Benchmarking Your Cluster
Conclusion
07-46 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Why Benchmark?
A common question after adding new nodes to a cluster:
– “How much faster is my cluster running now?”
Benchmarking clusters is not an exact science
– Performance depends on the type of job you’re running
The ‘standard’ benchmark is TeraSort
– Example: generate a 10,000,000-line file, each line containing 100 bytes, then sort that file
This predominantly tests network and disk I/O performance
hadoop jar $HADOOP_HOME/hadoop-*-examples.jar teragen 10000000 input_dir
hadoop jar $HADOOP_HOME/hadoop-*-examples.jar terasort input_dir output_dir
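Since the interesting number is elapsed wall-clock time before and after a cluster change, a small wrapper can time any benchmark run. This is an illustrative sketch, not a Hadoop tool — pass it the same terasort invocation shown above as an argument list:

```python
import subprocess
import time

def time_command(argv):
    """Run a command and return (elapsed_seconds, return_code).

    Intended for timing benchmark runs such as
    ['hadoop', 'jar', 'hadoop-examples.jar', 'terasort',
     'input_dir', 'output_dir'].
    """
    start = time.monotonic()
    result = subprocess.run(argv)
    return time.monotonic() - start, result.returncode
```

Record the elapsed time for the same teragen/terasort pair before and after adding nodes, and remember that other jobs running concurrently will skew the comparison.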
07-47 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Real-World Benchmarks
Test your cluster before and after adding nodes
Remember to take into account other jobs running on the nodes while you’re benchmarking!
As a (very high-end!) guide: in April 2009, Arun Murthy and Owen O’Malley at Yahoo! sorted a terabyte of data in 62 seconds on a cluster of 1,406 nodes – Albeit using a somewhat modified version of Hadoop
07-48 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Cluster Monitoring and Troubleshooting
General System Monitoring
Managing Hadoop’s Log Files
Using the NameNode and JobTracker Web UI
Hands-On Exercise: Examining the Web UI
Cluster Monitoring with Ganglia
Common Troubleshooting Issues
Benchmarking Your Cluster
Conclusion
07-49 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Conclusion
In this chapter, you have learned:
What general system conditions to monitor
How to use the NameNode and JobTracker Web UIs
How to view and manage Hadoop’s log files
How the Ganglia monitoring tool works
Some common cluster problems, and their resolutions
How to benchmark your cluster’s performance
08-1 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Chapter 8 Populating HDFS From External Sources
08-2 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Populating HDFS Using Flume and Sqoop
In this chapter, you will learn:
What Flume is
How Flume works
What Sqoop is
How to use Sqoop to import data from RDBMSs to HDFS
Best practices for importing data
Note: in this chapter we can only provide a brief overview of Flume and Sqoop; consult the documentation at http://archive.cloudera.com/docs/ for full details on installation and configuration
08-3 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Populating HDFS Using Flume and Sqoop
An Overview of Flume
Hands-On Exercise: Using Flume
An Overview of Sqoop
Best Practices for Importing Data
Conclusion
08-4 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
What Is Flume?
Flume is a distributed, reliable, available service for efficiently moving large amounts of data as it is produced
– Ideally suited to gathering logs from multiple systems and inserting them into HDFS as they are generated
Developed in-house by Cloudera, and released as open-source software
Design goals: – Reliability – Scalability – Manageability – Extensibility
08-5 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Flume: High-Level Overview
[Diagram: Flume high-level architecture — Agents feed Processors, which feed one or more Collectors, with a Master coordinating all Agents]
– Multiple configurable levels of reliability; Agents can guarantee delivery in the event of failure
– Optionally deployable, centrally administered
– Decorators (compress, encrypt, batch) can be flexibly deployed at any step to improve performance, reliability, or security
– Optionally pre-process incoming data: perform transformations, suppressions, metadata enrichment
– Writes to multiple HDFS file formats (text, SequenceFile, JSON, Avro, others)
– Parallelized writes across many collectors – as much write throughput as required
– The Master communicates with all Agents, specifying configuration etc.
08-6 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Flume’s Design Goals: Reliability
Flume is designed to continue delivering events in the face of system component failure
Provides configurable reliability guarantees
– End-to-end
– Once Flume acknowledges receipt of an event, the event will eventually make it to the end of the pipeline
– Store on failure
– Nodes require acknowledgment of receipt from the node one hop downstream
– Best effort
– No attempt is made to confirm receipt of data from the node downstream
08-7 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Flume’s Design Goals: Scalability
Flume scales horizontally
– As load increases, more machines can be added to the configuration
– ‘Aggregator’ nodes can be configured to receive data from multiple upstream nodes and then pass it on down the chain
08-8 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Flume’s Design Goals: Manageability
Flume provides a central Master controller
– System Administrators can monitor data flows and reconfigure them on the fly
– Via a Web interface or a scriptable command-line shell
– No remote login to a machine running a Flume node is required to change its configuration
08-9 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Flume’s Design Goals: Extensibility
Flume can be extended by adding connectors to existing storage layers or data platforms
– General sources already provided include data from files, syslog, and standard output (stdout) from a process
– General endpoints already provided include files on the local filesystem or in HDFS
– Other connectors can be added using Flume’s API
08-10 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Flume: General System Architecture
The Master holds configuration information for each Node, plus a version number for that Node
– The version number is associated with the Node’s configuration
Nodes communicate with the Master every five seconds
– The Node passes its version number to the Master
– If the Master has a later version number for the Node, it tells the Node to reconfigure itself
– The Node then requests the new configuration from the Master, and dynamically applies it
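The heartbeat protocol above can be sketched as a simple version check. This is a toy model for illustration — it is not Flume's actual wire protocol, and the class and method names are invented:

```python
class Master:
    """Toy model of the Flume Master: per-node config plus a version number."""
    def __init__(self):
        self.configs = {}  # node_name -> (version, config)

    def set_config(self, node, config):
        # Each reconfiguration bumps the version number for that node
        version = self.configs.get(node, (0, None))[0] + 1
        self.configs[node] = (version, config)

    def heartbeat(self, node, node_version):
        """Return the newer (version, config) if the node is stale, else None."""
        current = self.configs.get(node)
        if current and current[0] > node_version:
            return current
        return None

class Node:
    """Toy model of a Flume Node that reconfigures itself when stale."""
    def __init__(self, name):
        self.name = name
        self.version = 0
        self.config = None

    def check_in(self, master):
        # In Flume this happens every five seconds
        update = master.heartbeat(self.name, self.version)
        if update:
            self.version, self.config = update  # dynamically apply new config
```

The key property the model captures: Nodes pull configuration, so the Master never needs to push to (or log in to) the machines it manages.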
08-11 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Flume Node Characteristics
Each Node has a source and a sink
– The source tells the Node where to receive data from
– The sink tells the Node where to send data to
A sink can have one or more decorators
– Decorators perform simple processing on the data as it passes through, such as:
– Compression
– Encryption
– awk- and grep-like functionality
08-12 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Installing and Using Flume
Flume is available as a tarball, RPM, or Debian package
Once installed, start the Master
– Usually achieved via an init script, or as a stand-alone process with flume master
Configure Agent Nodes on the machine(s) generating the data
– Minimum configuration:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>flume.master.servers</name>
    <value>master_host_name</value>
  </property>
</configuration>
08-13 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Installing and Using Flume (cont’d)
Start the Agent Node(s)
– Typically via an init script, or as a stand-alone process with flume node
Configure the Agent Nodes via the Master’s Web interface at http://<master_host_name>:35871/
08-14 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Populating HDFS Using Flume and Sqoop
An Overview of Flume
Hands-On Exercise: Using Flume
An Overview of Sqoop
Best Practices for Importing Data
Conclusion
08-15 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Hands-On Exercise: Using Flume
In this Hands-On Exercise you will create a simple Flume configuration to store dynamically-generated data in HDFS
Please refer to the Exercise Manual
08-16 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Populating HDFS Using Flume and Sqoop
An Overview of Flume
Hands-On Exercise: Using Flume
An Overview of Sqoop
Best Practices for Importing Data
Conclusion
08-17 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
What is Sqoop?
Sqoop is “the SQL-to-Hadoop database import tool”
– Developed at Cloudera
– Open source
– Included as part of Cloudera’s Distribution including Apache Hadoop (CDH)
Designed to import data from RDBMSs (Relational Database Management Systems) into Hadoop
– Can also send data the other way, from Hadoop to an RDBMS
Uses JDBC (Java Database Connectivity) to connect to the RDBMS
08-18 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
How Does Sqoop Work?
Sqoop examines each table and automatically generates a Java class to import the data into HDFS
It then creates and runs a Map-only MapReduce job to import the data
– By default, four Mappers connect to the RDBMS
– Each imports a quarter of the data
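How the data is divided four ways can be illustrated by the kind of split calculation Sqoop performs on a numeric primary key. This is a simplified sketch — Sqoop's real splitter also handles text keys, NULLs, skew, and other details:

```python
def compute_splits(min_id, max_id, num_mappers=4):
    """Divide the key range [min_id, max_id] into num_mappers
    contiguous sub-ranges -- roughly how Sqoop assigns each Mapper
    a WHERE clause over the table's split column."""
    span = max_id - min_id + 1
    size, extra = divmod(span, num_mappers)
    splits, lo = [], min_id
    for i in range(num_mappers):
        # Spread any remainder over the first `extra` ranges
        hi = lo + size - 1 + (1 if i < extra else 0)
        splits.append((lo, hi))
        lo = hi + 1
    return splits
```

Each Mapper then issues a query restricted to its own range, so the four connections read disjoint quarters of the table in parallel.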
08-19 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Sqoop Features
Imports a single table, or all tables in a database
Can specify which rows to import – Via a WHERE clause
Can specify which columns to import
Can provide an arbitrary SELECT statement
Sqoop can automatically create a Hive table based on the imported data
Supports incremental imports of data
Can export data from HDFS to a database table
08-20 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Sqoop Connectors
Cloudera has partnered with third parties to create Sqoop ‘connectors’
– Add-ons to Sqoop which use a database’s native protocols to import data, rather than JDBC
– Typically orders of magnitude faster
– Not open source, but freely downloadable from the Cloudera Web site
Products currently supported
– Oracle Database
– MicroStrategy
– Netezza
– Others being developed
Microsoft has produced a version of Sqoop optimized for SQL Server
08-21 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Sqoop Usage Examples
List all databases:
sqoop list-databases --username fred --password derf \
  --connect jdbc:mysql://dbserver.example.com/
List all tables in the world database:
sqoop list-tables --username fred --password derf \
  --connect jdbc:mysql://dbserver.example.com/world
Import all tables in the world database:
sqoop import-all-tables --username fred --password derf \
  --connect jdbc:mysql://dbserver.example.com/world
08-22 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Populating HDFS Using Flume and Sqoop
An Overview of Flume
Hands-On Exercise: Using Flume
An Overview of Sqoop
Best Practices for Importing Data
Conclusion
08-23 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
What Do Others See As Data Is Imported?
When a client starts to write data to HDFS, the NameNode marks the file as existing, but with zero size
– Other clients will see it as an empty file
After each block is written, other clients can see that block
– They will see the file growing as it is created, one block at a time
This is typically not a good idea
– Other clients may begin to process a file while it is still being written
08-24 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Importing Data: Best Practices
Best practice is to import data into a temporary directory
After it is completely written, move the data to the target directory
– This is an atomic operation
– It happens very quickly, since it merely requires an update of the NameNode’s metadata
Many organizations standardize on a directory structure such as
– /incoming/<import_job_name>/<files>
– /for_processing/<import_job_name>/<files>
– /completed/<import_job_name>/<files>
It is the job’s responsibility to move the files from for_processing to completed after the job has finished successfully
Discussion point: your best practices?
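The directory-promotion pattern can be sketched with ordinary renames. The example below uses Python's os.rename on a local filesystem as a stand-in; on HDFS the equivalent is `hadoop fs -mv`, which is likewise a metadata-only operation on the NameNode:

```python
import os

def promote(base, job_name, src_stage, dst_stage):
    """Atomically move an import job's directory between stages,
    e.g. from 'for_processing' to 'completed'. A rename within one
    filesystem is atomic: readers see the old location or the new
    one, never a half-moved directory."""
    src = os.path.join(base, src_stage, job_name)
    dst_dir = os.path.join(base, dst_stage)
    os.makedirs(dst_dir, exist_ok=True)
    os.rename(src, os.path.join(dst_dir, job_name))
```

Because downstream jobs only ever read from the stage directory they expect, they can never pick up a half-written import.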
08-25 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Populating HDFS Using Flume and Sqoop
An Overview of Flume
Hands-On Exercise: Using Flume
An Overview of Sqoop
Best Practices for Importing Data
Conclusion
08-26 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Conclusion
In this chapter, you have learned:
What Flume is
How Flume works
What Sqoop is
How to use Sqoop to import data from RDBMSs to HDFS
Best practices for importing data
09-1 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Chapter 9 Installing and Managing Other Hadoop Projects
09-2 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Installing and Managing Other Hadoop Projects
In this chapter, you will learn:
What features Hive, HBase, and Pig provide
What administrative requirements they impose
09-3 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Note
Note that this chapter does not go into any significant detail about Hive, HBase, or Pig
Our intention is to draw your attention to issues System Administrators will need to deal with, if users request these products be installed – For more details on the products themselves, Cloudera offers
dedicated training courses on HBase, and on Hive and Pig
09-4 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Installing and Managing Other Hadoop Projects
Hive
Pig
HBase
Conclusion
09-5 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Using Hive to Query Large Datasets
Hive is a project initially created at Facebook
– Now a top-level Apache project
Motivation: many data analysts are very familiar with SQL (the Structured Query Language)
– The de facto standard for querying data in Relational Database Management Systems (RDBMSs)
Data analysts tend to be far less familiar with programming languages such as Java
Hive provides a way to query data in HDFS using an SQL-like language, rather than Java
Around 99% of Facebook’s Hadoop jobs are now created by the Hive interpreter
09-6 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Sample Hive Query
SELECT * FROM movies m JOIN scores s ON (m.id = s.movie_id)
WHERE m.year > 1995
ORDER BY m.name DESC
LIMIT 50;
09-7 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
What Hive Provides
Hive allows users to query data using HiveQL, a language very similar to standard SQL
Hive turns HiveQL queries into standard MapReduce jobs
– It automatically runs the jobs, and displays the results to the user
Note that Hive is not an RDBMS!
– Results take many seconds, minutes, or even hours to be produced
– It is not possible to modify the data using HiveQL
– UPDATE and DELETE are not supported
09-8 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Getting Data Into Hive
A ‘table’ in Hive represents an HDFS directory
– Hive interprets all files in the directory as the contents of the table
– It knows how the columns and rows are delimited within the files, as well as the datatypes and names of the resulting columns
– This information is stored in the Hive Metastore
09-9 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Installing Hive
Hive runs on a user’s machine
– Not on the Hadoop cluster itself
– A user can set up Hive with no System Administrator input
– Using the standard Hive command-line or Web-based interface
– If users will be running JDBC-based clients, Hive should be run as a service on a centrally-available machine
By default, Hive uses a Metastore on the user’s machine
– The Metastore uses Derby, a Java-based RDBMS
If multiple users will be running Hive, the System Administrator should configure a shared Metastore for all users
09-10 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Creating a Shared Metastore
A shared Metastore is a database in an RDBMS such as MySQL
Configuration is simple:
1. Create a user and database in your RDBMS
2. Modify hive-site.xml on each user’s machine to refer to the shared Metastore
09-11 Copyright © 2010-2012 Cloudera. All rights reserved. Not to be reproduced without prior written consent.
Sample hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.local</name>
    <value>true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://DB_HOST_NAME:DB_PORT/DATABASE_NAME</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>USERNAME</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>PASSWORD</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>hdfs://NAMENODE_HOST:NAMENODE_PORT/user/hive/warehouse</value>
  </property>
</configuration>
09-12
Installing and Managing Other Hadoop Projects
Hive
Pig
HBase
Conclusion
09-13
What Is Pig?
Pig is another high-level abstraction on top of MapReduce
Originally developed at Yahoo! – Now a top-level Apache project
Provides a scripting language known as Pig Latin
– Abstracts MapReduce details away from the user
– Composed of operations that are applied to the input data to produce output
– The language will be relatively easy to learn for people experienced in Perl, Ruby or other scripting languages
– Fairly easy to write complex tasks such as joins of multiple datasets
– Under the covers, Pig Latin scripts are converted to MapReduce jobs
09-14
Sample Pig Script
movies = LOAD '/data/films' AS
    (id:int, name:chararray, year:int);
ratings = LOAD '/data/ratings' AS
    (movie_id:int, user_id:int, score:int);
jnd = JOIN movies BY id, ratings BY movie_id;
recent = FILTER jnd BY year > 1995;
srtd = ORDER recent BY name DESC;
justafew = LIMIT srtd 50;
STORE justafew INTO '/data/pigoutput';
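The dataflow above can be traced in plain Python. The records below are invented for illustration, and the list comprehensions stand in for the MapReduce jobs Pig would actually generate:

```python
# Sketch of the Pig dataflow above in plain Python; the records are made up.
movies = [(1, "Heat", 1995), (2, "Fargo", 1996), (3, "Gattaca", 1997)]
ratings = [(2, 100, 5), (3, 100, 4), (3, 101, 5)]

# JOIN movies BY id, ratings BY movie_id
jnd = [(m, r) for m in movies for r in ratings if m[0] == r[0]]

# FILTER jnd BY year > 1995
recent = [(m, r) for (m, r) in jnd if m[2] > 1995]

# ORDER recent BY name DESC
srtd = sorted(recent, key=lambda pair: pair[0][1], reverse=True)

# LIMIT srtd 50
justafew = srtd[:50]

# STORE ... would write these records to HDFS
for m, r in justafew:
    print(m[1], m[2], r[2])
```

Each Pig Latin statement maps to one transformation here, which is why the language is easy to pick up for people used to scripting.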
09-15
Installing Pig
Pig runs as a client-side application – There is nothing extra to install on the cluster
Set the configuration file to point to the Hadoop cluster – In the pig.properties file in Pig’s conf directory, set
    fs.default.name=hdfs://<namenode_location>/
    mapred.job.tracker=<jobtracker_location>:8021
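A minimal sketch of rendering those two pig.properties entries; the host names are illustrative placeholders:

```python
# Sketch: write the two pig.properties entries that point Pig at a cluster.
# The host names are illustrative placeholders.
props = {
    "fs.default.name": "hdfs://namenode.example.com/",
    "mapred.job.tracker": "jobtracker.example.com:8021",
}
lines = ["{}={}".format(k, v) for k, v in sorted(props.items())]
print("\n".join(lines))
```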
09-16
Installing and Managing Other Hadoop Projects
Hive
Pig
HBase
Conclusion
09-17
HBase: A Column-Oriented Datastore
HBase is a distributed, sparse, column-oriented datastore
– Distributed: designed to use multiple machines to store and serve data – Leverages HDFS
– Sparse: each row may or may not have values for all columns
– Column-oriented: data is stored grouped by column, rather than by row – Columns are grouped into ‘column families’, which define which columns are physically stored together
Modeled after Google’s BigTable datastore
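The ‘sparse’ and ‘column family’ ideas can be sketched with a toy key-value model. This is not the HBase API, just an illustration of cells keyed by (row, family, qualifier):

```python
# Toy model of a sparse, column-family-grouped store (not the HBase API).
# Cells are keyed (row, family, qualifier); absent columns simply have no cell.
table = {}

def put(row, family, qualifier, value):
    table[(row, family, qualifier)] = value

def get(row, family, qualifier, default=None):
    return table.get((row, family, qualifier), default)

put("user1", "info", "name", "Alice")
put("user1", "info", "email", "alice@example.com")
put("user2", "info", "name", "Bob")   # user2 has no email cell: sparse

print(get("user2", "info", "email"))  # missing cells cost no storage
```

Because cells for the same family sort together, HBase can store and scan one column family without touching the others.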
09-18
HBase Usage Scenarios
Storing large amounts of data – Hundreds of gigabytes up to petabytes
Situations requiring high write throughput – Thousands of insert, update or delete operations per second
Rapid lookup of values by key
09-19
HBase Terminology
Region – A subset of a table’s rows – Similar to a partition
HRegionServer – Serves data for reads and writes
Master – Responsible for coordinating HRegionServers – Assigns Regions, detects failures of HRegionServers, and controls administrative functions
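The idea of a Region as a contiguous range of row keys can be sketched as a lookup table. The region boundaries below are invented, and real HBase splits and reassigns regions dynamically:

```python
# Sketch: locating the region (row-key range) that serves a given key.
# The boundaries are illustrative; real HBase splits regions dynamically.
import bisect

# Each region covers [start_key, next start_key); '' marks the first region.
region_starts = ["", "g", "n", "t"]
region_names = ["region-1", "region-2", "region-3", "region-4"]

def region_for(row_key):
    idx = bisect.bisect_right(region_starts, row_key) - 1
    return region_names[idx]

print(region_for("movie123"))  # keys from 'g' up to 'n' land in region-2
```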
09-20
Installing and Running HBase
CDH3 includes HBase
In conf/hbase-site.xml, set the hbase.rootdir property to point to the Hadoop filesystem to use – Don’t manually create this directory the first time; HBase will do so, and will add all the required files
Assuming you are running a fully-distributed Hadoop cluster, set the hbase.cluster.distributed property to true
Edit the file ${HBASE_HOME}/conf/regionservers to list all hosts running HRegionServers
Start HDFS
Start HBase by running ${HBASE_HOME}/bin/start-hbase.sh
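A sketch of rendering the two hbase-site.xml properties described above; the HDFS URL is an illustrative placeholder:

```python
# Sketch: render the two hbase-site.xml properties described above.
# The HDFS URL is an illustrative placeholder.
def xml_property(name, value):
    """Render a single <property> element in hbase-site.xml form."""
    return ("  <property>\n"
            "    <name>{}</name>\n"
            "    <value>{}</value>\n"
            "  </property>").format(name, value)

settings = {
    "hbase.rootdir": "hdfs://namenode.example.com:8020/hbase",
    "hbase.cluster.distributed": "true",
}
for name in sorted(settings):
    print(xml_property(name, settings[name]))
```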
09-21
Advanced HBase Configuration
The HBase Master includes a built-in version of ZooKeeper
– A scalable, highly-available system that facilitates coordination among distributed processes
– Commonly used to provide locking, configuration, and naming services
Larger installations of HBase should use a separate ZooKeeper cluster – Typically three or five machines external to the Hadoop cluster
There are other, more complex configuration options available for HBase – Cloudera offers an HBase training course which covers many of these issues
09-22
HBase Best Practices
We recommend deploying HBase on a dedicated cluster
– HBase clients are latency-sensitive
– General MapReduce jobs are batch jobs, with spiky load characteristics
– Mixing the two can cause problems
Deploy RegionServers on all machines running the DataNode daemon – Only deploying RegionServers on some nodes can result in uneven storage utilization
Allocate plenty of RAM to slave nodes – Extra RAM will be used by the operating system for file system caches, resulting in faster disk I/O
09-23
HBase Best Practices
Never deploy ZooKeeper on slave nodes
– ZooKeeper is extremely latency-sensitive
– If a ZooKeeper node has to wait for a disk operation, or swaps out of memory, it may be considered dead by its quorum peers
– If ZooKeeper fails, the HBase cluster will fail
Increase dfs.datanode.max.xcievers to 8096 or even higher – Xcievers handle sending and receiving block data – HBase uses many xcievers
Allocate 8GB to 12GB of RAM to each RegionServer
Do not run the HDFS balancer! – This will destroy data locality for RegionServers
Deploy HBase on a homogeneous cluster
09-24
Installing and Managing Other Hadoop Projects
Hive
Pig
HBase
Conclusion
09-25
Conclusion
In this chapter, you have learned:
What features Hive, HBase, and Pig provide
What administrative requirements they impose
10-1
Chapter 10 Conclusion
10-2
Conclusion
During this course, you have learned:
The core technologies of Hadoop
How to plan your Hadoop cluster hardware and software
How to deploy a Hadoop cluster
How to schedule jobs on the cluster
How to maintain your cluster
How to monitor, troubleshoot, and optimize the cluster
What system administrator issues to consider when installing Hive, HBase and Pig
How to populate HDFS from external sources
10-3
Next Steps
Cloudera offers a number of other training courses:
– Developer training for Hadoop
– Hive training
– HBase training
– Custom courses
Cloudera also provides consultancy and troubleshooting services – Please ask your instructor for more information
10-4
Class Evaluation
Please take a few minutes to complete the class evaluation – Your instructor will show you how to access the online form
10-5
Certification Exam
You are now ready to take the Cloudera Certified Administrator for Apache Hadoop (CCAH) examination – Your instructor will explain how to access the exam
10-6
Thank You!
Thank you for attending this course
If you have any further questions or comments, please feel free to contact us – Full contact details are on our Web site at http://www.cloudera.com/