[IEEE 2013 42nd International Conference on Parallel Processing (ICPP), Lyon, France]
High-Performance Design of Hadoop RPC with RDMA over InfiniBand*
Xiaoyi Lu, Nusrat S. Islam, Md. Wasi-ur-Rahman, Jithin Jose, Hari Subramoni,
Hao Wang, and Dhabaleswar K. (DK) Panda
Department of Computer Science and Engineering
The Ohio State University
Email: {luxi, islamn, rahmanmd, jose, subramon, wangh, panda}@cse.ohio-state.edu
Abstract—Hadoop RPC is the basic communication mechanism in the Hadoop ecosystem. It is used with other Hadoop components like MapReduce, HDFS, and HBase in real-world data-centers, e.g. Facebook and Yahoo!. However, the current Hadoop RPC design is built on the Java sockets interface, which limits its potential performance. The High Performance Computing community has exploited high-throughput and low-latency networks such as InfiniBand for many years. In this paper, we first analyze the performance of the current Hadoop RPC design by unearthing buffer management and communication bottlenecks that are not apparent on slower-speed networks. Then we propose a novel design (RPCoIB) of Hadoop RPC with RDMA over InfiniBand networks. RPCoIB provides a JVM-bypassed buffer management scheme and utilizes message size locality to avoid multiple memory allocations and copies in data serialization and deserialization. Our performance evaluations reveal that the basic ping-pong latencies for varied data sizes are reduced by 42%-49% and 46%-50% compared with 10GigE and IPoIB QDR (32 Gbps), respectively, while the RPCoIB design also improves the peak throughput by 82% and 64% compared with 10GigE and IPoIB. As compared to default Hadoop over IPoIB QDR, our RPCoIB design improves the performance of the Sort benchmark on 64 compute nodes by 15%, while it improves the performance of the CloudBurst application by 10%. We also present thorough, integrated evaluations of our RPCoIB design with other research directions, which optimize HDFS and HBase using RDMA over InfiniBand. Compared with their best performance, we observe 10% improvement for HDFS-IB and 24% improvement for HBase-IB. To the best of our knowledge, this is the first such design of the Hadoop RPC system over high performance networks such as InfiniBand.
I. INTRODUCTION
Hadoop [1] is one of the most popular open-source
frameworks of the MapReduce [2] programming model to
support massive data analysis. Because of its scalability,
fault-tolerance, and productivity, Hadoop is being used
in many data-center applications at organizations such as
Facebook, Yahoo!, etc. Hadoop Remote Procedure Call
(RPC) [3] is the fundamental communication mechanism
in Hadoop. It is mainly used for metadata exchange in all
of the Hadoop components such as MapReduce, Hadoop
Distributed File System (HDFS) [4] and Hadoop Database
(HBase) [5]. An efficient RPC design is therefore essential to
the performance of the overall Hadoop system. However,
in the current Hadoop design, Hadoop RPC is based
on TCP/IP sockets, which limits the performance even
on modern high performance interconnects. There have
*This research is supported in part by National Science Foundation grants #OCI-0926691, #OCI-1148371 and #CCF-1213084.
been several studies during the last two years [6–9] which
have explored HDFS, HBase, and MapReduce designs
over high performance interconnects such as InfiniBand.
These studies reveal that the socket-based communication
design in different Hadoop components cannot fully uti-
lize the high-bandwidth and low-latency capability of the
advanced interconnects. However, these studies still route
Hadoop RPC through socket-based communication, which
can degrade whole-system and application performance
when running over high performance networks.
Furthermore, none of these studies take into account the
overhead from the buffer management and the multiple
data copies in the data exchange between components.
In this paper, we first investigate the performance bottle-
necks in current Hadoop RPC design. We reveal that both
the data transfer and the buffer management are important
for Hadoop RPC performance on InfiniBand. Then, we
propose a high-performance design of Hadoop RPC over
InfiniBand (RPCoIB). We also propose a JVM-bypassed
buffer management scheme and utilize the observation
of message size locality to avoid the multiple memory
allocations and copies in data serialization and deserial-
ization. We compare RPCoIB with default Hadoop RPC
over different protocols and networks, including 10GigE
and IPoIB (TCP/IP emulation) QDR (32 Gbps). For the
basic ping-pong operations for varied data sizes, we have
observed 42%-49% and 46%-50% latency reductions by
RPCoIB as compared with default Hadoop RPC over
10GigE and IPoIB, respectively. We also find that the
RPCoIB design can improve the peak throughput by 82%
and 64% as compared with the original design running
over 10GigE and IPoIB. For MapReduce, Hadoop with
our RPCoIB design improves the performance of the Sort
benchmark on 64 compute nodes by 15% as compared
with default Hadoop running over IPoIB; while it can also
improve the performance of CloudBurst [10] application
by 10%. We present thorough evaluations of our RPCoIB
design with other research directions for Hadoop compo-
nents running over IB, including HDFS over IB [6] and
HBase over IB [7]. Compared to their best performance,
the integrated software systems with RPCoIB provide
about 10% extra performance improvement for HDFS and
24% performance improvement for HBase Get-Put-mix
workload.
To the best of our knowledge, this work is the first such
design of the Hadoop RPC system over high performance
networks such as InfiniBand. The following are the main
contributions of this paper:
2013 42nd International Conference on Parallel Processing
0190-3918/13 $26.00 © 2013 IEEE
DOI 10.1109/ICPP.2013.78
641
1) Revealing the performance bottlenecks of the default
Hadoop RPC design on advanced interconnects,
which are not apparent on slower-speed networks.
2) Proposing a novel design (RPCoIB) of Hadoop RPC
over InfiniBand, which provides a JVM-bypassed
buffer management mechanism and utilizes message
size locality to avoid extra memory allocations and
copies for data serialization and deserialization.
3) Providing detailed evaluations of the RPCoIB design
with micro-benchmarks and a cloud application and
presenting evaluations on integrated systems, like
RPC-HDFS over IB and RPC-HBase over IB.
The rest of the paper is organized as follows. Section II
discusses the performance bottlenecks of Hadoop
RPC. Then we present the RPCoIB design in Section III and
the performance evaluations in Section IV. Section V lists
related work and Section VI concludes the paper.
II. BOTTLENECK ANALYSIS IN HADOOP RPC
The default Hadoop RPC design is based on Java
sockets communication. Although we can simply change
the networks to InfiniBand or 10GigE and continue to run
Hadoop on TCP/IP, the communication over such high per-
formance networks cannot achieve maximum performance
benefits. This study identifies the bottlenecks in current
Hadoop RPC design when we target high performance
networks. These bottlenecks are not noticeable in socket-
based communication over TCP/IP on a 1GigE network,
because data transfer on the network itself is the dominant cost.
On high performance networks, however, if we do not carefully
redesign the communication subsystem, these bottlenecks
tremendously degrade performance.
A. Multiple Copies in Call Serialization and Sending
Hadoop RPC uses Writable based serialization
mechanism [3], which involves multiple memory copies
in data serialization and sending processes.
Listing 1. Example Code to Send a Call in Client
 1 public void send(Call call) {
 2   // Serialization
 3   DataOutputBuffer d = new DataOutputBuffer();
 4   d.writeInt(call.id);
 5   call.param.write(d);
 6   byte[] data = d.getData();
 7   int dataLength = d.getLength();
 8
 9   // Sending
10   DataOutputStream out = new DataOutputStream(new BufferedOutputStream(socketStream));
11   out.writeInt(dataLength);
12   out.write(data, 0, dataLength);
13   out.flush();
14 }
Listing 1 is an extracted code snippet of data serial-
ization and sending on the client side. According to this
example, when a call is to be sent to server, the RPC
client will first allocate a DataOutputBuffer, which
contains an internal buffer with 32 bytes as default in
common Java versions (e.g. 6 or 7). This buffer is used
to save all serialized data. However, an object whose
serialized form is larger cannot fit in such a
small internal buffer. To guarantee correctness,
Hadoop RPC handles this case using Algorithm 1.
Algorithm 1: DEFAULT ALGORITHM FOR MEMORY
ADJUSTMENT OF DATAOUTPUTBUFFER IN HADOOP
RPC
Input: Bytes to be written bs, offset, len
Output: Data is saved successfully or failed.
 1 // cur_count holds the number of bytes of existing data in the internal buf.
 2 new_count <- cur_count + len
 3 // The default initial value of buf_len is 32 bytes.
 4 if new_count > buf_len then
 5     // reallocate buffer
 6     new_buf_len <- max(buf_len * 2, new_count)
 7     new_buf <- allocate(new_buf_len)
 8     // copy old data
 9     copy old buffer data to new_buf
10     buf <- new_buf
11 end
12 // copy new data
13 copy bytes bs (from offset) to buf (from cur_count) with length len
14 cur_count <- new_count
Algorithm 1 has three primary steps: (1) If the available
buffer space is smaller than the required data size, it
reallocates a new buffer whose size is the larger of twice
the previous buffer size and the total required size.
(2) If step 1 is executed, it copies the previous data to
the new buffer. (3) It copies the new data into the buffer. For
small message sizes, there is only one copy, for the
new data. But for the serialization of a large object
(e.g. one larger than the initial internal buffer size), at least one
more data copy is required. In our experimental
studies, we have observed that buffer reallocations and
multiple memory copies happen within a single
RPC call in a MapReduce job. This is because Algorithm 1
only enlarges the buffer size using the local knowledge of
buf_len * 2 or new_count; it does not know the
final required size of the serialization
stream. Furthermore, the Writable serialization often
writes small pieces of data into the internal buffer, through
methods such as writeInt, writeBoolean, and writeByte. This
makes Algorithm 1 even more inefficient, because it has to
reallocate the buffer many times to reach the final size
and copy both the old and the new data each time.
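The growth policy of Algorithm 1 can be sketched as a small, self-contained Java class. This is an illustrative mock, not Hadoop's actual DataOutputBuffer; the class name and the instrumentation counter are ours. It shows how many reallocations a 1 KB payload written in 4-byte pieces triggers starting from the 32-byte default.

```java
import java.util.Arrays;

// Mock of the Algorithm 1 growth policy: double the buffer, or grow to
// the required size, whichever is larger, copying old data each time.
public class GrowableBuffer {
    private byte[] buf = new byte[32];   // default initial size, as in the paper
    private int count = 0;               // bytes of existing data
    public int reallocations = 0;        // instrumentation: how many reallocate+copy steps

    public void write(byte[] bs, int offset, int len) {
        int newCount = count + len;
        if (newCount > buf.length) {
            int newLen = Math.max(buf.length * 2, newCount);
            buf = Arrays.copyOf(buf, newLen);    // reallocate + copy old data
            reallocations++;
        }
        System.arraycopy(bs, offset, buf, count, len);  // copy new data
        count = newCount;
    }

    public static void main(String[] args) {
        GrowableBuffer b = new GrowableBuffer();
        // Writable serialization emits many small writes (writeInt etc.),
        // so a 1 KB payload written 4 bytes at a time grows the buffer
        // 32 -> 64 -> 128 -> 256 -> 512 -> 1024: five reallocations.
        for (int i = 0; i < 256; i++) b.write(new byte[4], 0, 4);
        System.out.println(b.reallocations);  // prints 5
    }
}
```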
The profiling results shown in Table I were obtained
by running a Sort MapReduce job of 4 GB data on 9 nodes
with 1 master and 8 slaves (the node configuration can be
found under Cluster A in Section IV). The data in Table I
is averaged over three job executions. It shows
the detailed RPC communication characteristics of the map
and reduce steps. Several points can be highlighted. First,
Component | Protocol Name                | Method Name            | Avg. Mem Adjustment Times | Avg. Serialization Time (us) | Avg. Send Time (us)
----------|------------------------------|------------------------|---------------------------|------------------------------|--------------------
Map       | mapred.TaskUmbilicalProtocol | getTask                | 2                         | 89                           | 47
Map       | mapred.TaskUmbilicalProtocol | ping                   | 2                         | 147                          | 114
Map       | mapred.TaskUmbilicalProtocol | statusUpdate           | 5                         | 696                          | 86
Map       | mapred.TaskUmbilicalProtocol | done                   | 2                         | 83                           | 51
Map       | hdfs.ClientProtocol          | getFileInfo            | 2                         | 70                           | 57
Map       | hdfs.ClientProtocol          | getBlockLocations      | 2                         | 46                           | 34
Reduce    | mapred.TaskUmbilicalProtocol | getTask                | 2                         | 98                           | 52
Reduce    | mapred.TaskUmbilicalProtocol | getMapCompletionEvents | 3                         | 103                          | 62
Reduce    | mapred.TaskUmbilicalProtocol | ping                   | 2                         | 72                           | 65
Reduce    | mapred.TaskUmbilicalProtocol | statusUpdate           | 5                         | 270                          | 61
Reduce    | mapred.TaskUmbilicalProtocol | commitPending          | 5                         | 522                          | 79
Reduce    | mapred.TaskUmbilicalProtocol | canCommit              | 2                         | 54                           | 42
Reduce    | mapred.TaskUmbilicalProtocol | done                   | 2                         | 38                           | 35
Reduce    | hdfs.ClientProtocol          | getFileInfo            | 2                         | 32                           | 23
Reduce    | hdfs.ClientProtocol          | mkdirs                 | 3                         | 48                           | 23
Reduce    | hdfs.ClientProtocol          | create                 | 4                         | 80                           | 30
Reduce    | hdfs.ClientProtocol          | renewLease             | 2                         | 63                           | 60
Reduce    | hdfs.ClientProtocol          | addBlock               | 3                         | 127                          | 75
Reduce    | hdfs.ClientProtocol          | complete               | 3                         | 149                          | 95
Reduce    | hdfs.ClientProtocol          | getListing             | 2                         | 31                           | 20
Reduce    | hdfs.ClientProtocol          | rename                 | 3                         | 45                           | 19
Reduce    | hdfs.ClientProtocol          | delete                 | 2                         | 35                           | 22
Table I. RPC INVOCATION PROFILING IN A MAPREDUCE JOB OF SORT
as shown in the "Avg. Mem Adjustment Times" column,
multiple memory reallocations and copies do indeed
occur in RPC calls during a MapReduce job run.
Second, comparing the "Avg. Mem Adjustment
Times", "Avg. Serialization Time", and "Avg. Send Time" columns,
we can see that serialization overhead can be a major
bottleneck due to repeated memory adjustments in
Hadoop RPC. When the number of adjustments is large (e.g. 5),
the serialization time is often long (e.g. statusUpdate
in Map, commitPending in Reduce). Third, Table I makes
it obvious that the Reduce stage is more RPC-intensive
than the Map stage.
One solution to avoid this bottleneck is to allocate a
larger internal buffer when we create the output buffer.
The Hadoop RPC server side allocates an internal buffer
with a 10 KB initial size. However, this kind of design
involves additional overhead. Different RPC methods
have varied message sizes. If the initial buffer is fixed to
a large size for all RPC calls, it increases the memory
footprint, especially in a large-scale Hadoop cluster, since
these buffers are allocated by handler threads. If the initial
buffer is fixed to a small size, memory
adjustment is still needed. Such a design is neither
scalable nor flexible. In order to minimize the number of
memory adjustments, we need an internal buffer with
an appropriate size for each RPC call. The question then
becomes: how can we know the serialized data size, as in
line 7 of Listing 1, before the serialization is finished?
After serialization, the serialized data will be writ-
ten to a DataOutputStream, which has a backend
BufferedOutputStream, as shown in the line 10
of Listing 1. There exists another memory copy since
the data has to be copied to the internal buffer of
BufferedOutputStream before it is sent out by the
socket-based stream. In addition, all of these buffers are
allocated in the JVM heap space. It usually needs one
more copy to move data from JVM heap to the native IO
layer for data transmission. The above analysis shows that
there are multiple memory copies in Hadoop RPC. They
become a performance bottleneck especially when Hadoop
RPC runs on high performance networks, because data
transfer is much faster there and these copies inflate
the latency.
B. Frequent Buffer Allocation and Extra Data Copy in
Call Receiving
Call receiving and deserialization are important on
both the RPC client and server sides. On the RPC client, after
receiving the response in the form of a byte stream, the
client deserializes the bytes into a Writable object for
applications. On the RPC server, when data is received from
the client, it also needs to be deserialized to complete
the function call. We also extract the code snippet of data
receiving and deserialization on the server side of Hadoop
RPC, shown in Listing 2.
Listing 2. Example Code to Receive a Call in Server
 1 public void receive(SocketChannel channel, Class paramClass) {
 2   // Receiving
 3   ByteBuffer lenBuffer = ByteBuffer.allocate(4);
 4   channelRead(channel, lenBuffer);
 5   int len = lenBuffer.getInt();
 6   ByteBuffer data = ByteBuffer.allocate(len);
 7   // read in smaller chunks and fully if data size is large
 8   channelReadFully(channel, data);
 9
10   // Deserialization
11   DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data.array()));
12   int id = dis.readInt(); // call id
13   Writable param = ReflectionUtils.newInstance(paramClass, conf);
14   param.readFields(dis);
15 }
As illustrated in line 6 of Listing 2, the RPC server
allocates a new buffer to read the data from the socket
channel fully. This approach has two problems: First, when
allocating the ByteBuffer in JVM heap to receive data,
the server has to copy the data from the native IO layer to
the heap buffer. Second, since the byte buffer is allocated
for each RPC call, the allocation operation is frequently
called during the execution of a MapReduce job. These
problems become performance bottlenecks, especially
for large messages. A message with a large data size leads
to a one-time large buffer allocation and a memory copy
from the native IO layer to the JVM heap layer. If the large
message has to be read in multiple smaller chunks, there
will be multiple memory copies. Many recent network
frameworks, for example MINA [11], support the direct
buffer technique to avoid such an overhead. In our design,
we also utilize the direct buffer technique to connect Java
applications with low-level RDMA-enabled IO systems.
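The direct-buffer technique referred to above can be sketched as follows. The class and method names are hypothetical (this is neither MINA's nor Hadoop's API): a single pre-allocated direct ByteBuffer is reused across calls, instead of allocating a fresh heap ByteBuffer for every received call as in line 6 of Listing 2.

```java
import java.nio.ByteBuffer;

// Sketch: reuse one pre-allocated direct buffer across RPC calls to
// avoid the per-call allocation and the native-to-heap data copy.
public class ReceiveBuffers {
    // Pre-allocated once (4 MB); native memory, directly visible to the
    // native IO layer, so no copy into the JVM heap is needed.
    private static final ByteBuffer pooled = ByteBuffer.allocateDirect(4 << 20);

    static ByteBuffer acquire(int len) {
        if (len <= pooled.capacity()) {
            pooled.clear();        // reuse path: no allocation at all
            pooled.limit(len);
            return pooled;
        }
        // Fallback mirrors the default design: allocate per call on the heap.
        return ByteBuffer.allocate(len);
    }
}
```

This single-buffer sketch ignores concurrency; a real server would keep one such buffer per connection or draw from a pool, as RPCoIB does in Section III.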
When the network speed is slow, like 1GigE, the default
design does not introduce too much overhead because the
overhead of data transfer on the network is too high.
However, if we target the high performance networks,
the buffer allocation and copy overheads will become the
performance bottlenecks. We design a micro-benchmark
for Hadoop RPC [12] to show the effect of the buffer
allocation overhead. The benchmark is a ping-pong latency
benchmark with two processes, one server and one
client. The client invokes the pingpong method
registered in the server and the parameter of this method is
a BytesWritable object. We vary the payload sizes in
the benchmark. Figure 1 shows the profiling data on both
1GigE and IPoIB, respectively. The Y axis represents the
ratio of buffer allocation time to the total RPC receiving
time on the server side.
Figure 1. Ratio of Buffer Allocation Time to Call Receiving Time
As illustrated in Figure 1, in the RPC call receiving, the
buffer allocation occupies about 30% of the total receiving
time when the payload size is 2 MB and Hadoop RPC runs
on IPoIB. But such big percentage is not obvious when
RPC runs on 1GigE. The above analysis indicates that the
buffer allocation and extra data copy in call receiving is
another bottleneck in running Hadoop RPC on advanced
networks.
In addition, as discussed in recent studies [6–9], current
socket-based communication protocol in default Hadoop
components cannot fully take advantage of RDMA-capable
interconnects, which also becomes a bottleneck in
message exchange.
In this paper, we redesign the communication part in
Hadoop RPC based on native InfiniBand support.
III. PROPOSED DESIGN
In this section, we present a high-performance design of
Hadoop RPC over InfiniBand (RPCoIB). Our design keeps
the same interfaces as the existing socket-based Hadoop
RPC and makes it RDMA capable.
A. Basic Idea
As the example code in Section II shows, the current Hadoop
RPC design is based on Java InputStream, OutputStream,
and SocketChannel. InputStream and
OutputStream are mainly used by the client side, while
SocketChannel is utilized by the server. The basic
read/write operations are similar between them. Based on
these observations, and in order to achieve a better trade-off
between performance and backward compatibility, our
basic idea is to design a set of RDMA-based and Java
IO interface-compatible classes, which include RDMAIn-
putStream, RDMAOutputStream, and RDMAChannel. Due
to the compatibility with Java standard IO interfaces,
they can be used smoothly to improve Hadoop RPC
implementation. The introduction of these classes not only
simplifies the programming (e.g. automatically maintain
message length), but also achieves better performance by
serializing or deserializing data directly to or from RDMA
communication buffers, which ultimately reduces memory
copies.
B. JVM-bypass Buffer Management
As discussed in Section II, one of the bottlenecks in
default Hadoop RPC design is multiple memory copies
from the JVM heap space to the native IO layer. There-
fore, our design bypasses JVM heap memory space,
and directly utilizes native memory, which can be eas-
ily accessed by RDMA operations. Figure 2 shows our
JVM-bypass buffer and communication management for
RDMAInputStream and RDMAOutputStream in the
RPC client side. Because the server side uses a similar design,
we omit its details from the figure.
In our design, when RDMAOutputStream or
RDMAInputStream is constructed, they will get a native
buffer from native buffer pool (see Section III-C). These
buffers are pre-allocated and pre-registered for RDMA
operations when the RPCoIB library loads. The overhead
of getting a buffer is very small, and the allocation over-
head will be amortized by all invocations. These native
buffers will be wrapped as Java DirectByteBuffer,
which can be accessed by both the Java layer and the
native IO layer. All serialized data in the Java layer can be
directly stored in the native buffer to avoid allocating the
intermediate buffers in JVM heap space. Thus, this scheme
eliminates the data copy overhead from the JVM to RDMA-
accessible memory space. When the buffer fills up, the
object being serialized is too big to fit in the current
buffer; this triggers the buffer adjustment
process, which will be discussed in Section III-C. When
Figure 2. JVM-bypass Buffer and Communication Management forRDMA Input&Output Streams
the caller invokes flush, the stored data will be sent out
by our implemented RDMA JNI library. The library first
gets the native buffer address by the direct buffer handle,
then uses RDMA operations to access the data. RDMA
operations will offload the communication to the hardware
layer with very little CPU involvement. In the receiving
and deserialization processes, the receiver can also avoid
extra memory copy by using RDMAInputStream.
This design adopts JVM-bypass buffer and communication
management techniques to implement the RPC
protocol, requiring minimal assistance from the JVM.
From our evaluations (see Section IV-B), we can see that
it can dramatically reduce the communication latency and
improve the throughput. Meanwhile, by implementing
the Java IO stream interfaces, we hide these details from
applications and let them transparently achieve optimal
performance.
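The idea of serializing directly into RDMA-accessible memory can be illustrated with a sketch. This is an assumption-laden stand-in, not the paper's RDMAOutputStream: a plain direct ByteBuffer stands in for a pre-registered native buffer from the pool, and flush is left as a stub where the real design would invoke the RDMA JNI library.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Sketch of a stream that serializes directly into native (direct)
// memory, avoiding intermediate JVM-heap buffers before sending.
public class DirectBufferOutputStream extends OutputStream {
    private final ByteBuffer buf;  // stand-in for a registered RDMA buffer

    public DirectBufferOutputStream(int capacity) {
        this.buf = ByteBuffer.allocateDirect(capacity);
    }

    @Override public void write(int b) {
        buf.put((byte) b);          // serialized data lands in native memory
    }

    @Override public void write(byte[] b, int off, int len) {
        buf.put(b, off, len);
    }

    @Override public void flush() {
        // Real design: pass the buffer's native address to the RDMA JNI
        // library, which performs the transfer with little CPU involvement.
    }

    public int bytesBuffered() { return buf.position(); }
}
```

Because the class implements the standard OutputStream interface, it could be wrapped in a DataOutputStream and used by the existing Writable serialization code unchanged, which is the compatibility property the paper's design relies on.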
C. History based Two-level Buffer Pool
The basic idea of the buffer pool design is to avoid
frequent buffer allocations, which is a bottleneck when
we run Hadoop RPC over high performance networks.
However, the buffer pool design is not straightforward
when we want to solve the bottlenecks discussed in
Section II. The main problem is how to get a buffer,
with an appropriate size, from the buffer pool for the call
transmission, as the required size is not known upfront.
In this paper, by virtue of the detailed profiling, we
observed an interesting phenomenon about the message
size of RPC calls. To analyze these numbers, we
define a kind of call as the tuple <protocol, method>,
and use this information to extract tracing data for
each kind of call. The protocol is the class name of
the method, which is required to be registered in Hadoop
RPC. When we analyze the message size of each kind
of call, we find that many kinds of calls keep very
stable message sizes during the whole MapReduce job
execution. For example, in the DatanodeProtocol,
the invocations of blockReceived often keep message
sizes at about 430 bytes when we run a 4 GB Sort job over
9 nodes. However, some calls seem to have very irregular
message sizes. We choose three kinds of typical calls with
varied message sizes in Hadoop to show this phenomenon.
In order to make the message size variety clearer, we
choose a sequence of calls with the most varied sizes
for each kind of calls. As shown in Figure 3, these three
kinds of calls are heartbeat in JobTracker (denoted as
JT heartbeat), statusUpdate in TaskTracker (denoted
as TT statusUpdate), and getFileInfo in NameNode
(denoted as NN getFileInfo). From this figure, we can
see that their message sizes vary widely, especially
for heartbeat and getFileInfo. However, if we
distribute all of their sizes into several size classes (e.g.
128 Bytes, 256 Bytes, etc.), just as shown in Figure 3,
we find that the message sizes of sequential calls are very
likely to fall into the same range. This indicates that when
we get a buffer with an appropriate size to handle the
current call, it has a high possibility that this buffer can be
reused for the next call with the same tuple <protocol,
method>. We call this phenomenon Message Size Local-
ity in Hadoop RPC.
Figure 3. Message Size Locality in Hadoop RPC
Based on these observations, we present our buffer pool
design as depicted in Figure 4. Our buffer pool follows
a two-level model. One pool is the native memory pool,
while the other pool is a shadow pool in the JVM layer.
The shadow pool means that its buffers come from the
native pool and it keeps references to native buffers and
provides DirectBuffer objects for usage in the Java
layer. The rationale for the shadow pool is that buffer
usage history is much easier to trace in the Java
layer. When the application wants to invoke a function, we
can easily get the meta information of this call in the Java
layer and acquire a buffer from the shadow pool by two
parameters: protocol and method. Then the shadow
pool will use the key “protocol + method” to index
the latest message size for the call. The message size history
information is updated by RDMAOutputStream. During
the serialization process, if the current buffer size is
appropriate for the call, it uses the current
size to update the history record for this kind of call. But
if the current buffer size is too small, the output stream
fetches a new buffer from the buffer pool, doubling the
buffer size until it is large enough. When the output stream
returns the buffer to the shadow pool, it updates the
history information for this kind of call. If the current
buffer size is larger than the actual message size, our
library shrinks the recorded size; the next
time, due to message size locality, the recorded size is again
likely to be appropriate. Such a design
avoids a large memory footprint while providing
appropriately sized buffers most of the time. The discovery
of message size locality applies to both InfiniBand
and other networks.
Figure 4. History based Two-level Buffer Pool
The native buffer pool design arranges buffers into multiple
size classes. The main reason is to maximally
exploit the message size locality shown in Figure 3. Similar
native buffer pool designs are also discussed in
other libraries, like TCMalloc [13], UCR [14, 15], etc. Due
to space limits, we do not elaborate further on this.
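The history-based shadow pool described above can be sketched as follows. The class and method names are illustrative, not the paper's implementation; the real design hands out pooled, registered native buffers, while this sketch only shows the size-class bookkeeping keyed by the <protocol, method> tuple.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the message-size-locality bookkeeping: remember the last
// size class used per <protocol, method> key and hand out that size
// for the next call of the same kind.
public class ShadowPool {
    private final Map<String, Integer> history = new HashMap<>();

    // Round up to a power-of-two size class, 128 bytes minimum,
    // matching the 128 B / 256 B / ... classes shown in Figure 3.
    static int sizeClass(int bytes) {
        int c = 128;
        while (c < bytes) c <<= 1;
        return c;
    }

    // First call for a key gets the smallest class; later calls get the
    // class recorded from the previous message of the same kind.
    public int acquireSize(String protocol, String method) {
        return history.getOrDefault(protocol + "+" + method, 128);
    }

    // Called when the stream returns its buffer: record the actual
    // serialized size so the next call of this kind fits immediately.
    // (This also shrinks the record if the buffer was oversized.)
    public void recordActualSize(String protocol, String method, int bytes) {
        history.put(protocol + "+" + method, sizeClass(bytes));
    }
}
```

For example, after one blockReceived call of about 430 bytes, subsequent calls of the same kind would be served from the 512-byte size class without any adjustment.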
D. Integrated Design of RPCoIB
In our integrated design, we follow the principle of
keeping the existing Hadoop RPC architecture and in-
terface intact, making only the necessary changes. In
particular, we redesigned the Connection class in the client
side and the Listener and Reader classes in the server side
of default Hadoop RPC.
In the default Hadoop RPC, the client side has two ma-
jor kinds of threads: One is the caller thread, which is re-
sponsible for getting or creating connections, sending calls
and waiting for results from RPC server, while the other
is the Connection thread. The default Connection
thread is a sockets-interface-based design, so we need to
rework it to manage connections using native InfiniBand
operations. We adopt the end-point [14, 15]
model to implement connection management over Infini-
Band. An end-point pair should be established between
two nodes before their real data is exchanged. When
an RPC client wants to make a remote procedure call,
it should first exchange end-point information with the
RPC server. Due to the socket-based Hadoop RPC in-
terface, the RPCoIB client can use the socket address
to connect with the RPCoIB server and exchange the
end-point information to establish the connection. Af-
ter connection is established, the following communi-
cation goes through native IB. Due to our new design
of RDMAInputStream and RDMAOutputStream, we
need to modify setupIOStreams in Connection to
provide RDMA-based IO streams. By the Java IO stream
instances, the logic in Hadoop RPC client side can be
supported smoothly.
On the server side, our design enhancements include
the following. In Hadoop RPC (0.20.2 version), the server
side has three kinds of threads (In Hadoop 1.0.3, a new
Reader thread is introduced.). The Listener thread
is responsible for accepting new connections. Similar
to the changes in the Connection thread on the client
side, we modify the connection management operations
in Listener using IB operations. After an RDMA
connection is established, the connection is pushed
into the connection pool. We introduce the Reader
thread (just like in Hadoop 1.0.3) to poll incoming events
for each connection. When a call arrives, Reader reads
the call and pushes it to the call processing queue. Another
type of thread in the server side is Handler, which is
responsible for handling client calls in the call queue and
invoking the target function. Responder is in charge
of sending back responses. Due to our newly introduced
class (RDMAChannel), Handler and Responder can
largely keep their original logic. With the above modifications,
the processing on the server side of Hadoop RPC is
supported as well.
With this design, we can see that we do not need
any changes in existing RPC interfaces. We keep the
same interface semantics and mechanisms (e.g. exception
handling) with the default Hadoop RPC. Thus the upper
layer applications can transparently use our design. In
the following evaluation section, we can find that our
design can be easily integrated with HDFS, HBase, and
MapReduce framework in Hadoop. Besides, RPCoIB also
provides flexibility to applications. We introduce a pa-
rameter, rpc.ib.enabled, to let applications choose between
the default RPC and our RPCoIB design. Based on our previ-
ous optimization experiences [15], we support a tunable
threshold that adaptively sends very small messages
through send/recv based communication over InfiniBand
while using RDMA for larger messages.
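A configuration fragment for this switch might look like the following. The property name rpc.ib.enabled comes from the paper; placing it in a Hadoop site configuration file and the value shown are our assumptions.

```xml
<!-- Hypothetical Hadoop site-configuration fragment: enable the
     RPCoIB transport instead of the default socket-based RPC. -->
<property>
  <name>rpc.ib.enabled</name>
  <value>true</value>
</property>
```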
IV. PERFORMANCE EVALUATION
In this section, we present the detailed performance
evaluations of RPCoIB and its impact on MapReduce,
HDFS, and HBase.
A. Experiment Design and Setup
Broadly, there are two application directions of Hadoop
RPC. One is for big data analytics that uses HDFS and
MapReduce, and the other uses HDFS and HBase for
big data storage and database service. In this study, we
provide integrated evaluations along these two directions
and perform the following sets of experiments: (1) Micro-
benchmark [12] evaluations for RPCoIB. (2) Evaluations
of MapReduce benchmarks and application using RP-
CoIB. The benchmarks include RandomWriter and Sort,
while we choose CloudBurst [10] as the application.
CloudBurst is designed for highly sensitive short read
mapping with MapReduce. (3) Integrated evaluation of
HDFS operations using RPCoIB. (4) Integrated evaluation
of HBase using RPCoIB. We use Yahoo! Cloud Serving
Benchmark (YCSB) [16] for HBase performance evalu-
ations. YCSB is a set of benchmarks for performance
evaluation of different cloud data serving systems.
We use two different clusters for our evaluations.
(1) Cluster A: Nodes in cluster A are equipped with
Intel Westmere series of processors using Xeon Dual
quad-core processor operating at 2.67 GHz. Each node
has 12 GB RAM and MT26428 QDR ConnectX HCAs
(32 Gbps) with PCI-Ex Gen2 interfaces that are connected
using a Mellanox QDR switch. Each node runs Red Hat
Enterprise Linux Server release 6.1 (Santiago) at kernel
version 2.6.32-131 with OpenFabrics version 1.5.3. In this
paper, we have used up to 65 nodes in this cluster.
(2) Cluster B: This cluster consists of 9 nodes with the
same configuration as Cluster A, but with larger
(24 GB) RAM and an additional NetEffect NE020
10 Gb Accelerated Ethernet Adapter (iWARP RNIC) in
each node. These 10GigE cards are connected using a
Fulcrum Focalpoint switch.
Cluster B is smaller than Cluster A, but it has both
InfiniBand and 10GigE. We therefore run the latency and
throughput micro-benchmarks on Cluster B to compare
performance across different networks; all other
experiments are conducted on Cluster A. We use
Hadoop 0.20.2 and HBase 0.90.3 in our experiments.
B. Micro-benchmark Evaluations on RPCoIB
In this section, we conduct experiments to measure the
ping-pong latency and throughput of RPC communica-
tions. Due to space limitations, we report latency results
only for payload sizes from 1 byte to 4 KB; for the
throughput tests, we fix the payload size at 512 bytes and
the number of server-side handlers at 8, and vary the number
of concurrent clients from 8 to 64. The RPC server runs on
a separate node, and the clients in the throughput
tests are distributed uniformly over 8 nodes.
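The latency measurement itself follows the usual ping-pong pattern: time a large number of round trips for a fixed payload and average. A minimal in-process sketch is shown below; the Echo interface and loopback are stand-ins for the real client stub and remote server used by the micro-benchmark suite [12], and all names are illustrative.

```java
// Minimal sketch of a ping-pong latency measurement: time N round
// trips of a fixed-size payload and report the average in microseconds.
// The in-process Echo loopback is a stand-in for a remote RPC server.
class PingPongLatency {
    interface Echo {
        byte[] call(byte[] payload);
    }

    static double averageLatencyMicros(Echo rpc, int payloadBytes, int iterations) {
        byte[] payload = new byte[payloadBytes];
        // Warm-up so JIT compilation does not skew the measurement.
        for (int i = 0; i < 1000; i++) {
            rpc.call(payload);
        }
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            rpc.call(payload);
        }
        long elapsedNanos = System.nanoTime() - start;
        return (elapsedNanos / 1000.0) / iterations; // us per round trip
    }

    public static void main(String[] args) {
        Echo loopback = payload -> payload; // trivial echo
        System.out.printf("avg latency: %.3f us%n",
                averageLatencyMicros(loopback, 4096, 100_000));
    }
}
```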
Figure 5 shows the results of the latency and throughput
tests. These figures show that, compared with our proposed
RPCoIB design, the default Hadoop RPC does not achieve
optimal performance over high-performance networks
(10GigE and IPoIB). This is because, when RPC messages
go through the 10GigE or IPoIB networks, the memory
allocation, copy, and adjustment bottlenecks in the default
Hadoop RPC design suppress the benefits of the
high-performance networks. As shown in these figures,
the RPCoIB design shortens the latency and improves the
throughput significantly. As shown in Figure 5(a), for a
1-byte payload the RPCoIB latency is reduced to 39 us,
while for a 4 KB payload the latency is about 52 us. As
the payload size varies from 1 byte to 4 KB, RPCoIB shows
42%-49% and 46%-50% improvements over default
Hadoop RPC on 10GigE and IPoIB (32 Gbps),
respectively. Compared with default Hadoop RPC on a
low-speed network (e.g. 1GigE), RPCoIB achieves a
1.42-2.48x speedup. To keep the comparisons fair, we show
only the results on high-performance networks and
protocols and omit the 1GigE numbers from the figures.
We also compare the throughput of RPCoIB and default
Hadoop RPC on both 10GigE and InfiniBand QDR, as
shown in Figure 5(b). The peak throughput of RPCoIB is
about 135.22 Kops/sec, which outperforms the peak
performance of default Hadoop RPC on 10GigE by 82%
and on IPoIB by 64%. Analyzing the buffer management
behavior in RPCoIB further, we note that in this
micro-benchmark only the first call may need a buffer
adjustment to learn the appropriate buffer size for the
call; all subsequent invocations obtain buffers of the
appropriate size based on the recorded history. These
results show that RPCoIB eliminates the bottlenecks
discussed in Section II.
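The history-based buffer sizing behind this behavior can be sketched as follows. The class and method names are illustrative rather than the RPCoIB implementation, but the idea matches the message size locality scheme: remember the largest serialized size seen per RPC method and allocate from that history on later calls.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of history-based buffer sizing (message size locality).
// Names are hypothetical; only the first call for a given RPC method
// pays for a buffer adjustment, later calls reuse the recorded size.
class SizeHistoryBufferPool {
    private static final int DEFAULT_SIZE = 1024;
    private final Map<String, Integer> observedSize = new HashMap<>();

    // Hand out a buffer sized from history (default on the first call).
    byte[] acquire(String rpcMethod) {
        return new byte[observedSize.getOrDefault(rpcMethod, DEFAULT_SIZE)];
    }

    // After serialization, record the actual size; keep the maximum.
    void record(String rpcMethod, int actualBytes) {
        observedSize.merge(rpcMethod, actualBytes, Math::max);
    }
}
```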
C. Evaluations of MapReduce with RPCoIB
This section evaluates the performance of two bench-
marks, RandomWriter and Sort, using both RPCoIB and
default Hadoop RPC running over IPoIB. RandomWriter
executes a MapReduce job to generate random binary
data containing varied-sized key-value pairs in HDFS. The
generated data can be used by the Sort benchmark, which
runs a MapReduce job using multiple maps and reduces.
We conduct our experiments on 65 nodes, containing 1
master node and 64 slave nodes. The number of concurrent
maps and reduces per host is set to 8 and 4, respectively.
In this experiment, we vary the total data size from 32 GB
to 128 GB and measure the job execution time in each
case. For this set of experiments, we use two different
configurations: (a) default Hadoop running over IPoIB and
(b) Hadoop with the RPCoIB design. In (a), RPC uses IPoIB,
while in (b) RPC goes through native IB.
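In Hadoop 0.20, the per-node slot counts above are set through the standard mapred-site.xml properties shown below; the rpc.ib.enabled entry illustrates how the switch introduced in Section III could be configured, though its exact property format is our assumption:

```xml
<!-- mapred-site.xml: 8 concurrent maps and 4 reduces per node -->
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>8</value>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>4</value>
</property>
<!-- Switch between default RPC and RPCoIB (format assumed) -->
<property>
  <name>rpc.ib.enabled</name>
  <value>true</value>
</property>
```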
Figure 6(a) shows that RPCoIB improves the performance
of both the RandomWriter and Sort benchmarks for
different data sizes. Our design also benefits the Sort
benchmark more at larger data sizes. For example, at
64 GB, our design improves RandomWriter performance by
9.1% over default Hadoop and Sort performance by 12.3%;
at 128 GB, it improves RandomWriter performance by 12%
and Sort performance by 15.2%. As discussed in
Section II-A, the Reduce phase is more RPC-intensive
than the Map phase. Since RandomWriter is a map-only
benchmark, the performance improvement for Sort is
larger than that for RandomWriter. Also, as the data size
increases, the larger numbers of maps and reduces cause
more RPC invocations, which in turn increase the
performance benefit of the RDMA-based design.
We also run an example application, CloudBurst, for the
MapReduce evaluation. Using the default data and default
configuration [10] of CloudBurst, we execute this appli-
cation over 9 nodes (1 master and 8 slaves). CloudBurst
generates two MapReduce jobs and executes them one
after the other. The first job is Alignment, which is the major part
Figure 5. RPC Benchmark Evaluation on Cluster B with 10GigE and InfiniBand Networks. (a) Latency - Single Server, Single Client: latency (us) vs. payload size (1 byte to 4 KB). (b) Throughput - Single Server, Multiple Clients: throughput (Kops/sec) vs. number of clients (8 to 64). Curves compare RPC-10GigE, RPC-IPoIB (32 Gbps), and RPCoIB (32 Gbps).
Figure 6. MapReduce Benchmark and Application Evaluation. (a) RandomWriter and Sort: job execution time (s) vs. data size (32-128 GB) for IPoIB and RPCoIB. (b) CloudBurst: job execution time (s) for the Alignment, Filtering, and Total phases under Hadoop (IPoIB) and Hadoop (RPCoIB).
of the whole application; in this test, the Alignment
stage contains 240 maps and 48 reduces. The second job
is Filtering, a small job with 24 maps and 24 reduces.
As shown in Figure 6(b), our design achieves a higher
performance benefit (10.7%) for the larger job. Overall,
our design gains a 10% performance improvement for the
CloudBurst application.
D. Integrated Evaluations for HDFS
In this section, we evaluate the performance impact
of RPCoIB on HDFS. Earlier research [6] has proposed
an RDMA-based design of HDFS, which we denote by
HDFSoIB. HDFS Read is mostly node-local [2] in
MapReduce; that is why our evaluations focus on the
HDFS Write case. In this part, we integrate our RPCoIB
design with both the vanilla socket-based HDFS design
and the RDMA-based HDFS design, and evaluate them on
HDFS Write.
We use microbenchmarks for HDFS Write operations.
These experiments run on 32 DataNodes with one HDD
per node. The NameNode and the client run on two
other nodes of the Hadoop cluster. The HDFS replication
factor is set to three. We conduct these experiments over
different interconnects with four different configurations:
(a) Socket-based HDFS - Socket-based Hadoop RPC, (b)
Socket-based HDFS - RDMA-based RPC, (c) RDMA-
based HDFS - Socket-based Hadoop RPC, (d) RDMA-
based HDFS - RDMA-based RPC. Socket-based designs
are denoted as 1GigE and IPoIB, while RDMA-based
designs are denoted as ‘HDFSoIB’ and ‘RPCoIB’.
Figure 7 illustrates the HDFS Write results.
Figure 7. Integrated Evaluation on HDFS Write: time (s) vs. file size (1-5 GB), comparing HDFS(1GigE)-RPC(1GigE), HDFS(1GigE)-RPCoIB, HDFS(IPoIB)-RPC(IPoIB), HDFS(IPoIB)-RPCoIB, HDFSoIB-RPC(1GigE), HDFSoIB-RPC(IPoIB), and HDFSoIB-RPCoIB.
Performance
results indicate that our RPCoIB design integrated with
RDMA-based HDFS (‘HDFSoIB-RPCoIB’) reduces the
latency of HDFS Write by about 10% compared to RDMA-based
HDFS with RPC running over IPoIB (‘HDFSoIB-RPC(IPoIB)’).
During a file write, the HDFS client communicates with the
NameNode for file metadata and block-related
information. DataNodes also send heartbeat messages as
well as block reports to the NameNode. Each block is
replicated to three DataNodes. Once a block is written to
a DataNode, a block report is sent from that DataNode to
the NameNode. The NameNode is also notified when a
file write completes. All of these communications go over
RPC; therefore, the RDMA-based design of Hadoop RPC
helps improve the latency of HDFS Write.
E. Integrated Evaluations for HBase
HBase uses RPC for communicating all operation re-
quests, such as Put, Get, etc., and responses between
client and HRegionServer. Earlier research has proposed
an RDMA-based design [7] for HBase Get and Put
operations, but the Hadoop-level RPC still goes through
sockets. We evaluate the performance of HBase Get and
Put operations by integrating this earlier HBase design
with our RPCoIB design. We use YCSB for our evaluations.
We vary the HBase record count from 100 K to 300 K, with
each record 1 KB in size, and use 640 K as the operation count.
These experiments run on a Hadoop cluster with 16 region
servers and 16 clients. The master node, HMaster, runs
on a separate node. We perform these experiments over
1GigE, IPoIB, and RDMA with three different configura-
tions: (a) Socket-based HBase with Socket-based
Hadoop RPC, (b) RDMA-based HBase with Socket-based
Hadoop RPC, and (c) RDMA-based HBase with RDMA-based
RPC. Configuration (a) is run over both 1GigE and IPoIB.
We compare HBase Get and Put latencies and throughputs
in each configuration.
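For illustration, a YCSB workload file for the 100% Put run at the largest record count might look like the following. The property names are standard YCSB CoreWorkload parameters (ten 100-byte fields give the 1 KB record size), but the actual workload files used in this study are not given, so the values below are an assumption:

```properties
# Hypothetical YCSB workload: 300 K records x 1 KB, 640 K ops, 100% Put.
recordcount=300000
operationcount=640000
workload=com.yahoo.ycsb.workloads.CoreWorkload
fieldcount=10
fieldlength=100
readproportion=0
updateproportion=1.0
scanproportion=0
insertproportion=0
```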
Figure 8(b) shows that HBaseoIB-RPCoIB achieves an
improvement of 16% over HBaseoIB-RPC(IPoIB) in terms
of throughput for Put. The study in [17] shows that HBase
operations are network-intensive. As the record count
increases, more HDFS operations are invoked, which
requires the DFSClient to communicate with the
NameNode over RPC to acquire the block-related
information. RPCoIB also improves the performance of
HBase Get operations by 6%, as shown in Figure 8(a).
The improvement for HBase Get is smaller than that for
Put because a 100% Get workload involves less RPC
communication: it does not require block reports to be
sent to the NameNode for each block. Figure 8(c)
illustrates that RPCoIB achieves an improvement of 24%
over configuration HBaseoIB-RPC(IPoIB) for the mixed
workload. Here, the interleaved access pattern of the
mixed workload causes more data to be written to and
read from HDFS; therefore, more HDFS operations are
invoked, which in turn increases the number of RPC
calls. We observe similar performance benefits for
latency; due to space limitations, we skip these results here.
V. RELATED WORK
RPC performance optimization is a hot topic in the
distributed computing area, and many researchers have
proposed different mechanisms to optimize RPC performance.
Manta [18] uses a native compiler to generate specialized
serialization and an efficient RMI protocol to implement
a flexible Java RPC. Differential serialization and
deserialization schemes [19, 20] have been proposed to
exploit similarities in incoming or outgoing messages to
decrease object serialization and deserialization time in
XML-based systems. In this paper, we focus on the Hadoop
system and propose an efficient RPC design over
InfiniBand. Although our design also targets the data
serialization and deserialization stages, unlike these
systems our buffer management scheme bypasses the JVM
and utilizes message size locality to avoid extra memory
allocations and copies.
Recently, a fair amount of research has been dedicated
to improving the Hadoop system on high-performance
networks (such as 10GigE and InfiniBand). Hadoop
Acceleration [8] focuses on the data shuffle and merge
stages of Hadoop MapReduce on InfiniBand. Recent
research [21] has demonstrated that there is still room for
performance improvement in Hadoop RPC compared with
traditional HPC technologies such as MPI. In our previous
research [22], we revealed the potential performance
benefits for HDFS, and in [6, 7, 9] we designed
high-performance RDMA-based HDFS, HBase, and
MapReduce over InfiniBand. This paper complements these
research directions from the RPC aspect. The new RPC
design makes the whole Hadoop system run faster over
native IB, and our evaluations show the additional
performance improvements from RPCoIB when we combine
it with the solutions from these research directions.
VI. CONCLUSION AND FUTURE WORK
In this paper, we investigate the performance bottle-
necks in Hadoop RPC design and reveal that it is necessary
to redesign both the data communication and the buffer
management of Hadoop RPC when running on InfiniBand.
We propose a high-performance design of Hadoop RPC
over InfiniBand, called RPCoIB. RPCoIB utilizes native
IB communication, JVM-bypassed buffer management,
and message size locality to avoid buffer allocation
and memory copy overheads in data serialization and
deserialization. Consequently, we observe 42%-49% and
46%-50% latency reductions as compared with default
Hadoop RPC over 10GigE and IPoIB QDR (32 Gbps),
respectively. The RPCoIB design also improves the peak
throughput by 82% and 64% as compared with the original
design over 10GigE and IPoIB. For MapReduce, our
RPCoIB design improves the performance of the Sort
benchmark on 128 GB data over 64 compute nodes by
15% as compared with IPoIB, while it improves the
performance of the CloudBurst application by 10%. This
paper shows integrated evaluations of our RPCoIB design
with optimizations for other Hadoop components running
on InfiniBand, including HDFS over IB and HBase over
IB. Compared to their best performance, we have observed
an additional 10% performance improvement for HDFS-IB
and a 24% performance improvement for HBase-IB under
the mixed workload. We have made a full software package
of Hadoop-RDMA [23] on InfiniBand available for down-
load. Hadoop-RDMA is a high-performance design of
Hadoop (consisting of HDFS, MapReduce and RPC) over
RDMA-enabled Interconnects. We plan to continuously
update this package with newer designs and carry out
evaluations with more Hadoop applications on systems
with high-performance networks.
REFERENCES
[1] Apache Hadoop, http://hadoop.apache.org/.
[2] J. Dean and S. Ghemawat, “MapReduce: Simplified
Data Processing on Large Clusters,” in Operating
Systems Design and Implementation (OSDI), 2004.
[3] T. White, Hadoop: The Definitive Guide. O’Reilly
Media, Inc., Oct. 2010.
Figure 8. HBase Throughput Evaluation: throughput (Kops/sec) vs. record count (100 K-300 K) for (a) 100% Get, (b) 100% Put, and (c) 50%-Get-50%-Put, comparing HBase(1GigE)-RPC(1GigE), HBaseoIB-RPC(1GigE), HBase(IPoIB)-RPC(IPoIB), HBaseoIB-RPC(IPoIB), and HBaseoIB-RPCoIB.
[4] K. Shvachko, H. Kuang, S. Radia, and R. Chansler,
“The Hadoop Distributed File System,” in IEEE 26th
Symposium on Mass Storage Systems and Technolo-
gies (MSST), 2010.
[5] Apache HBase, http://hbase.apache.org.
[6] N. S. Islam, M. W. Rahman, J. Jose, R. Rajachan-
drasekar, H. Wang, H. Subramoni, C. Murthy, and
D. K. Panda, “High Performance RDMA-based De-
sign of HDFS over InfiniBand,” in The International
Conference for High Performance Computing, Net-
working, Storage and Analysis (SC), November 2012.
[7] J. Huang, X. Ouyang, J. Jose, M. W. Rahman,
H. Wang, M. Luo, H. Subramoni, C. Murthy, and
D. K. Panda, “High-Performance Design of HBase
with RDMA over InfiniBand,” in IEEE Int’l Paral-
lel and Distributed Processing Symposium (IPDPS
2011), May 2011.
[8] Y. Wang, X. Que, W. Yu, D. Goldenberg, and
D. Sehgal, “Hadoop Acceleration Through Network
Levitated Merge,” in Proceedings of the International
Conference on High Performance Computing Net-
working, Storage and Analysis (SC’11), 2011.
[9] M. W. Rahman, N. S. Islam, X. Lu, J. Jose,
H. Subramoni, H. Wang, and D. K. Panda,
“High-Performance RDMA-based Design of Hadoop
MapReduce over InfiniBand,” in IEEE 27th Interna-
tional Parallel and Distributed Processing Sympo-
sium Workshops (IPDPSW), 2013.
[10] M. Schatz, “CloudBurst: Highly Sensitive
Short Read Mapping with MapReduce,”
http://sourceforge.net/apps/mediawiki/cloudburst-
bio/index.php?title=CloudBurst.
[11] The Apache Software Foundation, “The Apache
MINA Project,” http://mina.apache.org.
[12] X. Lu, M. W. Rahman, N. Islam, and D. K. Panda, “A
Micro-benchmark Suite for Evaluating Hadoop RPC
on High-Performance Networks,” in 3rd Workshop
on Big Data Benchmarking (WBDB), 2013.
[13] S. Ghemawat and P. Menage, “TCMalloc
: Thread-Caching Malloc,” http://goog-
perftools.sourceforge.net/doc/tcmalloc.html.
[14] J. Jose, M. Luo, S. Sur, and D. K. Panda, “Unifying
UPC and MPI Runtimes: Experience with MVA-
PICH,” in Fourth Conference on Partitioned Global
Address Space Programming Model (PGAS), Oct
2010.
[15] J. Jose, H. Subramoni, M. Luo, M. Zhang, J. Huang,
M. W. Rahman, N. S. Islam, X. Ouyang, H. Wang,
S. Sur, and D. K. Panda, “Memcached Design on
High Performance RDMA Capable Interconnects,”
in International Conference on Parallel Processing
(ICPP), Sept 2011.
[16] B. F. Cooper, A. Silberstein, E. Tam, R. Ramakr-
ishnan, and R. Sears, “Benchmarking Cloud Serving
Systems with YCSB,” in The Proceedings of the
ACM Symposium on Cloud Computing (SoCC 2010),
Indianapolis, Indiana, June 10-11 2010.
[17] M. W. Rahman, J. Huang, J. Jose, X. Ouyang,
H. Wang, N. Islam, H. Subramoni, C. Murthy, and
D. K. Panda, “Understanding the Communication
Characteristics in HBase: What are the Fundamen-
tal Bottlenecks?” in IEEE International Symposium
on Performance Analysis of Systems and Software
(ISPASS), April 2012.
[18] J. Maassen, R. V. Nieuwpoort, R. Veldema, H. Bal,
T. Kielmann, C. Jacobs, and R. Hofman, “Efficient
Java RMI for Parallel Programming,” ACM Trans-
action on Programming Languages and Systems,
vol. 23, no. 6, pp. 747–775, November 2001.
[19] N. A. Ghazaleh, M. J. Lewis, and M. Govindaraju,
“Differential Serialization for Optimized SOAP Per-
formance,” in Proceedings of the 13th International
Symposium on High-Performance Distributed Com-
puting (HPDC-13), 2004.
[20] N. A. Ghazaleh and M. J. Lewis, “Differential De-
serialization for Optimized SOAP Performance,” in
Proceedings of the International Conference on High
Performance Computing Networking, Storage and
Analysis (SC’05), 2005.
[21] X. Lu, B. Wang, L. Zha, and Z. Xu, “Can MPI Ben-
efit Hadoop and MapReduce Applications?” in IEEE
40th International Conference on Parallel Processing
Workshops (ICPPW), 2011.
[22] S. Sur, H. Wang, J. Huang, X. Ouyang, and D. K.
Panda, “Can High Performance Interconnects Benefit
Hadoop Distributed File System?” in Workshop on
Micro Architectural Support for Virtualization, Data
Center Computing, and Clouds, in Conjunction with
MICRO 2010, Atlanta, GA, December 5 2010.
[23] “Hadoop-RDMA: High-Performance Design of
Hadoop over RDMA-enabled Interconnects,”
http://hadoop-rdma.cse.ohio-state.edu.