

IEEE TRANSACTIONS ON NETWORK AND SERVICE MANAGEMENT, VOL. 11, NO. 1, MARCH 2014

An Advanced MapReduce: Cloud MapReduce, Enhancements and Applications

Devendra Dahiphale, Rutvik Karve, Athanasios V. Vasilakos, Huan Liu, Zhiwei Yu, Amit Chhajer, Jianmin Wang, and Chaokun Wang

Abstract—Recently, Cloud Computing is attracting great attention due to its provision of configurable computing resources. MapReduce (MR) is a popular framework for data-intensive distributed computing of batch jobs. MapReduce suffers from the following drawbacks: 1. It is sequential in its processing of the Map and Reduce phases. 2. Being cluster based, its scalability is relatively limited. 3. It does not support flexible pricing. 4. It does not support stream data processing. We describe Cloud MapReduce (CMR), which overcomes these limitations. Our results show that CMR is more efficient and runs faster than other implementations of the MR framework. In addition to this, we showcase how CMR can be further enhanced to: 1. Support stream data processing in addition to batch data by parallelizing the Map and Reduce phases through a pipelining model. 2. Support flexible pricing using Amazon Cloud's spot instances and deal with massive machine terminations caused by spot price fluctuations. 3. Improve throughput and speed up processing over traditional MR by more than 30% for large data sets. 4. Provide added flexibility and scalability by leveraging features of the cloud computing model. Click-stream analysis, real-time multimedia processing, time-sensitive analysis and other stream processing applications can also be supported.

Index Terms—Cloud computing, MapReduce, pipelining, stream processing, spot market.

I. INTRODUCTION

Jeffrey Dean and Sanjay Ghemawat [1] defined MapReduce (MR) as a programming model and an associated implementation for processing and generating large data sets. Cloud MapReduce (CMR) is an implementation of the MapReduce framework on Amazon Web Services [2][3]. By using queues, CMR easily parallelizes the Map and Shuffling stages. By using Amazon's visibility timeout mechanism, it implements fault tolerance. CMR is a fully distributed architecture with no single point of failure or scalability bottleneck, as it exploits Amazon Web Services' fully distributed features. Beyond using Amazon Web Services to simplify the implementation, this architecture is novel in many aspects compared with master/slave Hadoop [4]. It is faster than Hadoop and is very simple as well (3000 LoC, compared to Hadoop's nearly 300K LoC).

Manuscript received January 21, 2013; revised November 27, 2013 and February 24, 2014. The associate editor coordinating the review of this paper and approving it for publication was G. Martinez Perez.

D. Dahiphale, R. Karve, and A. Chhajer are with the Pune Institute of Computer Technology, Pune, India (e-mail: {devendradahiphale, rutvikkarve, amitchhajer}@gmail.com).

A. V. Vasilakos is with Kuwait University, Kuwait (e-mail: [email protected]).

H. Liu is with Jamo (e-mail: [email protected]).

Z. Yu, J. Wang, and C. Wang are with the School of Software, Tsinghua University, Beijing, China (e-mail: [email protected], {jimwang, chaokun}@tsinghua.edu.cn).

Digital Object Identifier 10.1109/TNSM.2014.031714.130407

CMR is designed for processing batch data [5]. Nowadays, continuous streaming data constitutes an important portion of web data, e.g., continuous click-streams, feeds, micro-blogging, stock quotes, weather-forecast data, and news. Processing such data with CMR requires significant changes to its basic architecture. To address this challenge, we introduce pipelining between the Map and Reduce phases to support stream data processing, in addition to the existing features of CMR; we call the result Continuous Cloud MapReduce (C-CMR).

CMR, by itself, is not well suited to the spot market environment: it cannot cope with the massive machine terminations caused by spot price fluctuations when using spot instances on Amazon EC2 [2][3]. If MapReduce jobs run on spot instances that are repeatedly terminated and restarted as spot prices fluctuate, job completion time increases greatly. The existing CMR architecture can be enhanced to tackle this issue of massive termination of spot instances. We explain this extension, named Spot Cloud MapReduce (S-CMR), in subsequent sections. Sections III, IV, V, and VI present our contribution. The paper is organized as follows: In Section II, we begin with a literature survey of existing technologies and services related to our work. Section III describes and evaluates the architecture for the MapReduce programming model on top of Amazon Web Services. The techniques for detecting and working around problems arising from eventual consistency are general enough that they can be applied to other systems implemented on top of a cloud platform.

In Section IV, we propose two enhancements to the CMR architecture to deal with the issues mentioned in the second and third paragraphs of the introduction. Section IV-A outlines the design and implementation of the first optimization and enhancement of CMR, called Continuous Cloud MapReduce (C-CMR). The evaluation of Continuous Cloud MapReduce shows that the streaming, pipelined implementation provides a significant performance improvement over the batch mode of sequential processing. In Section IV-B, we showcase Spot Cloud MapReduce (S-CMR) to address the issue of massive machine terminations caused by a spot price increase. The spot market provides an ideal mechanism to leverage idle CPU resources and smooth out computation demands. We show experimentally that S-CMR performs well and has very little overhead.


Section V elaborates on challenges faced during design and implementation, and the solutions to these. Section VI gives an experimental evaluation of the three main components of the paper, i.e., CMR, C-CMR, and S-CMR. Lastly, we conclude in Section VII and highlight opportunities for future work. Our contribution is thus two-fold: 1. developing C-CMR, which has a significant functional and performance edge over conventional CMR, and 2. consolidating CMR, S-CMR, and C-CMR into a unified framework for efficiently processing huge amounts of data in the cloud.

II. LITERATURE SURVEY

A. MapReduce

MapReduce [1] is a framework developed by Google for processing large data sets in a distributed fashion. It consists of two phases: a Map phase, in which the input data is split into chunks and distributed across multiple nodes to be processed by a user-defined Map function, and a Reduce phase, which is typically used to aggregate the data produced by the Mappers in the Map phase. A typical illustration is the word count example, where the model is used for counting the number of occurrences of each word in a data set. For example, given the data set "chocolates chips chocolates fries chips fries fries fries", the solution using the MapReduce model proceeds as follows: 1. The document is divided into several chunks (ensuring that a whole word is not split on the boundary of a chunk). References to the split documents are kept in the input storage in the form of multiple (key, value) pairs, where the key is an ID given to a chunk and the value is a pointer to that chunk. 2. There are several Mappers, and each Mapper is assigned an input split to process. Each Mapper processes its assigned chunk according to a user-defined Map function to produce intermediate output in the form of key-value pairs. In this case the Mappers' output will be:

(chocolates, 1), (chips, 1), (chocolates, 1), (fries, 1), (chips, 1), (fries, 1), (fries, 1), (fries, 1)

Here, each intermediate pair indicates an occurrence of the word. The Map workers, in this example, map each word to the value "1". After all Mappers finish processing, the Reducers take up this intermediate output and combine all values associated with a particular intermediate key. In this case the Reducers' output will be:

(chocolates, 2), (chips, 2), (fries, 4)
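As a minimal illustration of the two user-defined functions in this example (the Collector interface below is an illustrative assumption, not the actual CMR or Hadoop API):

import java.util.Iterator;

// A minimal word-count sketch; the Collector interface and method names are
// assumptions for illustration only.
public class WordCountExample {

    public interface Collector {
        void emit(String key, long value);
    }

    // Map: emit (word, 1) for every word in the input chunk.
    public static void map(String chunk, Collector out) {
        for (String word : chunk.split("\\s+")) {
            if (!word.isEmpty()) {
                out.emit(word, 1L);
            }
        }
    }

    // Reduce: sum all counts observed for one key.
    public static void reduce(String key, Iterator<Long> values, Collector out) {
        long sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        out.emit(key, sum);
    }
}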

This programming model has been implemented as a usable framework in Hadoop, Amazon Elastic MapReduce, Windows Azure, Cloud MapReduce, etc. Being a batch processing framework, it lacks support for processing stream data, for snapshots (the partial output of a job before the job is completed, which is useful when the overall outcome can be predicted from intermediate output, e.g., weather forecasting, so the job may not need to run to completion), for cascaded MapReduce jobs, and for pipelining and parallelism of the Map and Reduce phases. Our contribution has been to provide these in CMR.

B. Online MapReduce (Hadoop Online Prototype)

MapReduce is a poor fit for interactive data analysis and real-time data stream processing. For example, many applications, such as Yang's image indexing task [6] and distributed grep jobs, require only the Map stage. These applications require an online approach for running MapReduce jobs continuously, while preserving Hadoop interfaces, APIs, and fault tolerance. In 2010, Condie et al. [7] proposed a modified MapReduce architecture named Hadoop Online Prototype (HOP) that allows data to be pipelined between operators in order to support online aggregation and continuous queries. To achieve pipelining, the Map task is modified to push data to Reducers as soon as it finishes a partial computation. The experimental results of HOP demonstrated that the proposed method enables parallelism and improves computational efficiency [8]. HOP is a modification of the traditional Hadoop implementation that incorporates pipelining between the Map and Reduce phases, thereby supporting parallelism between them, and provides support for processing stream data. HOP also allows dynamic load balancing between the Map and Reduce phases, cascaded MapReduce jobs, and snapshots. The output of the Map workers is made available to the Reducers as soon as it is produced. HOP carries out online aggregation of data to produce incrementally correct output. Due to parallelization between the Map and Reduce phases, it achieves approximately 30% better throughput and response time. It also supports snapshots of the output data, by which a user can view the output produced up to some time instant instead of waiting for the entire job to finish. HOP also supports cascaded [5] MapReduce jobs, in which the output of one job is fed to the input of another job; this enables parallelism at the job level as well.

C. Cloud OS

A Cloud OS (Operating System) is a generic term for a system responsible for managing various hardware resources, such as CPU, memory, disks, and network interfaces, i.e., everything inside a cloud, in order to provide various services: 1) Compute services such as Amazon EC2 [2][3] and Windows Azure workers [9] provide computing power in the form of Virtual Machines (VMs). 2) Storage services such as Amazon S3 [2][3] and Windows Azure blob storage [9] provide storage through web service interfaces (REST, SOAP, and BitTorrent). 3) Communication services such as Amazon's Simple Queue Service (SQS) [2][3] and the Windows Azure queue service [9] provide a highly scalable hosted message queue that connects producers and consumers. 4) Persistent storage services such as Amazon's SimpleDB (simple database) [2][3] and Windows Azure table services [9] provide persistent storage in a distributed manner.

The challenges when designing with AWS (Amazon Web Services) include the consistency problem of SQS [3][10], the limited throughput of SimpleDB [3], and the latency of S3 [3]. CMR addresses these design issues elegantly while leveraging AWS to reduce application complexity and achieve high scalability.

III. CLOUD MAPREDUCE ARCHITECTURE AND IMPLEMENTATION

In this section, we describe the design and implementation of Cloud MapReduce using Amazon Web Services. We start with the high-level architecture, then discuss implementation issues. The Word Count application is adopted as an example to describe the implementation.

CMR is a scalable, flexible, fast implementation of the MapReduce framework that allows programmers to exploit the benefits of running big data processing jobs on a cloud platform. Cloud MapReduce uses Amazon EC2 [2][3] instances for spawning virtual instances for the computation of MR jobs (local processor cycles can also be harvested to run the job locally). Multiple EC2 instances can be used for both Map and Reduce workers. Various cloud services are employed in CMR. The input data (the data to be processed) and the output of the job are stored in S3, which provides high data throughput since data comes from multiple servers and communications with the servers potentially traverse different network paths. SQS [2][3] is a critical component that allows CMR to be designed and implemented in a simple way. A queue serves two purposes. First, it is a synchronization point where workers (processes running on instances) can coordinate job assignments. Second, a queue serves as a decoupling mechanism to coordinate data flow between different stages. Lastly, SimpleDB [2][3] is employed as the central job coordination point in CMR, a fully distributed implementation; it maintains all workers' status and is used for fault tolerance and conflict resolution between workers.

A. Architecture

The Cloud MapReduce architecture is shown in Fig. 1. The components involved in this architecture are listed and explained below.

1. Simple Storage Service (S3): This is storage for the internet. Amazon S3 [2][3] provides a simple web interface that can be used to store and retrieve any amount of data. It is used in CMR to provide the input data to process before starting a CMR job.

2. Input/Map queue: The input data, provided in S3, needs to be split into multiple chunks so that it can be processed by multiple Map workers simultaneously. A reference to each chunk is kept in the input queue, from which it will be picked up by one of the Mappers for processing. The input queue is an instance of SQS [2][3], provided by Amazon.

3. Simple database (SimpleDB): This is a highly available and flexible non-relational data store, which offloads the work of database administration. SimpleDB [2][3] uses redundancy for data durability and high availability. This component is used in the architecture for status tracking and fault tolerance: each worker updates its progress to SimpleDB. This component of the system acts as a central point of coordination.

4. Map Workers: These are threads, running on EC2 nodes [2][3], which apply the user-defined Map function to the data to be processed. For each message from the input SQS queue, a Map thread is spawned to process the data referenced by that message. All Map workers run in parallel with each other.

5. Combiners: The user can supply a combiner function, which is similar to the Reduce function. The threads that apply the combiner function to the Mappers' output are called Combiners. Combiners are typically used to perform Map-side pre-aggregation, which reduces the amount of network traffic required between the Map and Reduce phases. This optional part of the architecture is not shown in the figure. Combiners can be enabled or disabled with the "-cb" switch on the command line.

6. Reduce queues: These are several SQS queues that act as an intermediate staging area for the job. The output of the Map workers is kept in the Reduce queues. Records with the same key are always added to the same Reduce queue, so that they are available to the same Reduce worker for final aggregation. Hashing is used to achieve this.

7. MasterReduceQueue: This queue stores references to the multiple Reduce queues. The number of messages in the MasterReduceQueue is equal to the number of Reduce queues, as each message corresponds to one Reduce queue.

8. Reduce Workers: These are the threads that apply the user-defined Reduce function to the intermediate data (the output of the Map workers). The number of Reduce workers should be less than or equal to the number of Reduce queues; otherwise, some Reduce workers will be launched unnecessarily and will stay idle without any Reduce queue to process.

9. Output queue: This is also an SQS queue. The output queue holds the output of the Reduce workers, i.e., the final output of the job.

10. Upload and Download workers (not shown in the architecture diagram): Upload workers are threads which upload the output of a Map worker into the intermediate staging area. Each Map worker has a configurable number of upload workers to upload output from local storage to the Reduce queues. Each Reduce worker has a configurable number of download workers to download intermediate data from the Reduce queues to local storage. The numbers of Map workers, Reduce workers, upload workers per Mapper, download workers per Reducer, and Reduce queues are user-defined parameters, specified as command-line options when launching the job. Each CMR job also requires a unique ID, which is given by the user.

B. Working of CMR

When a user launches a MapReduce job, a list of files is extracted from the given S3 location and the total size of the provided data is calculated. The size of each split is then computed from the number of Mappers specified when launching the job:

for (S3Item item : fileList) {
    totalSize += item.getSize();
}
long splitSize = totalSize / numSplits + 1;



Fig. 1. Cloud MapReduce architecture.

The input queue holds a list of S key-value pairs, where each pair corresponds to a split of the input data that will be processed by one Map worker.

do {
    // if input SQS not empty, dequeue, then map
    for (Message msg : inputQueue) {
        String value = msg.getBody();
        // get the key for the key/value pair; in our case, key is the mapId
        int separator = value.indexOf(Global.separator);
        int mapId = Integer.parseInt(value.substring(0, separator));
        value = value.substring(separator + Global.separator.length());
        // launch a mapper for each data chunk
        mapWorkers.push(new MapRunnable(map, combiner, Integer.toString(mapId),
                value, perf, mapId, msg.getReceiptHandle()));
        // only generate work when have capacity, could sleep here
        mapWorkers.waitForEmpty();
    }
    // SQS appears to be empty, but really should make sure we processed all in the queue
} while ( !dbManager.isStageFinished(jobID, Global.STAGE.MAP) );

To facilitate tracking, each key-value pair also has a unique Map ID. In the word count application, this queue contains the document collection, where the key is the document name and the value is a pointer into S3 storage. SQS is mainly designed for message communication; hence it has an 8KB message size limitation, which is enough for transferring messages among workers but not well suited for holding large documents unless they are split into very small chunks. To overcome this problem and have an optimal chunk size, a pointer to a chunk in S3 is stored as the value in a (key, value) pair instead of storing the data directly in SQS. In addition to pointing to the location in S3, the pointer can also contain a range specification, specifying a chunk of the file. Using ranges, the user can split a bigger file into pieces and process them separately. Similar to the input queue, the output queue holds the results of the MapReduce computation; in the word count application, it holds the resulting key-value pairs. The MasterReduceQueue holds many pointers, one for each Reduce queue, and is used for assigning Reduce tasks. In the cluster of EC2 nodes where the MapReduce job is running, each Map worker runs as a separate thread on an EC2 instance, polling the input queue for work. When a Map worker dequeues a key-value pair, it invokes the user-defined Map function on the data referenced by that pair. Similar to other MapReduce implementations, the user-defined function processes the input key-value pair and emits a set of intermediate key-value pairs. The input value is a pointer to a document stored in S3. The Map worker thread first downloads the assigned chunk from S3 to the local machine, and then applies the user-defined Map function to that chunk of data. In the word count example, for each word (e.g., "the") it sees in the given input data, the Map worker emits a key-value pair where the key is the word itself and the value is the number of occurrences of that word. So, the Mapper's output after encountering the word "the" will be ("the", 1). It is the Combiners' or Reducers' responsibility to aggregate multiple records (key-value pairs) that have the same key.
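As a sketch of how such a split reference, with its range specification, might be encoded as a small SQS message (the field layout and separator below are illustrative assumptions, not the exact CMR wire format):

// Illustrative only: a split reference that fits easily within SQS's 8KB limit.
public class SplitMessage {
    public static String encode(int mapId, String s3Path, long startByte, long endByte) {
        // e.g. "42:mybucket/wiki-part-003:134217728-201326592"
        return mapId + ":" + s3Path + ":" + startByte + "-" + endByte;
    }

    public static void main(String[] args) {
        String msg = encode(42, "mybucket/wiki-part-003", 134217728L, 201326592L);
        System.out.println(msg); // the message body pushed to the input queue
    }
}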

The MapReduce programming model provides the basic function of collecting intermediate key-value pairs from the Map function and writing them to the Reduce queues. A Reduce key maps to one of the Reduce queues through a hash function. A default hash function is provided, but users can also customize it. Since the number of Reduce keys can be much bigger than Q, several keys may map to the same queue. As we will see, each Reduce queue is processed by a separate Reduce worker; thus, Q should be set at least as large as the number of Reduce workers. Preferably, Q should be much bigger in order to maximize load balancing.
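A minimal sketch of such key-to-queue partitioning (the modulo-hash scheme below is a common default and an assumption about the concrete function, not a statement of CMR's exact implementation):

// Map a reduce key to one of Q reduce queues.
// Masking with Integer.MAX_VALUE keeps the index non-negative.
public class ReducePartitioner {
    private final int numReduceQueues; // Q

    public ReducePartitioner(int numReduceQueues) {
        this.numReduceQueues = numReduceQueues;
    }

    public int queueFor(String reduceKey) {
        return (reduceKey.hashCode() & Integer.MAX_VALUE) % numReduceQueues;
    }

    public static void main(String[] args) {
        ReducePartitioner p = new ReducePartitioner(16);
        System.out.println(p.queueFor("the"));   // always the same queue for "the"
        System.out.println(p.queueFor("apple"));
    }
}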

CMR uses the network to transfer intermediate key-value pairs as soon as they are available; thus it overlaps data shuffling with Map processing. This is different from other implementations, where the intermediate key-value pairs are only copied after a Map task finishes. Overlapping shuffling is also used when pipelining MapReduce [7]. Compared to the implementation in [7], which has to implement pairwise socket connections and a buffering/copying mechanism, the implementation using queues is much simpler. Since the Map phase is typically long, overlapped shuffling has the effect of spreading out traffic. This can help alleviate the incast problem [11][12][13] (switch buffer overflow caused by simultaneous transfer of a large amount of data) if it occurs.

The Reduce workers keep polling SimpleDB to find out whether the Map phase has finished. Once the Map workers finish their jobs, the Reduce workers start to poll work from the MasterReduceQueue. Each Reduce worker dequeues a message (a reference to a Reduce queue) and is responsible for processing all data in the Reduce queue indicated by that message. It dequeues messages from the Reduce queue and feeds them into the user-defined Reduce function as an iterator. The Reduce function is written to aggregate all records (key-value pairs) having the same key.

do {
    // dequeue one reduce key, generate an iterator of values
    for (Message msg : masterReduceQueue) {
        String bucket = msg.getBody();
        reduceWorkers.push(new ReduceRunnable(jobID, bucket, reduce,
                reduceCollector, msg.getReceiptHandle()));
        // only generate work when have capacity, could sleep here
        reduceWorkers.waitForEmpty();
    }
} while ( !dbManager.isStageFinished(jobID, Global.STAGE.REDUCE) );

reduceWorkers.waitForFinish();

For the word count example, if a Reduce queue Q1 contains the records ("the", 1), ("apple", 1), and ("the", 1), then the output of the Reduce worker processing Q1 will be ("the", 2) and ("apple", 1). After the Reduce worker finishes processing all data from the Reduce queue, it goes back to the MasterReduceQueue to fetch the next message to process.

Besides reading from and writing to the various queues, the workers also read from and write to SimpleDB to read other workers' status and to update their own. By communicating status through a central, scalable SimpleDB service, this approach not only avoids a single-point bottleneck in the architecture, but also makes it fully distributed. Workers run independently of all other workers. In addition, workers can be heterogeneous: they can be located anywhere in the cluster of computing nodes and can have vastly different computing capacities.

Although Fig. 1 shows the two sets of workers (Map workers and Reduce workers) on different nodes, both could run on the same set of computing nodes (e.g., EC2 instances). Reduce workers start only after the Map phase has finished. A new worker can join the computation at any time. When it joins, it can determine whether the Map phase has finished by querying SimpleDB, and it can then poll the input queue or the MasterReduceQueue for work. The architecture also makes it possible for the job owner to get a rough sense of the job progress. The input queue length as a percentage of S (the original input queue length) is a good approximation of the Map progress. Similarly, the MasterReduceQueue length as a percentage of Q (the original MasterReduceQueue length) is a good approximation of the Reducers' progress. We further enhance this by providing support for snapshots, as discussed in subsequent sections.

It is convenient to launch a MapReduce job since the nodes are symmetric. Users simply launch a certain number of virtual machines from a custom Amazon Machine Image (AMI) and pass a few job-specific parameters to the virtual machines as user data. There is no complicated cluster setup and configuration, and there is no need to select a master. The AMI contains a simple script which parses the user data passed in during launch to determine which application to run and which data set to use; the script then starts the MapReduce job automatically.

IV. CMR-ENHANCEMENTS

In this section, we introduce the two CMR enhancements, named Continuous Cloud MapReduce (C-CMR) and Spot Cloud MapReduce (S-CMR). C-CMR, which processes stream data using pipelining, aims at bridging the gap between the heavy-weight HOP (Hadoop Online Prototype) and the light-weight, scalable CMR implementation by optimizing and enhancing CMR. In Section IV-A, we describe a newly proposed and implemented architecture, modeled after the CMR framework, incorporating the changes required for processing streaming data and incremental online aggregation; in Section IV-B, we describe Spot Cloud MapReduce, which is designed for better utilization of computing resources in the spot market under massive node terminations due to price fluctuation.

A. Continuous Cloud MapReduce

In this section, we describe the design and implementation of a novel approach for optimizing and enhancing CMR using a pipeline between the Map and Reduce phases. This 'Pipelined MapReduce' [5] approach increases parallelism between the Map and Reduce phases. The architecture of Continuous CMR is similar to that of CMR, with a few changes required to support the modified and added functionality. As C-CMR is built on top of CMR, we use the same terminology in the description that follows. We also exploit CMR's solutions to the challenges of designing with AWS, such as handling latency and scaling issues, handling duplicate messages, creation of queues, failure recovery, and resolving consistency issues.


Fig. 2. Pipelined CMR architecture.

1) Input Message Format: In order to support future work related to rolling-window support and to have a timestamp for snapshots, we introduce a timestamp field in the message format. Otherwise, the input message format is the same as in CMR, i.e., a key-value pair where the key is an ID and the value is a reference to the chunk of data (to be processed) residing in S3. The new, modified input message, still subject to the 8KB message size limitation, is shown in Fig. 2. For rolling-window support, messages with a given timestamp can be rounded off to the required granularity and stored in separate files, with timestamps varying according to the required granularity, when processed by the Mappers. A module has been written for this purpose, but further work remains to be done.
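A sketch of how the timestamp field might be added to the split message of Section III (the field layout and separator are illustrative assumptions, not the exact C-CMR format):

// Illustrative only: a C-CMR input message carrying a timestamp in addition to
// the map ID and the S3 pointer.
public class TimestampedSplitMessage {
    public static String encode(int mapId, String s3Pointer, long timestampMillis) {
        return mapId + ":" + timestampMillis + ":" + s3Pointer;
    }

    public static void main(String[] args) {
        String msg = encode(7, "mybucket/clickstream-0007:0-8388608",
                System.currentTimeMillis());
        System.out.println(msg);
    }
}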

2) Supporting Streaming Data at the Input: In order to support streaming, continuous data at the input, we introduce a StreamHandler thread. A list of files already processed from S3 is maintained. This thread continuously polls S3 and checks whether new files have been introduced as input by comparing file names against those in the list.

while (!Global.endCurrentJob) {
    ArrayList<S3Item> fileListToBeProcessed = new ArrayList<S3Item>();
    // check for new files to process
    if (addDirToList(s3FileSystem.getItem(s3Path), fileListToBeProcessed)) {
        // total number of mappers launched so far
        Global.numSplit += numSplit;
        // add MapId, pointer to splits in input queue
        CreateAndAddSplitsToInputQ(fileListToBeProcessed);
    }
}

If a new file is found, it is split into chunks, pointers and a timestamp are added to the message, and the message is pushed into the input SQS queue of the job, as in CMR. The format of the message is shown in Fig. 2. This thread runs continuously throughout the job; however, an exponential back-off strategy is used to modify the time interval between polls, in order to reduce the overhead of unnecessary polling when no new input data is available.
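A minimal sketch of such back-off polling (the bounds and the doubling factor are assumptions for illustration; checkForNewFiles() stands in for the S3 listing and comparison step):

// Poll for new input with exponential back-off; the sleep interval doubles after
// each empty poll (up to a cap) and resets as soon as new data is found.
public class BackoffPoller {
    public static void pollLoop() throws InterruptedException {
        long delayMs = 1_000;           // initial poll interval (assumed)
        final long maxDelayMs = 60_000; // cap on the back-off (assumed)
        while (!Thread.currentThread().isInterrupted()) {
            boolean foundNewInput = checkForNewFiles();
            if (foundNewInput) {
                delayMs = 1_000;                             // reset on activity
            } else {
                delayMs = Math.min(delayMs * 2, maxDelayMs); // back off
            }
            Thread.sleep(delayMs);
        }
    }

    private static boolean checkForNewFiles() {
        return false; // placeholder for comparing S3 contents against the processed-file list
    }
}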

3) Pipelining between Mappers and Reducers: In order to ensure parallel, continuous operation of both Mappers and Reducers, the blocks of code that realize the Mapper and Reducer functions are implemented in separate threads. As a result, the Reducers poll the Reduce queues even while the Mappers are still in operation, which leads to parallel operation of the Map and Reduce workers. The Reducers pick up intermediate results as soon as they become available from the Mappers. The check in CMR that required the Reducers to wait until all Mappers had finished is no longer necessary and has been removed.

Pipelining between Mappers and Reducers requires two important modifications:

• Binding of Reduce queues to Reducers: The Reduce queues must be statically bound to Reducers. This is required because a single Reducer must process all records associated with the same key; each Reducer has its own output collector, and it would not do to have intermediate messages with the same key go to separate output splits. This is achieved by increasing the timeout intervals at which messages re-appear in the MasterReduceQueue, so that a Reducer "holds on" to the same Reduce queue as long as it has not failed. Failure detection and recovery of workers is explained in Section V (Challenges and Solutions).

• Merging new data with old results: Each Reducer must integrate newly available intermediate messages with the output generated up to the current time. Thus, at each invocation of the Reduce function, the Reducer must merge the current output values with the newly available intermediate records by applying the Reduce function. This was not necessary in CMR, where all intermediate data were available at the start of the Reduce phase, making it a blocking operation. With streaming, new data must be constantly integrated with the old. This is done by passing the output collector to each invocation of the Reduce function, along with the new values available in the Reduce queues (see the sketch after this list). Filtering of duplicates is inherited from CMR.
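A minimal sketch of this incremental merge for the word-count case, assuming a hypothetical output collector that can return and overwrite the value currently stored for a key (not the actual C-CMR interface, which is backed by the output queue):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Incremental reduce for word count: fold newly arrived counts into the value
// already held by the output collector.
public class IncrementalReduce {
    private final Map<String, Long> outputCollector = new HashMap<>();

    public void reduce(String key, Iterator<Long> newValues) {
        long merged = outputCollector.getOrDefault(key, 0L); // previous result, if any
        while (newValues.hasNext()) {
            merged += newValues.next(); // apply the reduce function to the new records
        }
        outputCollector.put(key, merged); // overwrite with the merged result
    }
}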

4) Addition of a "delete" Method in the S3 Interface: The S3ItemImpl class has been extended to include a method that allows a user of the S3 interface to delete a file from S3. This is used when appending data to an S3 file, as S3 does not support an append operation. Intermediate and final results are appended as follows: read the existing file, append the data, delete the old file, and upload the modified file under the same name. This operation applies the updates to the intermediate records used in the RollingStore class, which implements the rolling-window functionality.

5) Snapshot: Snapshot functionality has been included in C-CMR in order to provide the user with a means to view the results of a MapReduce job before the job completes. For example, for a 20-minute job, the user could request a snapshot of the job results at time T = 15 min, i.e., the output corresponding to the Map and Reduce operations performed on the data that arrived at the input from the start of the job until T = 15 min. This allows the user to get an idea of the results beforehand, instead of waiting for the entire job to complete. Also, for batch processing, if no more input data is detected, a "snapshot" is automatically invoked, writing the results to S3. The implementation pushes a "flag" file to S3 when a user requests a snapshot. The snapshot thread monitors the S3 location to detect the availability of this file. When the file is detected, the thread deletes it, reads the output queue to gather the results generated so far, and writes them to the S3 bucket specified as the output location. This file can then be downloaded by the user to read the snapshot of the result whenever required. As the functionality is implemented as a separate thread, it does not interfere with the normal operation of the MapReduce job and occurs concurrently with it. This also leads to network and processor parallelism, as the data fetch and store stages occur concurrently with the processing stages, thus reducing bottlenecks.
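A sketch of the flag-file protocol described above, using an assumed minimal S3-like interface and assumed file names rather than the actual CMR S3 wrapper:

// Snapshot thread sketch: watch for a user-created flag file, then dump the current results.
public class SnapshotThread implements Runnable {
    interface SimpleStore {
        boolean exists(String path);
        void delete(String path);
        void put(String path, String contents);
    }

    private final SimpleStore store;
    private final String flagPath = "job-bucket/snapshot.flag";   // assumed location
    private final String outputPath = "job-bucket/snapshot.out";  // assumed location

    SnapshotThread(SimpleStore store) {
        this.store = store;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            if (store.exists(flagPath)) {
                store.delete(flagPath);
                store.put(outputPath, gatherResultsFromOutputQueue());
            }
            try {
                Thread.sleep(5_000); // poll interval, assumed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private String gatherResultsFromOutputQueue() {
        return ""; // placeholder for draining the output queue into a result blob
    }
}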

6) Performance Tracking: A new JobProgressTracker class has been included in the C-CMR implementation. An instance of this class keeps track of the number of records processed by the Mappers and Reducers. The system time is noted at the start of a job, and a separate timer thread maintains the time during execution. The JobProgressTracker writes the number of records processed by the Mappers and Reducers to S3 at regular intervals, into mapperProgressLog and reducerProgressLog respectively. These files can later be downloaded and plotted as a 2D graph showing the status of the running job; any charting tool such as XGraph can be used to plot a graph from the progress log files.

B. Spot Cloud MapReduce

Hadoop [4], a widely used implementation of MapReduce, does not fit well in the spot market environment. It is designed to tolerate infrequent machine failures rather than to cope with massive machine terminations caused by a spot price increase. In the worst case, no computation can progress if the primary and backup master nodes fail. Even if the master nodes do not run on spot instances, several simultaneous node terminations can cause all replicas of a data item to be lost. In addition to data corruption, it has also been shown that adding spot instances to a MapReduce computation can lengthen the computation time [14].

In this section, we describe Spot Cloud MapReduce (Spot CMR), a MapReduce implementation tailored for a spot market environment. To the best of our knowledge, it is the first MapReduce implementation that can tolerate massive node terminations induced by price fluctuation in a spot market. While processing a Map task, it streams intermediate results to cloud storage as preparation. When an instance is terminated, it uses the short time available in the shutdown process to flush its buffer and commit the partial results. As a result, computation can make progress even when many nodes terminate constantly.

Fig. 3. Saving intermediate data when a node is terminated.

Spot Cloud MapReduce is built on top of CMR. In order to apply it to the spot market, the following four changes have been made.

First, the split message format in the input queue is modified. For each split, besides the Map task ID m and the corresponding file location, an offset f has been added, indicating the position in the file split at which processing should start. At the beginning of the job, when the input queue is created, the offset is initialized to 0.

The second change makes the Spot CMR system save its intermediate work when a node is terminated. When a shutdown request is received, the shutdown scripts (e.g., /etc/rc0.d on some Linux distributions) on the host OS are executed. These scripts have been modified: when invoked, they first issue a SIGTERM signal to the MapReduce process so that it can save its state as necessary, and then they sleep, never executing the rest of the shutdown scripts. The shutdown window is too short for other MapReduce implementations to save all intermediate data, but it is enough for Spot CMR, since it transfers data to the queues as soon as possible; typically, a node has very little data left in its buffer that needs to be flushed.

Fig. 3 shows how Spot CMR processes a Map task. Each Map task is given a Map split; the task parses the split and passes each key-value pair to the user-defined Map function in turn. In addition to an input key-value pair, the Map task also passes an output collector to the user-defined Map function for it to write its output key-value pairs to. Unlike the CMR implementation, the output from one input key-value pair is first saved in a temporary buffer. When the Map worker finishes processing one key-value pair, its output is appended to a staging buffer, which is then asynchronously uploaded to the Reduce queues. The two-buffer mechanism shown in Fig. 3 is a unique feature of this implementation.
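A sketch of the two-buffer idea, assuming a hypothetical collector whose staging buffer is drained by a separate uploader thread (not the actual Spot CMR classes):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Two-buffer sketch: output for the key-value pair currently being processed goes
// into a temporary buffer; once that pair is finished, the buffer is appended to a
// staging buffer that an uploader thread drains to the Reduce queues asynchronously.
// On shutdown, only the (small) staging buffer needs to be flushed and committed.
public class TwoBufferCollector {
    private final List<String> temporaryBuffer = new ArrayList<>();
    private final ConcurrentLinkedQueue<String> stagingBuffer = new ConcurrentLinkedQueue<>();

    // called by the user-defined map function for the key-value pair in progress
    public void emit(String key, long value) {
        temporaryBuffer.add(key + "," + value);
    }

    // called when the map task finishes one input key-value pair
    public void finishCurrentPair() {
        stagingBuffer.addAll(temporaryBuffer);
        temporaryBuffer.clear();
    }

    // drained continuously by an uploader thread, and once more during shutdown
    public String pollStaged() {
        return stagingBuffer.poll();
    }
}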

The third change to CMR alters the commit mechanism to allow a partial commit. The commit process is as follows (a code sketch follows the list):

1) Wait for the staging buffer to be successfully flushed.
2) Send a commit message (n, m, f_i, f_{i+1}) to SimpleDB, where n is the node ID, m is the Map task split, f_i is the beginning offset when this Map task started, and f_{i+1} is the offset when the Map process received the SIGTERM signal. f_{i+1} indicates the position from which the next Map task should resume processing. In Fig. 3, f_{i+1} = 3 indicates that processing should resume from KV3.
3) Add a Map split message (m, F_m, f_{i+1}) to the input queue, where m is the Map task ID, F_m is the original input split, and f_{i+1} is the resume offset.
4) Delete the Map split message (m, F_m, f_i) from the input queue.
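A sketch of the partial-commit steps above, using assumed minimal coordination and queue interfaces rather than the actual SimpleDB and SQS clients:

// Partial commit on shutdown: record the processed range (n, m, f_i, f_{i+1}) in the
// coordination store, re-enqueue the remainder of the split, and remove the old split
// message. The interfaces here are assumptions for illustration.
public class PartialCommit {
    interface CoordinationStore { void putCommit(String nodeId, int mapId, long from, long to); }
    interface InputQueue {
        void add(int mapId, String split, long offset);
        void delete(int mapId, String split, long offset);
    }

    public static void commit(CoordinationStore db, InputQueue inputQueue,
                              String nodeId, int mapId, String split,
                              long startOffset, long resumeOffset) {
        // 1) the staging buffer is assumed to have been flushed before this call
        // 2) record the partial commit
        db.putCommit(nodeId, mapId, startOffset, resumeOffset);
        // 3) re-enqueue the unprocessed tail of the split
        inputQueue.add(mapId, split, resumeOffset);
        // 4) remove the split message this task had been working on
        inputQueue.delete(mapId, split, startOffset);
    }
}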

The last change is a method for deciding whether a Map split has been successfully committed. Since there can be multiple partial commits, we must make sure that we can find a set of partial commits that covers the whole range of the split. Once such a set is found, the Reduce workers use it to filter the valid intermediate results from the Reduce queues.
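One straightforward way to check such coverage is to sort the committed ranges by start offset and verify that they join up end to end; the following is a sketch of that check, not necessarily the exact algorithm used in Spot CMR:

import java.util.Comparator;
import java.util.List;

// Check whether a set of partial commits, each covering [from, to), together covers
// the whole split [0, splitLength).
public class CommitCoverage {
    public static class Range {
        final long from, to;
        public Range(long from, long to) { this.from = from; this.to = to; }
    }

    public static boolean covers(List<Range> commits, long splitLength) {
        commits.sort(Comparator.comparingLong(r -> r.from));
        long covered = 0;
        for (Range r : commits) {
            if (r.from > covered) {
                return false; // gap: some key-value pairs were never committed
            }
            covered = Math.max(covered, r.to);
        }
        return covered >= splitLength;
    }
}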

These modifications enable Spot CMR to process Map inputs at the finer granularity of key-value pairs instead of at the granularity of an input split. Moreover, the boundary between two consecutive Map processings of one input split is determined dynamically, depending on when a node is terminated. In this implementation, there is no need to periodically save results and roll back when a node fails [15], so the overhead is quite low.

V. CHALLENGES AND SOLUTIONS

The challenges and solutions discussed below apply to all three main components of the paper, i.e., CMR, C-CMR, and S-CMR. We have to work around several limitations posed by the cloud. We list the key challenges encountered and the general techniques employed to resolve them. One of the contributions of this paper is the set of general techniques for getting around the cloud's challenges; we believe they can be used for other applications built on AWS. In subsequent sections, we go into more detail on the implementation.

A. Long latency

Since cloud services are accessed through a network, the latency can be significant. In our measurements, SQS latency ranges from 20 ms to 100 ms even from within EC2. Hence, a significant portion of the time would be spent waiting for SQS to respond. We tackle this limitation using two techniques: message aggregation and multi-threading.

Message aggregation is different from the combiner in the MapReduce framework. A combiner is an application-specific function which reduces the intermediate result size by applying application-specific logic. This is an optional functionality provided in CMR (and inherited by C-CMR and S-CMR). Combiners work on the output of the Map workers before it is uploaded to the Reduce queues, to minimize network traffic. In contrast, the proposed message aggregation is a framework-level optimization that works regardless of the application. Since we store only key-value pairs in SQS, the 8KB SQS message size is much bigger than required. By aggregating these key-value pairs before transferring them over the network, it is possible to turn multiple round trips into one. This approach saves many queue read/write requests, which helps hide SQS latency.

Fig. 4. Computation time as a function of the number of threads in the thread pool. Word Count on 25MB data on a single m1.small EC2 node.

To hide latency further, a thread pool of multiple threads is used for both writing to and reading from the Reduce queues. When a worker has a message to write, it pushes the message into a local queue. When a thread in the thread pool becomes idle, it pops a message off the local queue and sends it to SQS synchronously. For reading from the Reduce queues, a read buffer is allocated and a read-buffer threshold is set. When the number of messages in the buffer falls below the threshold, idle threads are asked to download additional messages; each idle thread performs one bulk read of 10 messages (10 is the maximum allowed by the SQS API). The threads in the thread pool run concurrently with the processing threads of the system, thereby utilizing idle resources (e.g., the network) and hiding the long latency.
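A sketch of the read side of this scheme, using an assumed bulk-read interface to stand in for the SQS client (the batch size of 10 matches the SQS limit mentioned above; the threshold and pool size are illustrative):

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Read-buffer sketch: when the local buffer drops below a threshold, idle threads
// each fetch one batch of up to 10 messages from the Reduce queue.
public class ReduceQueueReader {
    interface RemoteQueue { List<String> receiveBatch(int maxMessages); }

    private final RemoteQueue reduceQueue;
    private final BlockingQueue<String> readBuffer = new LinkedBlockingQueue<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(100); // default pool size
    private final int threshold = 50; // refill trigger, assumed

    ReduceQueueReader(RemoteQueue reduceQueue) {
        this.reduceQueue = reduceQueue;
    }

    // called by the processing thread; triggers refills as the buffer drains
    public String next() throws InterruptedException {
        if (readBuffer.size() < threshold) {
            pool.execute(() -> readBuffer.addAll(reduceQueue.receiveBatch(10)));
        }
        return readBuffer.take();
    }
}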

Fig. 4 shows the time taken by the word count application as a function of the number of threads in the thread pool. The application runs on a single m1.small EC2 instance (the smallest EC2 instance) and processes a 25 MB data set. We show cases with the combiner both enabled and disabled; when the combiner is disabled, more data is shuffled between the Map and Reduce stages. As shown, the time quickly decreases as we add more threads, suggesting that threads are effective at hiding the latency. The latency from within EC2 to SQS ranges from 20 ms to 100 ms; if computation nodes are further away from SQS, more threads are needed to hide the latency. A user can specify the number of threads with a command-line option. Since having more threads in the thread pool has little impact on performance, we initialize 100 threads in the thread pool by default to support a large amount of data transfer and to hide larger latency.

The message aggregation and multi-threading techniques are only applied to the Reduce queues, since the input queue and the MasterReduceQueue serve a very different purpose. The Reduce queues are intermediate staging points between the Map and Reduce phases, which require high throughput. By comparison, the input queue and the MasterReduceQueue are responsible for job assignments, and it is better to read from them one message at a time to ensure a more even workload distribution.

B. Failure detection and recovery

SQS's message visibility timeout mechanism is used for failure detection and recovery. When a worker reads a message from a queue, the message disappears from the queue for a certain period of time, called the visibility timeout of that message. The visibility timeout of a message can be reset by any worker to an appropriate value as required.

• Map worker failure: The input queue holds the messages used for Map task assignment. SQS messages have a short default visibility timeout (30 seconds). If a Map worker cannot finish processing its assigned chunk of data within the visibility timeout period, it keeps resetting the visibility timeout of the associated message, i.e., the key-value pair (see the sketch after this list). When a worker finishes with its assigned chunk, it removes the associated message from the queue. If the Map worker fails before finishing the assigned chunk, the visibility timeout of the message is not reset, so the message re-appears in the queue and can be taken by another Map worker. Map workers commit their status to SimpleDB only when they have finished their assigned portion of work; partial output from a failed Mapper is simply discarded. We explain how duplicate messages are identified and discarded in the next point.

• Reduce worker failure: The visibility timeout mechanism of SQS is also used for failure detection and recovery of Reduce workers. However, two Reduce workers processing the same Reduce queue can pose a problem, as they will produce duplicate messages in the final output of the job. If this happens, SimpleDB is used for conflict resolution between the Reduce workers processing the same Reduce queue; conflict resolution is discussed in the next section on duplicate messages.
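A sketch of the heartbeat-style visibility extension a long-running Map task performs while it is still working on its chunk (the RemoteQueue interface is an assumed stand-in for the SQS client, and the renewal period is illustrative):

// While the map task is still processing its chunk, periodically extend the
// visibility timeout of the assigned message so that it does not re-appear for
// other workers; if the worker crashes, no renewal happens and the message re-appears.
public class VisibilityHeartbeat implements Runnable {
    interface RemoteQueue { void changeVisibility(String receiptHandle, int timeoutSeconds); }

    private final RemoteQueue queue;
    private final String receiptHandle;
    private volatile boolean taskFinished = false;

    VisibilityHeartbeat(RemoteQueue queue, String receiptHandle) {
        this.queue = queue;
        this.receiptHandle = receiptHandle;
    }

    public void markFinished() { taskFinished = true; }

    @Override
    public void run() {
        while (!taskFinished) {
            queue.changeVisibility(receiptHandle, 30); // keep hiding the message
            try {
                Thread.sleep(20_000); // renew well before the 30 s timeout expires
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}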

C. Duplicate message

Amazon's documentation states that when a worker reads a message from an SQS queue, the message disappears from the queue for a certain amount of time (the visibility timeout). In practice, two workers (or two threads) may read the same message if they read at the same time. This is another manifestation of eventual consistency, because each read modifies the message state, hiding it for a visibility timeout. Duplicate detection and conflict resolution techniques are used to address this problem.

• Duplicate detection: When a Map worker writes an SQS message, it tags the message with three pieces of information: the worker ID, the Map ID, and a unique number. The unique number distinguishes between messages generated by the same Map worker while processing the same Map ID. The tag is simply prepended to the message (see the sketch after this list). When a Reduce worker reads an SQS message, it checks the tag to see whether it has seen the message before. If so, the Reduce worker ignores the message; otherwise, it stores the tag in its database to facilitate future duplicate detection and then processes the message. If two Map workers read a duplicate message from the input queue (or if a worker failed in the middle of processing a Map task), there will be redundant Map outputs in the Reduce queues. The Reduce workers consult SimpleDB to construct a list of committed (worker ID, Map ID) pairs (randomly picking a winner if more than one worker ID committed the same Map ID), and they then filter out redundant messages by checking the worker ID and the Map ID in the message tag against this list.

• Conflict resolution: When more than one Reduce worker has processed messages from the same Reduce queue, none of them gets all the messages from that queue. A Reduce worker can find out how many messages there are in a particular Reduce queue from the information committed to SimpleDB by the Map workers (each Map worker commits to SimpleDB how many messages it has produced for each Reduce queue). When a Reduce worker finds that a Reduce queue j is empty but it has not processed all key-value pairs (fewer than ∑_i R_ij, where R_ij denotes the number of key-value pairs the Map worker generated for Reduce queue j while processing Map ID i), the Reduce worker suspects that there may be a conflict and enters conflict-resolution mode. It first writes the (Reduce queue ID j, worker ID) pair into SimpleDB, and then queries to see whether other workers have claimed the same Reduce queue ID. If so, it invokes a deterministic resolution algorithm (the same on all nodes) to determine who should be in charge of processing this Reduce queue. If the worker loses, it abandons what it has processed and moves on. If the worker wins, it goes back to query the Reduce queue again: even if other workers have read some messages from the Reduce queue, those messages will re-appear after the visibility timeout, allowing the winning worker to finish its processing.
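A sketch of the tagging and duplicate check described in the first bullet (the tag layout, the "|" separator, and the in-memory set are illustrative assumptions; CMR persists seen tags in its local database):

import java.util.HashSet;
import java.util.Set;

// Prepend a (workerId, mapId, uniqueNumber) tag to each intermediate message and
// have the reduce side remember the tags it has already seen.
public class DuplicateFilter {
    private final Set<String> seenTags = new HashSet<>();

    public static String tag(String workerId, int mapId, long uniqueNumber, String body) {
        return workerId + "|" + mapId + "|" + uniqueNumber + "|" + body;
    }

    // returns the message body if the tag is new, or null if it is a duplicate
    public String acceptIfNew(String taggedMessage) {
        String[] parts = taggedMessage.split("\\|", 4); // workerId | mapId | unique | body
        String tag = parts[0] + "|" + parts[1] + "|" + parts[2];
        if (!seenTags.add(tag)) {
            return null; // already processed this (workerId, mapId, uniqueNumber)
        }
        return parts[3];
    }
}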

D. Indeterministic eventual consistency windows

This problem manifests differently in SQS and SimpleDB. Workers may find a queue to be empty even when there are still messages in it. The Amazon documentation [3] attributes this to the distributed nature of the SQS implementation, where messages of the same queue are stored on different servers. A read samples only a few servers (a subset of all the servers holding messages for that queue) and treats the result as the complete queue, but the remaining messages may be on the unsampled servers, which are also part of SQS. The Amazon documentation states that one can call the dequeue API a few times and the queue will eventually return all messages (there is a high probability that multiple dequeue calls will cover all the servers). Unfortunately, there is neither a bound on the number of API calls nor a bound on the time to wait. Similarly, in SimpleDB, when we read an item right after it is written, we may not get the latest value. One solution is to wait an arbitrarily long time; unfortunately, this provides no guarantee and can result in much slower performance, since workers are frequently idle waiting. The solution strategy is instead to set an expectation before reading. For example, we record the number of key-value pairs generated by each Map task for each Reduce queue. Then, in the Reduce phase, we know exactly how many key-value pairs to expect, and we poll the Reduce queue until all of them have been read. SimpleDB is designed with the same eventual-consistency principle. Although there is no concrete solution for this, CMR tackles it by waiting for the eventual results. At the end of the Map stage, we make sure all S Map results are committed before tallying up ∑_i R_ij for the size of each Reduce queue j, and we also make sure that we tally S R_ij counts for that queue, one for each committed Map task (∑_i R_ij is the total number of key-value pairs generated for Reduce queue j, i.e., the size of Reduce queue j). Similarly, at the end of the Reduce stage, we make sure all Q Reduce results are committed.
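A sketch of the expectation-based polling used to cope with the eventual consistency of SQS (the RemoteQueue interface is an assumed stand-in for the SQS client, and the retry delay is illustrative):

import java.util.ArrayList;
import java.util.List;

// Poll a reduce queue until the number of messages read matches the expected count
// recorded by the map workers; an apparently empty queue is re-polled rather than
// treated as finished.
public class ExpectationReader {
    interface RemoteQueue { List<String> receiveBatch(int maxMessages); }

    public static List<String> readAll(RemoteQueue queue, long expectedCount)
            throws InterruptedException {
        List<String> messages = new ArrayList<>();
        while (messages.size() < expectedCount) {
            List<String> batch = queue.receiveBatch(10);
            if (batch.isEmpty()) {
                Thread.sleep(500); // queue looked empty; retry until the expectation is met
            } else {
                messages.addAll(batch);
            }
        }
        return messages;
    }
}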

E. Horizontal scaling

Although all Amazon cloud services are based on horizontal scaling, there seems to be only one concrete manifestation: when using SimpleDB, each SimpleDB domain is only able to sustain a small write throughput. The threshold is roughly 30-40 items per second. To get around this problem, the write workload is spread across many domains, and the results from all domains are aggregated when reading the status. Unlike SimpleDB, other services, such as S3 and SQS, hide the horizontal scaling details from the end users.

To overcome the write throughput limitation of a single SimpleDB domain, each worker randomly picks one of several domains to write its status. When querying SimpleDB for results, each worker launches multiple threads to read from all domains at the same time, and then aggregates the overall result. We find that 50 domains are enough even for our test cases with 1000 nodes. Even though statuses are maintained centrally, SimpleDB is not a bottleneck since it is itself implemented in a distributed fashion.
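A rough sketch of this write-spreading and parallel-read aggregation follows. Dicts stand in for SimpleDB domains, the 50-domain figure mirrors the setting reported above, and the helper names are ours rather than CMR's.

```python
# Sketch of spreading status writes across several "domains" and aggregating
# them with parallel reads.  This simulates the pattern only; a real worker
# would issue SimpleDB requests instead of touching local dicts.
import random
from concurrent.futures import ThreadPoolExecutor

NUM_DOMAINS = 50
domains = [dict() for _ in range(NUM_DOMAINS)]

def commit_status(item_id: str, status: dict) -> None:
    """Each worker writes to a randomly chosen domain to spread the write load."""
    random.choice(domains)[item_id] = status

def read_all_statuses() -> dict:
    """Read every domain in parallel and merge the results."""
    merged = {}
    with ThreadPoolExecutor(max_workers=NUM_DOMAINS) as pool:
        for partial in pool.map(dict.copy, domains):
            merged.update(partial)
    return merged

for task_id in range(200):
    commit_status(f"map-{task_id}", {"committed": True})
print(len(read_all_statuses()), "status items aggregated from", NUM_DOMAINS, "domains")
```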

VI. EVALUATION

The following evaluation considers three aspects: Cloud MapReduce, Spot Cloud MapReduce and Continuous Cloud MapReduce.

A. Cloud MapReduce

Three different MapReduce applications are used to evaluate Cloud MapReduce's performance: Word Count, Reverse Index, and String Matching (Distributed Grep). Hadoop is deployed on an EC2 cluster, rather than using Amazon's Elastic MapReduce. Word Count and Distributed Grep use the examples provided with the Hadoop distribution. Reverse Index was implemented by us, since it is not included in the Hadoop distribution.

The Word Count application is run on roughly 13GB of Wikipedia article text. Using the default 64MB block size in HDFS (the Hadoop File System), there are exactly 200 splits, which correspond to 200 Map tasks. For CMR, the data is stored in S3 as several files, since S3 has a 5GB per-file limit, and file ranges are used to create the same number of splits (200) as inputs to the CMR jobs.

TABLE I
TIME (IN SECONDS) TO RUN THE WORD COUNT APPLICATION. ROUGHLY 13GB DATA ON 100 NODES.

                            combiner    no combiner
Hadoop                      264         537
Cloud MapReduce             104         372
Cloud MapReduce w/ sort     221         463

Fig. 5. Computation progress for Hadoop and CMR. Word count on roughly 13GB data on 100 nodes. No combiner. (a) Hadoop progress (pipelined); (b) Cloud MapReduce progress (blocking). Each panel plots Map progress and Reduce progress (0-100%) against time in seconds.

Using the default setting of 2 Map tasks per node, all Map tasks finish in the first wave. The number of Reduce tasks is set to 100, so that they also finish in one wave. To see the effect of larger intermediate data, we run the test with and without the combiner enabled. To enable a side-by-side comparison, we also run a version of Cloud MapReduce with the pull iterator interface implemented with in-memory sorting. Table I shows the time it takes to run the MapReduce jobs. In both cases, Cloud MapReduce is roughly twice as fast as Hadoop. Even with sorting, Cloud MapReduce is still faster.
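As a concrete illustration of the split creation described in this subsection, the sketch below derives 200 byte-range splits from several S3 objects. The file sizes and the `make_splits` helper are hypothetical; CMR's actual split logic may differ in detail.

```python
# A minimal sketch, under assumed file sizes, of creating an equal number of
# input splits (200 here) from byte ranges over several S3 objects.

def make_splits(file_sizes, num_splits):
    """Split each file into near-equal byte ranges so the total count is close
    to num_splits; returns (file_index, start_byte, end_byte) tuples."""
    total = sum(file_sizes)
    splits, assigned = [], 0
    for idx, size in enumerate(file_sizes):
        if idx == len(file_sizes) - 1:
            n = num_splits - assigned          # last file absorbs rounding error
        else:
            n = max(1, round(num_splits * size / total))
        assigned += n
        step = -(-size // n)                   # ceiling division
        for start in range(0, size, step):
            splits.append((idx, start, min(start + step, size)))
    return splits

# Hypothetical layout: roughly 13GB stored as three S3 objects (5GB object limit).
files = [5 * 2**30, 5 * 2**30, 3 * 2**30]
splits = make_splits(files, 200)
print(len(splits), "splits; first split:", splits[0])
```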

Fig. 5 shows two of the data points from Table I in more detail. Fig. 5(a) shows Hadoop's computation progress for the case with no combiner. Hadoop's report on the Reduce progress consists of three components: the first (0-33%) covers data shuffling, the second (33-66%) covers data sorting, and the third (66-100%) covers applying the user-defined Reduce function. At 125s, some Map tasks have finished, and Hadoop starts to shuffle data. At 231s, the Map stage finishes; however, some intermediate data still needs to be shuffled. Shuffling continues until 385s, when the Reduce progress reaches 33%. While data is being shuffled between 231s and 385s, the CPU is underutilized.

In comparison, Fig. 5(b) shows CMR's computation progress. The Map tasks finish at 210s, and the Reduce tasks start processing almost immediately. In CMR, the Reduce progress report only tracks progress on applying the user-defined Reduce function, since the data is already loaded into SQS when the Reduce stage starts.


Fig. 6. Word count on a 200MB data set on an m1.small instance. (a) CPU usage; (b) network usage.

Since data uploading happens in parallel with the Map computation, CMR makes more efficient use of both the CPU and the network I/O. In addition, CMR employs many parallel threads for SQS uploads, which helps to increase throughput.
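A hedged sketch of the parallel-upload idea follows: Map output key-value pairs are pushed through a thread pool so that per-request queue latency overlaps across requests. `enqueue` only simulates an SQS send, and the thread count and batch size are arbitrary choices for illustration, not CMR's settings.

```python
# Parallel upload sketch: many concurrent "sends" hide the latency of each one.
import time
from concurrent.futures import ThreadPoolExecutor

def enqueue(batch):
    """Stand-in for an SQS send: pretend each request costs ~20 ms of latency."""
    time.sleep(0.02)
    return len(batch)

def parallel_upload(pairs, threads=16, batch_size=10):
    batches = [pairs[i:i + batch_size] for i in range(0, len(pairs), batch_size)]
    with ThreadPoolExecutor(max_workers=threads) as pool:
        sent = sum(pool.map(enqueue, batches))
    return sent

pairs = [("word%d" % i, 1) for i in range(1000)]
start = time.time()
print(parallel_upload(pairs), "pairs uploaded in %.2fs" % (time.time() - start))
```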

In the next experiment, a larger block size is used to observe its impact on the run time. We use roughly 52GB of Wikipedia article text with a 256MB block size in HDFS, so that there are again exactly 200 splits. With the combiner, CMR takes 436s, whereas Hadoop takes 747s. Without the combiner, CMR takes 1,247s, whereas Hadoop takes 1,733s. Even though the Map stage is longer due to the larger block size, the shuffling phase is also longer in Hadoop, which translates into a relatively shorter run time for CMR.

Fig. 6 presents a different view of the overlap between Map and shuffling. It shows the CPU and network usage during one run of the Word Count application on a single m1.small instance processing 200MB of data. The combiner is disabled in order to stress the network. The CPU remains mostly at peak utilization throughout the job (about 40% is the highest utilization achievable on an m1.small). In Fig. 6(a), at around 21:42, the Map phase finishes and the worker waits to flush out all SQS messages before starting the Reduce phase. While waiting, there is a short drop in CPU utilization, but the Reduce stage starts soon after, making full use of the CPU again. Fig. 6(b) shows the network I/O happening in parallel, including both downloading files from S3 and accessing SQS. Because network access is spread out, the network bandwidth demand is small, staying under 60Mbps even with the combiner disabled. In our independent tests, an EC2 instance is able to sustain roughly 800Mbps of throughput, so the network interface is far from being the bottleneck.

The Word Count example is also run on a much larger data set, which has 100GB of data and 1,563 splits when stored in HDFS using the default block size. Table II shows the results of processing it on 100 nodes. Since there are many Map tasks, the shuffling of earlier Map output overlaps with later Map processing; thus, the overlapping advantage of CMR is not as pronounced. However, removing sorting and avoiding disk staging still result in a significant reduction. We are not able to run the sorting version of CMR due to memory limits.

TABLE II
TIME (IN SECONDS) TO RUN THE WORD COUNT APPLICATION. 100GB DATA ON 100 NODES.

                     combiner    no combiner
Hadoop               2058        4379
Cloud MapReduce      1324        3213

For the Distributed Grep application, the same 13GB of data is used as in the Word Count example, and we grep for the keyword "which". Cloud MapReduce takes 962 seconds, whereas Hadoop takes 1,047 seconds. Adding sorting or a combiner makes little difference, since the amount of data in the Reduce stage is small. The time difference is not as large because this job is dominated by string (regular expression) matching in the Map phase, which is CPU intensive. Also, the Map output data for the Reduce stage is small; thus, the effect of overlapping Map and shuffling is not as pronounced.

For the Reverse Index application, the same 1.2GB of data is used as in the Phoenix evaluation [16]. The input data is duplicated 10 times to create 12GB of data, so that it is reasonably large for a 100-node cluster. The resulting data set contains 923,670 HTML files. With the default settings, it takes Hadoop more than 6 hours to process all the data, whereas CMR takes only 297s. Most of the long running time is due to Hadoop's high overhead for task creation, where it sometimes takes a few hundred milliseconds to create a new task. Since each input file is a separate Map task, the overhead adds up. To get around this overhead, MultiFileInputFormat is used as the input format, which aggregates many small files into a single input split. Using MultiFileInputFormat, Hadoop's computing time drops to 638 seconds, still more than twice that of CMR.
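The sketch below illustrates the general packing idea behind aggregating many small files into fewer, larger input splits, in the spirit of what MultiFileInputFormat does for Hadoop. It is a simplified illustration only (the real class also deals with locality and record boundaries), and the file sizes are randomly generated to roughly match the 12GB / 923,670-file workload described above.

```python
# Greedy grouping of many small files into splits near a target size.
import random

def group_small_files(file_sizes, target_split_bytes=64 * 2**20):
    """Pack file indices into splits whose total size stays near the target."""
    splits, current, current_bytes = [], [], 0
    for idx, size in enumerate(file_sizes):
        if current and current_bytes + size > target_split_bytes:
            splits.append(current)
            current, current_bytes = [], 0
        current.append(idx)
        current_bytes += size
    if current:
        splits.append(current)
    return splits

# Hypothetical workload shaped like the Reverse Index data: many small HTML files.
random.seed(0)
sizes = [random.randint(2_000, 24_000) for _ in range(923_670)]
splits = group_small_files(sizes)
print(len(sizes), "files packed into", len(splits), "splits")
```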

B. Spot Cloud MapReduce

Spot CMR's performance is evaluated on Amazon EC2 instances. A comprehensive evaluation is beyond the scope of this paper; instead, we show that Spot CMR can work well in the spot-market environment and that it can significantly reduce cost by leveraging spot pricing. Since the current Spot CMR cannot tolerate many terminations in the Reduce phase, spot instances are used only in the Map phase, and regular instances are used in the Reduce phase. We run the Word Count application on 50GB of crawled web-page data using 20 m1.small instances. The input data is split into 100 splits of 500MB each. Spot CMR parses the input files and passes the line number as the key and the line content as the value to the user-defined Map function. We simulate a spot-market scenario where the price increases every 20 minutes, then drops right away. As a result, all 20 instances are killed every 20 minutes, and they are then restarted right away. Spot CMR takes 93.4 minutes to complete the job. In contrast, if we assume the instances are never turned off (e.g., when the bid price is much higher), CMR takes 84.2 minutes to complete. Spot CMR introduces a slight overhead when nodes are constantly terminated, which includes the time taken 1) for the Linux OS to boot, 2) for Spot CMR to start and determine the current progress, and 3) for Spot CMR to save its progress when terminated.


Fig. 7. Spot price history in Amazon EC2 us-east data center in Sep. 2010.

Spot CMR also stores more data (e.g., more commits) in SimpleDB, which could cause a higher query overhead. With no terminations, CMR consumes 0.012 hours of SimpleDB CPU time, as reported by the Amazon billing statement. In comparison, Spot CMR with a failure every 20 minutes consumes 0.013 hours of SimpleDB CPU time. Most of this time is consumed in the Reduce phase, when the Reduce workers query SimpleDB to determine whether there are enough commits for all Map tasks.

Hadoop 0.21.0 is also compared on the same input data. Assuming no failures, Hadoop takes about 119.6 minutes to finish. We also compare the failure cases. Unfortunately, one cannot terminate a large number of nodes in Hadoop, not only because a master-node failure is catastrophic, but also because one cannot afford to lose all replicas of a data item. Instead, 3 instances are randomly terminated and restarted every 10 minutes. In this case, Hadoop takes 187.5 minutes to finish. As observed in [14], Hadoop's performance is significantly impacted in such a dynamic environment.

Now consider the potential cost savings from using the spot market. We take the real spot price history for the period between 1pm on Sep. 9th and Sep. 10th, 2010, in Amazon's us-east data center, and simulate and replay the price changes by manually turning off nodes when necessary. This period was selected because the price fluctuates widely, from a low of $0.029 to a high of $0.085; the history is shown in Fig. 7.

For Spot CMR, we bid at the lowest price of $0.029. The total time, including the time waiting for the price to drop, is 22.4 hours, because most of the time is spent waiting; however, the job costs only $1.16. We can also run Hadoop in the spot market but bid at a much higher price to avoid termination during that period. We find that this approach would cost $2.60, but it comes with a much quicker response time of 119.6 minutes. If we use regular instances for Hadoop, the cost would be $3.40. Clearly, Spot CMR costs much less to process a MapReduce job than Hadoop, although the user must be willing to shift the demand at the cost of possibly waiting for a long time.
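As a back-of-envelope check, the dollar figures above are consistent with an assumption we make explicit here: 20 instances billed per started hour, so both the 93.4-minute and the 119.6-minute runs round up to 2 billed hours per instance. The implied hourly prices fall within the $0.029-$0.085 range quoted for this period.

```python
# Reproduce the reported cost figures under the stated billing assumptions.
INSTANCES = 20
BILLED_HOURS = 2                                 # ~1.6h runs round up to 2 billed hours

spot_cmr = INSTANCES * BILLED_HOURS * 0.029      # Spot CMR pays its $0.029 bid
hadoop_spot = 2.60                               # as reported in the text
hadoop_ondemand = 3.40                           # as reported in the text

print(f"Spot CMR:                    ${spot_cmr:.2f}")
print(f"Hadoop spot, implied price:  ${hadoop_spot / (INSTANCES * BILLED_HOURS):.3f}/hr")
print(f"Hadoop on-demand, implied:   ${hadoop_ondemand / (INSTANCES * BILLED_HOURS):.3f}/hr")
```

Running this prints $1.16 for Spot CMR and implied rates of $0.065/hr and $0.085/hr for the two Hadoop cases, matching the numbers reported above.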

C. Continuous Cloud MapReduce

We ran C-CMR and CMR on EC2 small and medium instances and achieved significant performance improvements (reductions in response time and increases in throughput).

Fig. 8. Time cost of blocking (CMR).

Fig. 9. Time cost of pipelining (C-CMR).

For data sets larger than 250MB, we achieved a speedup above 30% for the word count application. Varying the number of Mapper and Reducer threads and the number and size of splits changes the results. As shown in Table III, for very small data sets, the increased number of threads in C-CMR compared to CMR leads to slightly degraded performance. For large data sets, the job setup time and the thread overhead are amortized, and the benefits far outweigh the drawbacks. The results are summarized in the figures and table below.

Fig. 8 depicts the progress of the Map and Reduce workers in CMR, where initially only Map workers make progress (the Map phase). After the Map phase finishes, the Reduce workers start processing (the Reduce phase) after an initial delay required to download data from the Reduce queues. Using 10 Mappers and 10 Reducers to process 250MB of data (the word count application) on 2 m1.small instances of Amazon EC2, the job takes 1,045 seconds.


TABLE III
SPEEDUP INCREASE WITH INPUT DATA SIZE

Mappers   Reducers   Data Size (MB)   Original CMR (sec)   Pipelined CMR (sec)   Speedup (%)
5         5          0.021            50                   52                    −4
2         2          10               73                   315                   6.8
10        10         50               315                  243                   22.8
10        10         100              420                  305                   27.39
10        10         250              1045                 706                   32.44

Fig. 10. Time cost comparison between CMR and C-CMR.

As shown in Fig. 10, in the pipelined implementation both Mappers and Reducers are able to make progress simultaneously, leading to a reduced job completion time. With the same configuration as CMR and the same amount of data, C-CMR takes only 706 seconds. Comparing Fig. 8 and Fig. 10, we can observe that the pipelined approach (C-CMR) achieves a speedup of approximately 33% over CMR.
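For concreteness, the roughly 33% figure follows directly from the completion times reported in Table III for the 250MB run:

\[
\text{speedup} \;=\; \frac{T_{\text{CMR}} - T_{\text{C-CMR}}}{T_{\text{CMR}}} \;=\; \frac{1045 - 706}{1045} \;\approx\; 32.4\%.
\]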

Thus, as seen above, the streaming, pipelined implementation provides a significant performance improvement over the batch mode of sequential processing. In addition, it provides the following advantages:

1) Support for streaming, continuous data.
2) Support for snapshots, by enabling the user to request output at any time without waiting for the job to complete, which allows an earlier approximation of the output.
3) A pipelined implementation, leading to reduced job completion time and increased throughput.
4) Support for new applications that require streaming, such as multimedia, stock-quote processing, weather forecasting, click-stream and Twitter feed analysis, etc.
5) Support for cascaded MapReduce jobs, which facilitates parallelism at the job level as well, i.e., parallelism between the Map and Reduce phases leads to parallelism between multiple jobs; e.g., two interdependent queries can be run in parallel.

VII. CONCLUSION AND FUTURE WORK

In order to build a large-scale system on top of AWS, we proposed a new, fully distributed architecture, namely CMR, to implement the MapReduce programming model. CMR is an important approach to developing processing frameworks using cloud services. Nodes pull for job assignments and global status in order to determine their individual actions. The architecture employs queues to shuffle results from Map to Reduce. Mappers write results as soon as they are available, and Reducers filter out results from failed nodes, as well as duplicate results. Our preliminary results indicate that CMR is a practical system and that its performance is superior to Hadoop's. Cloud MapReduce has several highly desirable properties, which can be shared by other highly scalable systems (such as Dynamo [17]).

Incremental scalability: Cloud MapReduce can scale incrementally in the number of computing nodes. A user can not only launch a number of servers at the beginning, but can also launch additional servers in the middle of a computation if the user thinks the progress is too slow. The new servers can automatically figure out the current job progress and poll the queues for work to process.

Symmetry and decentralization: Every computing node in Cloud MapReduce has the same set of responsibilities as its peers; there are no master or slave nodes. Symmetry simplifies system provisioning, configuration and failure recovery. As implied by symmetry, there is no single central agent (master), which makes the system more available.

Heterogeneity: The computing nodes can have varying computation capacity; the faster nodes simply do more work than the slower nodes. In addition, the computing nodes can be distributed geographically. In the extreme, a user can even harvest idle computing capacity from servers and desktops distributed across the Internet.

One drawback of the current CMR implementation is that it does not employ any locality optimization. It uses the network exclusively for I/O, bypassing all local storage. Such an architecture would eventually encounter a network bottleneck in today's cloud infrastructure, when the network links between the computing nodes and the cloud services become saturated. The lack of locality optimization is not only because the cloud services run on a separate set of nodes from our Map and Reduce workers, but also because the Amazon cloud does not expose any locality hints. Porting CMR to run on an internal cloud (hosted in an enterprise's data center) would support locality optimization by allowing the queues and the storage service to be co-located on the same nodes as the computing nodes, and by exposing locality hints so that data placement can be optimized. In the future, locality optimization may no longer be necessary: future data center architectures (e.g., fat tree [18], Portland [19], Bcube [20], Dcell [21], etc.) would support full bisection bandwidth, removing the network as a bottleneck.

We described the first MapReduce implementation that can continue making progress even when many nodes terminate at the same time in a spot-market environment. By adopting a spot market and Spot Cloud MapReduce, an infrastructure cloud provider could shift computation demand to increase the overall utilization of its infrastructure. Since MapReduce jobs represent a large proportion of batch jobs, the amount of demand that can be shifted, and hence the increase in utilization, could be significant.

We have succeeded in further improving on CMR to provide added functionality and improved performance by employing pipelining between the Map and Reduce phases. In C-CMR, we have also incorporated support for stream data processing, snapshots and cascaded MR jobs. This is an important improvement in the context of current data processing needs, as evidenced by recent stream processing applications. The Rolling Window functionality that has been partially implemented in C-CMR needs to be made fully functional, with a parallel, concurrent module for aggregating the results produced by the Mappers according to the time window provided by the user. This will give the user true flexibility to view the Mapper output for any given time window of the input stream.

Overall, Cloud MapReduce and the extensions designed, implemented and summarized here are an important step in moving distributed computing frameworks such as MapReduce to cloud computing architectures. These implementations provide solutions to many generic problems that can be reused by other models. The results and performance improvements are significant and promising. Cloud computing is fast gaining attention in industry, and the methods described above will add significant benefit to novel applications built on it.

ACKNOWLEDGMENT

We are grateful to the inventors of MapReduce, who fueled our research in this area. Our special thanks to our respective institutions and guides for encouraging us to pursue this research.

REFERENCES

[1] J. Dean and S. Ghemawat, "MapReduce: simplified data processing on large clusters," in OSDI, 2004, pp. 137–150.
[2] H. Liu and D. Orban, "Cloud mapreduce: a mapreduce implementation on top of a cloud operating system," in Proc. 2011 IEEE International Symposium on Cluster Computing and the Grid, pp. 464–474.
[3] "Amazon products and services." Available: http://aws.amazon.com/products/
[4] "Hadoop." Available: http://hadoop.apache.org/
[5] R. Karve, D. Dahiphale, and A. Chhajer, "Optimizing cloud mapreduce for processing stream data using pipelining," in EMS, 2011, pp. 344–349.
[6] Z. Yang, S. Kamata, and A. Ahrary, "NIR: content based image retrieval on cloud computing," in Proc. 2009 IEEE International Conference on Intelligent Computing and Intelligent Systems, vol. 3, pp. 556–559.
[7] T. Condie, N. Conway, P. Alvaro, J. M. Hellerstein, K. Elmeleegy, and R. Sears, "MapReduce online," in NSDI, 2010, pp. 313–328.
[8] Z. Yu, C. Wang, C. D. Thomborson, J. Wang, S. Lian, and A. V. Vasilakos, "Multimedia applications and security in mapreduce: opportunities and challenges," Concurrency and Computation: Practice and Experience, vol. 24, no. 17, pp. 2083–2101, 2012.
[9] "Windows azure services." Available: http://www.windowsazure.com/en-us/documentation/
[10] W. Vogels, "Eventually consistent," Commun. ACM, vol. 52, no. 1, pp. 40–44, 2009.
[11] A. Phanishayee, E. Krevat, V. Vasudevan, D. G. Andersen, G. R. Ganger, G. A. Gibson, and S. Seshan, "Measurement and analysis of TCP throughput collapse in cluster-based storage systems," in FAST, 2008, pp. 175–188.
[12] V. Vasudevan, A. Phanishayee, H. Shah, E. Krevat, D. G. Andersen, G. R. Ganger, G. A. Gibson, and B. Mueller, "Safe and effective fine-grained TCP retransmissions for datacenter communication," in Proc. 2009 SIGCOMM, pp. 303–314.
[13] Y. Chen, R. Griffith, J. Liu, R. H. Katz, and A. D. Joseph, "Understanding TCP incast throughput collapse in datacenter networks," in WREN, 2009, pp. 73–82.
[14] N. Chohan, C. Castillo, M. Spreitzer, M. Steinder, A. Tantawi, and C. Krintz, "See spot run: using spot instances for mapreduce workflows," in Proc. 2010 USENIX Conference on Hot Topics in Cloud Computing, ser. HotCloud'10, pp. 7–7. Available: http://dl.acm.org/citation.cfm?id=1863103.1863110
[15] S. Yi, D. Kondo, and A. Andrzejak, "Reducing costs of spot instances via checkpointing in the amazon elastic compute cloud," in IEEE CLOUD, 2010, pp. 236–243.
[16] C. Ranger, R. Raghuraman, A. Penmetsa, G. R. Bradski, and C. Kozyrakis, "Evaluating mapreduce for multi-core and multiprocessor systems," in HPCA, 2007, pp. 13–24.
[17] R. Cytron and V. Bala, "Dynamo: a transparent, dynamic, native binary optimizer (panel session)," in Dynamo, 2000, p. 75.
[18] M. Al-Fares, A. Loukissas, and A. Vahdat, "A scalable, commodity data center network architecture," in Proc. 2008 SIGCOMM, pp. 63–74.
[19] R. N. Mysore, A. Pamboris, N. Farrington, N. Huang, P. Miri, S. Radhakrishnan, V. Subramanya, and A. Vahdat, "Portland: a scalable fault-tolerant layer 2 data center network fabric," in Proc. 2009 SIGCOMM, pp. 39–50.
[20] C. Guo, G. Lu, D. Li, H. Wu, X. Zhang, Y. Shi, C. Tian, Y. Zhang, and S. Lu, "Bcube: a high performance, server-centric network architecture for modular data centers," in Proc. 2009 SIGCOMM, pp. 63–74.
[21] C. Guo, H. Wu, K. Tan, L. Shi, Y. Zhang, and S. Lu, "Dcell: a scalable and fault-tolerant network structure for data centers," in Proc. 2008 SIGCOMM, pp. 75–86.

Devendra D. Dahiphale received the B.E. degree in Computer Science and Engineering from the University of Pune, India, in 2012. He also holds a Bachelor of Arts degree in English from YCMOU and a Diploma in Teacher Education from the University of Pune. He is currently working as a Software Design Engineer at Imagination Technologies. He is also a member of the Dreamz Group, which is a group of software professionals who are passionate about innovation and technology development.

Rutvik Karve currently works as a Senior Associate in the Technology and Data division at Morgan Stanley. Rutvik holds a Bachelor of Engineering degree in Computer Science and Engineering from Pune University, during the course of which he worked on the inception, design and development of "Continuous Cloud MapReduce" as a member of the Dreamz Group. His other interests include Yog, Value Investing, Finance and Management.

Athanasios V. Vasilakos (M'00-SM'11) is currently a Professor with Kuwait University, Kuwait. He has served or is serving as an Editor and/or Guest Editor for many technical journals, such as the IEEE TRANSACTIONS ON NETWORK AND SERVICE MANAGEMENT, IEEE TRANSACTIONS ON SYSTEMS, MAN, AND CYBERNETICS-PART B: CYBERNETICS, IEEE TRANSACTIONS ON INFORMATION TECHNOLOGY IN BIOMEDICINE, IEEE TRANSACTIONS ON COMPUTERS, ACM Transactions on Autonomous and Adaptive Systems, and the IEEE JOURNAL ON SELECTED AREAS IN COMMUNICATIONS. He is also General Chair of the Council of Computing of the European Alliances for Innovation.


Huan Liu is currently the CEO and co-founder of Jamo, a mobile fitness company that brings Dance Fitness to smart phones. Prior to Jamo, Huan was a Research Manager with Accenture Technology Labs, where he led their research program in Cloud Computing and Big Data. Huan holds a Ph.D. in Electrical Engineering from Stanford University.

Zhiwei Yu was born in 1985. He is a Ph.D. student in the Department of Computer Science and Technology, Tsinghua University. He attended a joint Ph.D. program from 2011 to 2012 in the Department of Computer Science, the University of Auckland, under the supervision of Clark Thomborson. His main research interests are Cloud Computing and Software Protection.

Amit Chhajer has a Bachelor's degree from the University of Pune with a specialization in Computer Science. He is currently working at a health-care startup in Bangalore as an Engineering Manager, building SaaS software for doctors in India and Singapore. Amit's final-year thesis was carried out under the Dreamz Alumni Group, with a concentration on Cloud MapReduce and its real-life applications. His research interests are in the fields of data patterns and cloud computing. He is an active participant in many sport programming competitions such as CodeSprint and CodeChef.

Jianmin Wang graduated from Peking University, China, in 1990, and received his M.E. and Ph.D. degrees in Computer Software from Tsinghua University, China, in 1992 and 1995, respectively. He is now a professor at the School of Software, Tsinghua University. His research interests include unstructured data management, workflow and BPM technology, enterprise information systems, and benchmarks for database systems. He has published over 100 DBLP-indexed papers in journals such as TKDE, TSC, DMKD, CII, DKE, FGCS, and IJIIS, and in conferences such as VLDB, SIGMOD, SIGIR, ICDE, AAAI, IJCAI, ICWS, and SAC. He led the development of a product data/lifecycle management system (PDM/PLM), which has been deployed in hundreds of enterprises in China. He now leads the development of an unstructured data management system, LaUDMS.

Chaokun Wang received the B.Eng. degree in computer science and technology, the M.Sc. degree in computational mathematics, and the Ph.D. degree in computer software and theory in 1997, 2000 and 2005, respectively, from the Harbin Institute of Technology, China. He joined the faculty of the School of Software at Tsinghua University in February 2006, where he is currently an Associate Professor. He has published over 60 refereed papers, received two best paper awards at international conferences, and holds ten patents. His current research interests include database management, music computing, and software protection.