1
COSC 6397
Big Data Analytics
Distributed File Systems (II)
Edgar Gabriel
Spring 2014
HDFS Basics
• An open-source implementation of Google File System
• Assume that node failure rate is high
• Assumes a small number of large files
• Write-once-ready-many pattern
• Reads are performed in a large streaming fashion
• Large throughput instead of low latency
• Moving computation is easier than moving data
2
HDFS components
• Namenode
– manages the File System's namespace/meta-data/file blocks
– Runs on 1 machine to several machines
• Datanode
– Stores and retrieves data blocks
– Reports to Namenode
– Runs on many machines
• Secondary Namenode
– Performs house keeping work so Namenode doesn’t have to
– Requires similar hardware as Namenode machine
– Not used for high-availability – not a backup for Namenode
3
HDFS Blocks
• Files are split into blocks
– Managed by Namenode, stored by Datanode
– Transparent to user
• Blocks are traditionally either 64MB or 128MB
– Default is 64MB
– The motivation is to minimize the cost of seeks as
compared to transfer rate
• Namenode determines replica placement
• Default replication is 3
– 1st replica on the local rack
– 2nd replica on the local rack but different machine
– 3rd replica on the different rack
Namenode • Abitrator and repository for all HDFS metadata
• Data never flows through Namenode
• Executes file system namespace operations
– open, close, rename files and directories
• Determines mapping of blocks to Datanodes
• Metadata in Memory
– The entire metadata is in main memory
• Types of metadata
– List of files
– List of Blocks for each file
– List of DataNodes for each block
– File attributes, e.g. creation time, replication factor
• A Transaction Log
– Records file creations, file deletions etc
4
DataNode
• A Block Server
– Stores data in the local file system (e.g. ext4)
– Stores metadata of a block (e.g. CRC)
– Serves data and metadata to Clients
• Block Report
– Periodically sends a report of all existing blocks to the
NameNode
• Facilitates Pipelining of Data
– Forwards data to other specified DataNodes
• Client retrieves a list of DataNodes on which to place
replicas of a block
• Client writes block to the first DataNode
• The first DataNode forwards the data to the next node
in the Pipeline
• When all replicas are written, the Client moves on to
write the next block in file
5
Rebalancer
• Goal: % disk full on DataNodes should be similar
– Usually run when new DataNodes are added
– Cluster is online when Rebalancer is active
– Rebalancer is throttled to avoid network congestion
– Command line tool
HDFS limitations
• Bad at handling large amount of small files
• Write limitations
– Single writer per file
– Writes only at the end of file, no-support for arbitrary
offset
• Low-latency reads
– High-throughput rather than low latency for small chunks
of data
– HBase addresses this issue
6
• Serve read / write requests from client
• Block creation, deletion and replication upon
instruction from Namenode
• No knowledge of HDFS files
• Stores HDFS data in files on local file system
– Determines optimal file count per directory
– Creates subdirectories automatically
7
Comparison HDFS to PVFS2
PVFS2 HDFS
Metadata server Distributed Federation of
Metadata server in
v2.2.0
Dataserver Stateless Unclear, probably
stateful (bc. of
replication)
Default stripe size 64KB 64MB
POSIX support No, kernel interfaces
implement similar
semantics
No, similar interfaces
available through FUSE
8
Comparison HDFS to PVFS2
PVFS2 HDFS
Reliability No/ high availability
PVFS2 experimental
Replication
Support for concurrent
access of the same file
Yes No
Locking No No
Other features Atomic append
File System Java API
• org.apache.hadoop.fs.FileSystem
– Abstract class that serves as a generic file system
representation
– Note: it’s a class and not an Interface
• Hadoop ships with multiple concrete implementations:
– org.apache.hadoop.fs.LocalFileSystem
• Good old native file system using local disk(s)
– org.apache.hadoop.hdfs.DistributedFileSystem
• Hadoop Distributed File System (HDFS)
• Will mostly focus on this implementation
– org.apache.hadoop.hdfs.HftpFileSystem
• Access HDFS in read-only mode over HTTP
– org.apache.hadoop.fs.ftp.FTPFileSystem
• File system on FTP server
9
File System Java API
public class SimpleLocalLs {
public static void main(String[] args) throws Exception{
Path path = new Path("/");
if ( args.length == 1){
path = new Path(args[0]);
}
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileStatus [] files = fs.listStatus(path);
for (FileStatus file : files ){
System.out.println(file.getPath().getName());
}
}
}
• Hadoop's Path object represents a file or a directory
– Not java.io.File which tightly couples to local filesystem
• Path is really a URI on the FileSystem
– HDFS: hdfs://localhost/user/file1
– Local: file:///user/file1
• Examples:
– new Path("/test/file1.txt");
– new Path("hdfs://localhost:9000/test/file1.txt");
10
Reading data from HDFS
1. Create FileSystem
2. Open InputStream to a Path
3. Copy bytes using IOUtils
4. Close Stream
Reading data from HDFS
FileSystem fs = FileSystem.get(new Configuration());
• If you run with yarn command, DistributedFileSystem
(HDFS) will be created
– Utilizes fs.default.name property from configuration
• Recall that Hadoop framework loads core-site.xml
which sets property to hdfs (hdfs://localhost:8020)
11
Reading data from HDFS
InputStream input = null;
try {
input = fs.open(fileToRead);
…
} finally {
IOUtils.closeStream(input);
}
• fs.open returns org.apache.hadoop.fs.FSDataInputStream
– Another FileSystem implementation will return their own
custom implementation of InputStream
– Opens stream with a default buffer of 4k
– If you want to provide your own buffer size use
fs.open(Path f, int bufferSize)
• Utilize IOUtils to avoid boiler plate code that catches IOException
Reading data from HDFS
IOUtils.copyBytes(inputStream, outputStream,
buffer);
• Copy bytes from InputStream to OutputStream
– Hadoop’s IOUtils makes the task simple
– buffer parameter specifies number of bytes to buffer at a
time
12
Reading data from HDFS
public class ReadFile {
public static void main(String[] args) throws IOException {
Path fileToRead = new Path("/data/readMe.txt");
FileSystem fs = FileSystem.get(new Configuration());
InputStream input = null;
try {
input = fs.open(fileToRead);
IOUtils.copyBytes(input, System.out, 4096);
} finally {
IOUtils.closeStream(input);
}
}
}
Reading data - seek
• FileSystem.open returns FSDataInputStream
– Extension of java.io.DataInputStream
– Supports random access and reading via interfaces:
• PositionedReadable : read chunks of the stream
– Seekable : seek to a particular position in the stream
• FSDataInputStream implements Seekable interface
– void seek(long pos) throws IOException
– Seek to a particular position in the file
– Next read will begin at that position
– If you attempt to seek past the file boundary IOException
is emitted
• Expensive operation – strive for streaming and not seeking
13
Reading data - seek
public class SeekReadFile {
public static void main(String[] args) throws IOException {
Path fileToRead = new Path("/training/data/readMe.txt");
FileSystem fs = FileSystem.get(new Configuration());
FSDataInputStream input = null;
try {
input = fs.open(fileToRead);
System.out.print("start postion="+input.getPos()+":
IOUtils.copyBytes(input, System.out, 4096, false);
input.seek(11);
System.out.print("start postion="+input.getPos()+":
IOUtils.copyBytes(input, System.out, 4096, false);
} finally {
IOUtils.closeStream(input);
}
}
}
Writing Data in HDFS
1. Create FileSystem instance
2. Open OutputStream
a) FSDataOutputStream in this case
b) Open a stream directly to a Path from FileSystem
c) Creates all needed directories on the provided path
3. Copy data using IOUtils
14
HDFS C API #include "hdfs.h"
int main(int argc, char **argv) {
hdfsFS fs = hdfsConnect("namenode_hostname",namenode_port);
if (!fs)
fprintf(stderr, "Cannot connect to HDFS.\n");exit(-1);
int exists = hdfsExists(fs, fileName);
if (exists > -1) {
fprintf(stdout, "File %s exists!\n", fileName);
}else{
// Create and open file for writing
hdfsFile outFile = hdfsOpenFile(fs, fileName,
O_WRONLY|O_CREAT, 0, 0, 0);
if (!outFile) {
fprintf(stderr, “Open failed %s\n", fileName); exit(-2);
}
hdfsWrite(fs, outFile, (void*)message, strlen(message));
hdfsCloseFile(fs, outFile);
}
HDFS C API
// Open file for reading
hdfsFile inFile = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
if (!inFile) {
fprintf(stderr, "Failed to open %s for reading!\n", fileName);
exit(-2);
}
char* data = malloc(sizeof(char) * size);
// Read from file.
tSize readSize = hdfsRead(fs, inFile, (void*)data, size);
fprintf(stdout, "%s\n", data);
free(data);
hdfsCloseFile(fs, inFile);
hdfsDisconnect(fs);
return 0;
}