heterodoop : a mapreduce programming system for ... · cpu-only program, expressing mapreduce...

12
HeteroDoop : A MapReduce Programming System for Accelerator Clusters Amit Sabne Purdue University [email protected] Putt Sakdhnagool Purdue University [email protected] Rudolf Eigenmann Purdue University [email protected] ABSTRACT The deluge of data has inspired big-data processing frame- works that span across large clusters. Frameworks for MapReduce, a state-of-the-art programming model, have primarily made use of the CPUs in distributed systems, leaving out computationally powerful accelerators such as GPUs. This paper presents HeteroDoop, a MapReduce framework that employs both CPUs and GPUs in a cluster. HeteroDoop offers the following novel features: (i) a small set of directives can be placed on an existing sequential, CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into a GPU code; (iii) a runtime system assists the compiler in handling MapReduce semantics on the GPU; and (iv) a tail scheduling scheme minimizes job execution time in light of disparate processing capabilities of CPUs and GPUs. This paper addresses several challenges that need to be overcome in order to support these features. Het- eroDoop is built on top of the state-of-the-art, CPU-only Hadoop MapReduce framework, inheriting its functionality. Evaluation results of HeteroDoop on recent hardware indi- cate that usage of even a single GPU per node can improve performance by up to 2.78x, with a geometric mean of 1.6x across our benchmarks, compared to a CPU-only Hadoop, running on a cluster with 20-core CPUs. Categories and Subject Descriptors D.1.3 [Software]: PROGRAMMING TECHNIQUES— Concurrent Programming, Distributed programming ; D.3.3 [Software]: PROGRAMMING LANGUAGES—Frame- works ; D.3.4 [Programming Languages]: Compilers Keywords Distributed Frameworks; MapReduce; Accelerators; Source- to-source Translation; Scheduling 1. INTRODUCTION A growing number of commercial and science applications in both classical and new fields process very large data vol- Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for components of this work owned by others than ACM must be honored. Abstracting with credit is permitted. To copy otherwise, or republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. Request permissions from [email protected]. Copyright 20XX ACM X-XXXXX-XX-X/XX/XX ...$15.00. umes. Dealing with such volumes requires processing in par- allel, often on systems that offer high compute power. For this type of parallel processing, the MapReduce para- digm has found popularity. The key insight of MapReduce is that many processing problems can be structured into one or a sequence of phases, where a first step (Map) operates in fully parallel mode on the input data; a second step (Re- duce) combines the resulting data in some manner, often by applying a form of reduction operation. MapReduce pro- gramming models allow the user to specify these map and reduce steps as distinct functions; the system then provides the workflow infrastructure, feeding input data to the map, reorganizing the map results, and then feeding them to the appropriate reduce functions, finally generating the output. The large data volumes involved may not fit on a single node. Thus, distributed architectures with many nodes may be needed. Among the systems that support MapReduce on distributed architectures, Hadoop [1] has gained wide use. Hadoop provides a framework that executes MapRe- duce problems in a distributed and replicated storage orga- nization (the Hadoop Distributed File System – HDFS). In doing so, it also deals with node failures. Big-data problems pose high demands on processing and IO speeds, often with emphasis on one of the two. For general compute-intensive problems, accelerators, such as NVIDIA GPUs and Intel Phis, have proven their value for an increasing range of applications. To obtain high per- formance, their architectures make different chip real-estate tradeoffs between processing cores and memory than in CPUs. A larger number of simpler cores provide higher aggregate processing power and reduce energy consump- tion, offering better performance/watt ratios than CPUs. In GPUs, intra-chip memory bandwidth is high and multi- threading reduces the effective memory access latency. These optimizations come at the cost of an intricate mem- ory hierarchy, reduced memory size, data accesses that are highly optimized for inter-thread contiguous (a.k.a. coa- lesced) reference patterns and explicitly parallel program- ming models. Using these architectures therefore requires high programmer expertise. While accelerators tend to perform well on compute-int- ensive applications, IO-intensive MapReduce problems may not always benefit. Previous research efforts on MapReduce- like systems employ either GPUs [2, 3, 4] alone, disregarding IO-intensive applications, or CPUs [1, 5, 6, 7] alone, los- ing out on GPU acceleration. In this paper we present our

Upload: others

Post on 20-May-2020

9 views

Category:

Documents


0 download

TRANSCRIPT

Page 1: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

HeteroDoop : A MapReduce Programming System forAccelerator Clusters

Amit SabnePurdue University

[email protected]

Putt SakdhnagoolPurdue University

[email protected]

Rudolf EigenmannPurdue University

[email protected]

ABSTRACTThe deluge of data has inspired big-data processing frame-works that span across large clusters. Frameworks forMapReduce, a state-of-the-art programming model, haveprimarily made use of the CPUs in distributed systems,leaving out computationally powerful accelerators such asGPUs. This paper presents HeteroDoop, a MapReduceframework that employs both CPUs and GPUs in a cluster.HeteroDoop offers the following novel features: (i) a smallset of directives can be placed on an existing sequential,CPU-only program, expressing MapReduce semantics; (ii)an optimizing compiler translates the directive-augmentedprogram into a GPU code; (iii) a runtime system assists thecompiler in handling MapReduce semantics on the GPU;and (iv) a tail scheduling scheme minimizes job executiontime in light of disparate processing capabilities of CPUsand GPUs. This paper addresses several challenges thatneed to be overcome in order to support these features. Het-eroDoop is built on top of the state-of-the-art, CPU-onlyHadoop MapReduce framework, inheriting its functionality.Evaluation results of HeteroDoop on recent hardware indi-cate that usage of even a single GPU per node can improveperformance by up to 2.78x, with a geometric mean of 1.6xacross our benchmarks, compared to a CPU-only Hadoop,running on a cluster with 20-core CPUs.

Categories and Subject DescriptorsD.1.3 [Software]: PROGRAMMING TECHNIQUES—Concurrent Programming, Distributed programming ; D.3.3[Software]: PROGRAMMING LANGUAGES—Frame-works; D.3.4 [Programming Languages]: Compilers

KeywordsDistributed Frameworks; MapReduce; Accelerators; Source-to-source Translation; Scheduling

1. INTRODUCTIONA growing number of commercial and science applications

in both classical and new fields process very large data vol-

Permission to make digital or hard copies of all or part of this work for personal orclassroom use is granted without fee provided that copies are not made or distributed forprofit or commercial advantage and that copies bear this notice and the full citation onthe first page. Copyrights for components of this work owned by others than ACM mustbe honored. Abstracting with credit is permitted. To copy otherwise, or republish, topost on servers or to redistribute to lists, requires prior specific permission and/or a fee.Request permissions from [email protected] 20XX ACM X-XXXXX-XX-X/XX/XX ...$15.00.

umes. Dealing with such volumes requires processing in par-allel, often on systems that offer high compute power.

For this type of parallel processing, the MapReduce para-digm has found popularity. The key insight of MapReduceis that many processing problems can be structured into oneor a sequence of phases, where a first step (Map) operatesin fully parallel mode on the input data; a second step (Re-duce) combines the resulting data in some manner, often byapplying a form of reduction operation. MapReduce pro-gramming models allow the user to specify these map andreduce steps as distinct functions; the system then providesthe workflow infrastructure, feeding input data to the map,reorganizing the map results, and then feeding them to theappropriate reduce functions, finally generating the output.

The large data volumes involved may not fit on a singlenode. Thus, distributed architectures with many nodes maybe needed. Among the systems that support MapReduceon distributed architectures, Hadoop [1] has gained wideuse. Hadoop provides a framework that executes MapRe-duce problems in a distributed and replicated storage orga-nization (the Hadoop Distributed File System – HDFS). Indoing so, it also deals with node failures.

Big-data problems pose high demands on processing andIO speeds, often with emphasis on one of the two. Forgeneral compute-intensive problems, accelerators, such asNVIDIA GPUs and Intel Phis, have proven their value foran increasing range of applications. To obtain high per-formance, their architectures make different chip real-estatetradeoffs between processing cores and memory than inCPUs. A larger number of simpler cores provide higheraggregate processing power and reduce energy consump-tion, offering better performance/watt ratios than CPUs.In GPUs, intra-chip memory bandwidth is high and multi-threading reduces the effective memory access latency.These optimizations come at the cost of an intricate mem-ory hierarchy, reduced memory size, data accesses that arehighly optimized for inter-thread contiguous (a.k.a. coa-lesced) reference patterns and explicitly parallel program-ming models. Using these architectures therefore requireshigh programmer expertise.

While accelerators tend to perform well on compute-int-ensive applications, IO-intensive MapReduce problems maynot always benefit. Previous research efforts on MapReduce-like systems employ either GPUs [2, 3, 4] alone, disregardingIO-intensive applications, or CPUs [1, 5, 6, 7] alone, los-ing out on GPU acceleration. In this paper we present our

Page 2: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

HeteroDoop1 system, which exploits both CPUs and GPUsin a cluster, as needed by the application.

The first challenge in developing such a heterogeneousMapReduce system is the programming method. In a naivescheme, the programmer would have to write two programversions, one for CPUs and the second for GPUs. This needarises as accelerators rely on explicitly parallel programs, beit either low-level programming models such as CUDA [8]and OpenCL [9], or high-level ones, such as OpenACC [10]and OpenMP 4.0 [11]. Although available high-level pro-gramming models relieve the user from having to learnmodel-specific APIs, such as in CUDA or OpenCL, theystill require explicit parallel programming. On the otherhand, in CPU-oriented MapReduce systems, programmerswrite only sequential code; the underlying framework auto-matically employs all cores in the cluster by concurrentlyprocessing the input data, which is split into separate files.Previous research on GPU uses for MapReduce has eitherrelied on explicitly parallel codes with accelerator-specificoptimizations [3, 4, 12, 13], and/or on specific MapReduceAPIs [2, 3, 13, 14]. Programmability in both approaches ispoor; the former requires learning low-level APIs and thelatter necessitates application rewriting. To overcome theselimitations, our contribution enables programmers to portalready available sequential MapReduce programs to hetero-geneous systems by annotating the code with HeteroDoopdirectives. Inserting such directives is straightforward, re-quires no additional accelerator optimizations, and leads toa single input source code for both CPUs and GPUs. Fur-thermore, the resulting code is portable; it can still executeon CPU-only clusters.

The second key challenge in exploiting accelerators is theirlimited, non-virtual memory space. The default paralleliza-tion scheme used in MapReduce/Hadoop engages multiplecores by processing separate input files in parallel – typicallyone per core, as an individual task. Data input is appropri-ately partitioned into separate fileSplits, which are fed tothe different compute nodes and their threads. Simultane-ous accesses by many threads to their fileSplits require alarge memory. This size requirement is not a problem intoday’s typical CPUs with 4 to 48 cores and virtual mem-ory support; however, in GPUs with several hundred coresand possibly thousands of threads, the available, non-virtualmemory is insufficient. Our second contribution addressesthis challenge by processing the data records within a file-Split in parallel on the accelerator, while retaining the de-fault processing scheme on the CPU.

The third challenge is to translate the annotated, sequen-tial source code in a way that maximizes performance. Ourcontribution includes an optimizing source-to-source trans-lator, assisted by a runtime system, that generates CUDAcode and splits the work across CPUs and GPUs. In doingso, several issues arise. The first one is load imbalance acrossGPU threads, as the records processed in parallel may be ofdifferent size. A global work-stealing approach would incurhigh overheads, due to excessive atomic accesses by the GPUthreads. HeteroDoop overcomes this issue by using a novelrecord-stealing approach that partitions the records stat-ically across threadblocks but dynamically within thread-blocks. Another issue is that GPU memory is statically al-located but the size of the map phase output, the key-valuepairs, is not known a priori. Over-allocation would lead to

1Download from: http://bit.ly/1BwxGER

inefficient sorting of the key-value pairs, which follows themap phase. To resolve this issue, HeteroDoop includes a fastruntime compaction scheme, resulting in efficient sort. Fur-thermore, the runtime system executes the sort operationon the GPU rather than on the slower CPU. Other opti-mizations include efficient data placement in the complexGPU memory hierarchy and automatic generation of vectorload/store operations.

A final challenge in developing such a heterogeneousMapReduce scheme is to account for the different process-ing speeds of CPUs and GPUs for work partitioning. Whileprior work [15, 16] has dealt with load balancing acrossnodes, intra-node heterogeneity has remained an issue. Het-eroDoop’s tail scheduling scheme addresses this issue. Ourcontribution is based on a key observation: the load imbal-ance only arises in the execution of the final tasks in a job;careful GPU-speedup-based scheduling of the tailing taskscan avoid the imbalance.

We have evaluated the HeteroDoop framework on eightapplications, comprising well-known MapReduce programsas well as scientific applications. We demonstrate the utilityof HeteroDoop on a 48-node, single GPU cluster with largedatasets, and on a 64-node, 3-GPU cluster with in-memorydatasets. Our main results indicate that the use of evena single GPU per node can speed up the end-to-end jobexecution by up to 2.78x, with a geometric mean of 1.6x, ascompared to CPU-only Hadoop, running on a cluster with20-core CPUs. Furthermore, the execution time scales withthe number of GPUs used per node.

The remainder of the paper is organized as follows: Sec-tion 2 provides background on GPUs, MapReduce, and Had-oop. Section 3 describes the HeteroDoop constructs, fol-lowed by the compiler design in Section 4. Section 5 de-scribes the overall execution flow of the HeteroDoop frame-work and details the runtime system. The tail schedulingscheme is explained in Section 6. Section 7 presents the ex-perimental evaluation. Section 8 discusses related work, andSection 9 concludes the paper.

2. PRELIMINARIESWe introduce the basic terminology used in this paper

for the GPU/CUDA architecture and programming modelas well as the MapReduce and Hadoop concepts. We keepthe discussion of GPUs and CUDA brief, assuming readerfamiliarity, but we do refer to introductory material [17].

2.1 CUDA BasicsIn the CUDA [8] GPGPU architecture, the many GPU

cores are structured into multiple Streaming Multiproces-sors (SM). CUDA threads execute in SIMD fashion, wherea warp consisting of 32 threads executes a single instruc-tion and multiple warps time-share an SM in multi-threadingmode. Storage is separate from the CPU address space; it isstructured into the global (or device) memory, the per-SMshared memory (user managed cache), and specialized mem-ories. The latter include the read-only constant and texturememories as well as the registers.

CUDA programming is explicitly parallel with user-drivenoffloading; i.e, the user identifies compute-intensive code sec-tions, kernels, which are to be run on the GPU (the de-vice) and inserts data transfers between CPU and GPU. Thethreads are organized into threadblocks. All threads withina threadblock execute on the same SM. The programmer

Page 3: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

manages most of the storage hierarchy explicitly, includingfitting the data into the limited-size memories. There is novirtual memory support.

2.2 MapReduce and HadoopIn the MapReduce model, programmers write a map and

a reduce function, with the system organizing the overall ex-ecution workflow. The input data is placed on a distributedfile system, such as the HDFS [18] (Hadoop Distributed FileSystem). HDFS stores the input in blocks, or fileSplits. Ajob consists of a set of map and reduce tasks. In Hadoop,one node usually acts as master and the others as slaves.The master node runs a JobTracker, while each slave runs aTaskTracker – together they orchestrate the necessary mapand reduce tasks in a way that exploits data locality, engagesall available nodes and cores, and provides fault tolerance.The total number of concurrent tasks in a Hadoop cluster istypically the same as the number of available cores.

In Hadoop, each map task processes one fileSplit. Themap function applies a map operation to each data record inthis fileSplit. The map task emits a set of <key, value> pairs(or KV pairs). The framework puts these KV pairs intodifferent partitions, each partition targeted at a particularreduce task. To form these partitions, the KV pairs aresorted by their keys and split by a default or user-providedpartitioning function.

Each partition is then sent to its target reduce task. Thistask typically resides on a different node, making this acostly step. To reduce the cost, a task-local, user-providedreduction operation, known as the combiner, is applied firston each partition, minimizing the size of the communicateddata. This communication is also known as the shuffle phase.

In the next phase, the sort phase, each reduce task mergesthe incoming partitions into a sorted list. Then the reducephase applies the reduce function to these data. The outputis written back to the HDFS. As the data volumes involvedare typically very large, the intermediate results betweenmap and reduce are typically written to the local disk.

Hadoop uses a heartbeat mechanism for communicationbetween the JobTracker and TaskTracker. TaskTrackerssend heartbeats to the JobTracker at regular intervals.These heartbeats include items such as status of tasks andfree cores or slots. If the JobTracker finds that a TaskTrackerhas free slots, it schedules new tasks on the particular Task-Tracker in the heartbeat response.

Hadoop is written in Java, and hence the baseline systemsupports input functions written in Java. Hadoop Stream-ing [19] is an extension that supports other languages forwriting MapReduce programs. It is implemented so thatthe map, combine, and reduce functions obtain their inputfrom standard input and write the output onto standard out-put; map, combine, and reduce can be written as unix-style“filter” functions, using a language of choice.

3. HETERODOOP DIRECTIVESRecall that the HeteroDoop directives are designed to ex-

ploit both CPUs and GPUs in a cluster with a single sourceprogram. They identify the map and combine functions andtheir attributes in a serial, CPU-only MapReduce program.From this information, our translator generates code thatexploits both the CPUs and GPUs available in the nodes ofa distributed architecture. While the concept of HeteroDoopdirectives is language-independent, our HeteroDoop proto-

Listing 1: Wordcount Map Code withHeteroDoop Directives

1 int main() {2 char word[30], *line;3 size_t nbytes = 10000;4 int read, linePtr, offset, one;5 line = (char*) malloc(nbytes*sizeof(char));6 #pragma mapreduce mapper key(word) value(one) \\7 keylength(30) kvpairs(20)8 while( (read = getline(&line, &nbytes, stdin)) != -1) {9 linePtr = 0;

10 offset = 0;11 one = 1;12 while( (linePtr = getWord(line, offset, word,13 read, 30)) != -1) {14 printf("%s\t%d\n", word, one);15 offset += linePtr;16 }17 }18 free(line);19 return 0;20 }

type supports programs written in C. Our implementationmakes use of the Hadoop Streaming framework, which we ex-tend to enable efficient execution on both CPUs and GPUs.

3.1 HeteroDoop Directives with an ExampleA key insight used in the design of HeteroDoop constructs

is the observation that both map and combine functions it-erate over a non-predetermined number of records. The bulkof the computation of the map and combine functions is per-formed inside a while loop. HeteroDoop directives identifythese loops and express attributes that allow the translatorto generate efficient parallel code for the GPUs.

Listing 1 shows an example map code, written in C, forWordcount. This application counts the occurrences of eachword in a set of input files. The code reads each input lineand applies to it an elementary map operation. For eachword in the line, this map operation emits a KV pair <word, 1>. Notice that the input is read from STDIN usingthe getline function, while the output is written to STDOUTvia printf.

In general, the elementary map operation is applied to ev-ery record. By default a record is a line of input. The bulkof the map computation lies within the loop iterating overthese records – lines 8–17 in the example. The mapreducedirective on line 6 with the mapper clause tells the compilerthat this loop applies the map operation. The key and valueclauses identify the variables used for emitting KV pairs.The keylength and vallength clauses indicate the lengths ofthe respective variables; the clauses are needed if these vari-ables do not have a compiler-derivable type. Table 1 lists allHeteroDoop directives and clauses. Some of these clausesare optional; users need to provide them only for furtheroptimizations and tuning of the generated GPU code.

Listing 2 shows an example combine code for the sameWordcount application. Recall that, before the combiner isrun, the underlying HeteroDoop system sorts the KV pairsemitted by the map according to their keys and places theminto different partitions. The combiner function operates oneach partition and sums up the occurrences of each word.Note that the while loop in the map function can executein parallel, but the one in the combine function cannot, ex-cept for reduction-style parallelism. The only readily avail-able parallelism in the combine and reduce executions existsacross partitions. Typically, the number of partitions is nothigh enough to exploit the GPU completely. For this rea-

Page 4: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

Listing 2: Wordcount Combine Code withHeteroDoop Directives

1 int main() {2 char word[30], prevWord[30]; prevWord[0] = ’\0’;3 int count, val, read; count = 0;4 #pragma mapreduce combiner key(prevWord) value(count)5 keyin(word) valuein(val) keylength(30) vallength(1)6 firstprivate(prevWord, count) {7 while( (read = scanf("%s %d", word, &val)) == 2 ) {8 if(strcmp(word, prevWord) == 0 ) {9 count += val;

10 } else {11 if(prevWord[0] != ’\0’)12 printf("%s\t%d\n", prevWord, count);13 strcpy(prevWord, word);14 count = val;15 }16 }17 if(prevWord[0] != ’\0’)18 printf("%s\t%d\n", prevWord, count);19 }20 return 0;21 }

son, HeteroDoop provides no directives for reduce functionsand executes them on the CPUs only. For combine func-tions, however, as the data is already present in the GPUmemory, HeteroDoop employs an economical way for GPUexecution (Section 4.2).

Two extra clauses are necessary on the combinerfunction : keyin specifies the variable that receives the map-generated key and valuein does the same for the map-generated value.

3.2 Clauses for Memory andThread Attributes

HeteroDoop directives also allow for clauses that improveperformance by specifying the attributes of the data andthreads. The following constructs exist:

Read-only variables: The sharedRO clause lists read-only variables. The compiler places such variables in fasterGPU memories, such as the constant memory and the tex-ture memory. By default, our compiler places read-onlyscalars in the constant memory. Arrays whose sizes areknown at compile time are placed in the texture memory,otherwise in the global memory. The texture clause forcesplacement in the texture memory. Placing data in the tex-ture memory is especially useful for random array accesses,as this memory comes with a separate on-chip cache.

Firstprivate: This clause specifies variables that can beprivatized during the map or combine operation but are ini-tialized beforehand. In the absence of this clause, the com-piler tries to identify such variables automatically. It issuesa warning if the analysis is inaccurate, e.g., due to aliasing.

Notice that in the MapReduce programming model, allwritten variables are privatizable during the map and com-bine operations. There is no shared written data. The trans-lator performs the privatization without the need for userdirectives.

Space Allocation for KV Pairs: The space needed forthe KV pairs output by map is not known at translationtime. The translator allocates all free GPU memory forstoring these KV pairs. In practice, this leads to an over-allocation. To reduce the memory required for storing theseKV pairs, the users can indicate the maximum number ofKV pairs emitted by each record using the kvpairs clause.This reduction in the storage space required for the KV pairsimproves the aggregation efficiency for this storage, as willbe described in Section 4.3.

Thread Attributes: The blocks and threads clauses al-low programmers to choose the number of threadblocks andnumber of threads in a threadblock on the GPU, resp. Theseclauses help tune the map and combine kernel performance.

Table 1: HeteroDoop Directives

Clause Arguments Description Op-tional

mapper Specifies that the attached regionperforms map operation

No

combiner Specifies that the attached regionperforms combine operation

No

key Variablename

Specifies the variable that con-tains the key

No

value Variablename

Specifies the variable that con-tains the value

No

keyin Variablename

Specifies the variable that con-tains the incoming key. Valid onlyon the combiner

No

valuein Variablename

Specifies the variable that con-tains the incoming value. Validonly on the combiner

No

keylength Integer vari-able

Specifies the length of the key be-ing emitted

No

vallength Integer vari-able

Specifies the length of the valuebeing emitted

No

firstpri-vate

A set of vari-able names

These variables are initialized be-fore being used in the attached re-gion

No

share-dRO

A set of vari-able names

These are read-only inside themap or combine regions

Yes

texture A set of vari-able names

These variables are read-only andhence can be placed in GPU tex-ture memory

Yes

kvpairs Integer vari-able

This is an optional clause on mapregion specifying the maximumKV pairs that can be emittedfrom a single record

Yes

blocks Integer vari-able

Specifies the number of thread-blocks to use

Yes

threads Integer vari-able

Specifies the number of threads ina threadblock

Yes

4. COMPILERThis section describes the HeteroDoop source-to-source

translator, which converts an input MapReduce code anno-tated with HeteroDoop directives into a CUDA program. Itis built using the Cetus [20] compiler infrastructure. Trans-lating directly into CUDA, rather than a higher-level modelsuch as OpenACC or OpenMP 4.0, provides direct access toGPU-specific features which are exploited by the compileroptimizations. For the code running on the CPUs, the sameHadoop Streaming code is compiled using the gcc backendcompiler. In this manner, a single MapReduce source is suf-ficient for the execution on both CPUs and GPUs. The nexttwo subsections describe the key translation steps of generat-ing GPU kernel code for the map and the combine functions,respectively. Section 4.3 describes the code generation forthe host CPU, which offloads the kernels.

4.1 Map Kernel GenerationEach thread in the map kernel fetches a record, performs

the elementary map operation, and stores the generated KVpairs into its portion of a central GPU storage, called theglobal KV store. The process repeats until all records aredone.

The compiler begins by locating the annotation with themapper clause. The annotated while loop is the target regionfor kernel generation. The compiler extracts this region intoa new function, newGPUKernel, which contains the kernelcode. A key step in doing this is the transformation of the

Page 5: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

Algorithm 1 Handling Variables in Generated Kernels

Input: region - Region attached to the annotationInput: newGPUKernel - Extracted region copy (kernel)Input: sharedROSet - Set of shared read-only variables inside

newGPUKernelInput: textureSet - Set of shared read-only variables to be placed

on texture memoryInput: firstPrivateSet - Set of firstprivate variablesOutput: Correct placement of variables in newGPUKernel, along

with necessary GPU memory allocation and data transfergeneration

1: function handleVariables(region, newGPUKernel,sharedROSet, textureSet, firstPrivateSet)

2: usedV ars = newGPUKernel.getUsedVars()3: For each var ∈ usedV ars do4: if (sharedROSet contains var) then5: if (var is scalar) then6: newV ar = addParameter(var, newGPUKernel)

//gets placed on constant memory7: else //a shared read-only array8: newV ar = addParameter(var, newGPUKernel)9: insertMallocAndCpyIn(region, newV ar, var)10: end if11: else if (textureSet contains var) then12: tex = createNewTexture(var)13: newV ar = addParameter(var, newGPUKernel)14: bindTexture(tex, newV ar, region) //cudaBindTexture15: insertMallocAndCpyIn(region, newV ar, var)

//inserts cudaMalloc and cudaMemCpy16: else //a private variable17: newV ar = addPrivateVar(newGPUKernel, var)18: if (firstPrivateSet contains var) then19: newFPCopy = addParameter(var, newGPUKernel)20: if (var is array) then21: insertMallocAndCpyIn(region, newFPCopy, var)22: end if23: insertInKernelCopyCode(newFPCopy, newV ar);24: end if25: end if26: end for27: end function

different variable types. Algo. 1 shows the procedure. Fromthe user annotations, the compiler generates sharedROSet,which lists the shared read-only variables. TextureSet con-sists of read-only array variables that are to be placed in thetexture memory. FirstPrivateSet contains all variables thatare firstprivate, either specified by the programmer or iden-tified by the compiler. The scalar sharedRO variables arepassed as arguments to the kernel; the underlying CUDAcompiler places these arguments in constant memory (lines5–6). A pointer to each array sharedRO variable is passedthrough a kernel parameter (lines 8–9); storage is allocatedon the GPU and the array data is copied from the CPU intothis space. The transformation for using the texture memoryis essentially the same as for sharedRO arrays (lines 11–15).The functions addParameter and addPrivateVar create newvariables and automatically rename the previous variablenames in the newGPUKernel respectively.

The remaining variables inside the newGPUKernel areprivate (lines 17–24). For each private variable, a new vari-able is created inside the kernel body using the addPrivate-Var function. For firstprivate variables, there is an extrastep. For scalars, the initial value is passed through a kernelparameter. For arrays, storage is allocated and a referenceis passed via kernel parameter. The initial values of thearray elements are copied into the corresponding memorylocations before the kernel. Inside the kernel body, eachthread copies these values into the private spaces using theinsertInKernelCopyCode function.

Listing 3 shows the translated CUDA map kernel forWordcount (Listing 1). The kernel generation procedure

Listing 3: Translated Wordcount Mapper Code

1 __global__ void gpu_mapper(char *ip, int ipSize,2 int *recordLocator, char * devKey, int * devVal,3 int storesPerThread, int * devKvCount, int keyLength,4 int valLength, int * indexArray, int numReducers) {5 char gpu_word[30];6 int gpu_read, gpu_one, gpu_offset, gpu_linePtr;7 int index, tid, start;8 __shared__ unsigned int recordIndex;9 mapSetup(&start, &tid, &index, ipSize, storesPerThread,

10 ip, devKvCount, numReducers, &recordIndex);11 while( ( gpu_read = getRecord(ip, &recordIndex, &start,12 recordLocator) )!= - 1) {13 gpu_linePtr = 0;14 gpu_offset = 0;15 gpu_one = 1;16 while( (gpu_linePtr = getWord(ip + start, gpu_offset,17 gpu_word, gpu_read, 30)) != -1) {18 emitKV(gpu_word, &gpu_one, devKey, devVal, &index,19 devKvCount, keyLength, valLength, numReducers,20 storesPerThread, indexArray);21 gpu_offset += gpu_linePtr;22 }23 }24 mapFinish(index, storesPerThread, devKey, keyLength,25 indexArray, numReducers, devKvCount);26 }

adds internal parameters for bookkeeping. Ip represents theinput file buffer; ipSize is the input file size. RecordLocator isa data structure that keeps a list of starting addresses of allinput records. DevKey and devVal are the GPU variables forkeys and values of the global KV store, respectively. Stores-PerThread holds the storage size allocated to each thread inthe global KV store. DevKvCount array keeps a count ofthe KV pairs emitted by each thread.

The first GPU-specific translation step is the insertion ofa mapSetup function call (line 9) for the map execution.This function sets up internal variables for GPU executione.g. tid, which stores the thread ID. Next, the algorithm re-places the CPU record input function, such as getline, witha GPU equivalent getRecord function (line 11). Similarly,the KV-emitting function, which is printf in the CPU code,is replaced with a GPU equivalent emitKV function (line18). The emitKV function puts the generated KV pairs intothe global KV store. The translation scheme replaces thecalls to all C standard library functions with GPU coun-terparts. Since the current GPUs do not support all Cstandard library calls, their equivalent implementations areprovided by the runtime system. The runtime system alsoincludes other functions used in the translated code, such asmapSetup, getRecord, and emitKV. At the end of the mapkernel execution, the indexArray holds the locations of indi-vidual KV pairs in the global KV store. This array is usefulin indirectly accessing the global KV store.

The mapFinish function (line 25) performs bookkeepingafter a thread is done with the map execution. Most im-portantly, it keeps a count of the total number of KV pairsemitted by each thread.

Record Stealing: The execution time taken by the mapoperation for each record can vary greatly among differentrecords in certain MapReduce applications due to the dif-ferences in the amount of data in each record. For exam-ple, in the kmeans application, where each record contains alist of movie ratings, some records have fewer reviews thanothers. This leads to load imbalance among threads if therecords are statically partitioned. A better strategy is there-fore to perform dynamic record distribution, or record steal-ing. However, a global record distribution scheme wouldrequire global atomic functions on the GPU, which are ex-pensive. We therefore devise a scheme where the records are

Page 6: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

statically and equally split among the threadblocks used inthe map kernel, and threads of a given threadblock steal anew record from the threadblock’s pool. As it is common forlarger records to get distributed across threadblocks, recordstealing implemented at the threadblock level is effective.The variable recordIndex ( Listing 3, Line 9) acts as a counterfor the records used by the threads of a threadblock. Thisvariable is placed in the GPU shared memory for fast atomicincrement operations. The maximum record stealing that athread can perform is limited by the storesPerThread it hasin the global KV store.

Using Vector Data Types: For array keys/values,the generated code uses an optimization of internally us-ing CUDA-specific vector data types, such as char4, whichincrease the memory accessing performance. The functionsthat exploit such a vectorization include emitKV, and stringfunctions called within the map code, e.g. strcpy.

4.2 Combine Kernel GenerationAs the combine function operates on one partition at a

time, there is no explicit parallelism. However, differentpartitions can be processed in parallel. Unfortunately, wehave found that the number of partitions i.e., the numberof reducers can be low in certain MapReduce applications.Therefore, the degree of exploitable parallelism can be low,leading to underutilization of the GPU. Our scheme there-fore exploits in-partition, reduction-style parallelism, whilesacrificing full functional equivalence with respect to theCPU code. The idea for this approach is simple: e.g. forthe Wordcount code, a particular partition received the fol-lowing KV pairs <a,1>, <a,1>,<a, 1>, <b,1>. The outputfrom a CPU combiner would be <a,3>, <b,1>. However, iftwo different threads were to operate on the partition, withthe first operating on the first two KV pairs, and the secondon the last two, the intermediate output would be <a,2>,<a,1>, <b,1>. In this manner, the functional equivalenceof the combiner is traded off for parallelism. Due to the pres-ence of the global reducers, however, this trade-off is legal;the global reducer will eventually produce the same output.In practice, as the number of KV pairs in each partition istypically high, this small dissimilarity has a negligible im-pact on the communication volume.

A second design choice in the combine kernel generationdeals with the fact that the degree of exploitable in-partitionparallelism is still usually much less than the number of GPUthreads. The compiler-generated code forces all threads ina warp to execute the same combine function redundantly.This way, intra-warp thread divergence is eliminated. List-ing 4 shows the generated kernel for the Wordcount com-bine code of Listing 2. The compiler inserted a number ofparameters: Keys and values hold the KV pairs emitted bythe map for the particular reducer. OpKey and opVal storethe combine-emitted KV pairs. The lengths of keys andvalues for both map and combine functions are passed asparameters. The handleVariables pass (Algo. 1) adds twoparameters prevWordFP and countFP for firstprivate vari-ables.

Similar to the map setup function, the combineSetup func-tion (line 11) initializes the internal variables of the combineroperation. The compiler performs an optimization of plac-ing the private array variables in the faster shared memoryfor each warp. In this example, gpu prevWord and gpu wordare placed in the shared memory. The scalars are placed

Listing 4: Translated Wordcount Combiner Code

1 __global__ void gpu_combiner(char *keys, int *values,2 char *opKey, int *opVal, int *indexArray, int *finalCount,3 int size, int mapKeyLength, int mapValLength,4 int combKeyLength, int combValLength,5 char *prevWordFP, char *countFP) {6 int laneID, kvsPerThread, warpID, ptr, gpu_val;7 int high, kvCount, index, gpu_read, gpu_count;8 //WARPS_IN_TB = number of warps in a threadblock9 __shared__ char gpu_prevWord[WARPS_IN_TB][30];

10 __shared__ char gpu_word[WARPS_IN_TB][30];11 combineSetup(kvsPerThread, &laneID, &warpID, &ptr,12 &high, &kvCount, &index, size);13 for(int i=0; i < 30; i++) { //init firstprivate data14 gpu_prevWord[warpID][i] = prevWordFP[i];15 }16 gpu_count = countFP;17 while(getKV(gpu_word, keys, &gpu_val, values, ptr,18 high, indexArray, mapKeyLength, mapValLength)!= -1) {19 if(strcmpGPU(gpu_word, gpu_prevWord[warpID],20 mapKeyLength)==0){21 gpu_count += gpu_val;22 } else {23 if(gpu_prevWord[0] != ’\0’) {24 storeKV(gpu_prevWord[warpID], &gpu_count, &index,25 combKeyLength, combValLength, opKey, opVal,26 &kvCount);27 }28 strcpyGPU(gpu_prevWord[warpID], gpu_word,29 mapKeyLength);30 gpu_count=gpu_val;31 }32 }33 if (gpu_prevWord[0] != ’\0’) {34 storeKV(gpu_prevWord[warpID], &gpu_count, &index,35 combKeyLength, combValLength, opKey, opVal, &kvCount);36 }37 finalCount[warpID]=kvCount;38 }

in the thread registers. The compiler replaces calls to scanfthat read in the KV pairs with a GPU-specific getKV call(line 18). The keyin and valuein clauses are necessary forthis function. The original printf function for outputtingthe KV pairs is replaced by the storeKV function (lines 24,34). FinalCount keeps track of the KV pairs emitted by eachthread so that the final output can be combined and writtenback to the CPU.

While each thread in the warp executes the combiner func-tion redundantly, for getKV and storeKV the threads per-form vectorized loading and storing of KV pairs, respec-tively. This method loads/stores one element in an arraykey/value from one thread. It improves performance as itenables coalesced memory accesses. This same optimizationis also performed on string functions, such as strcpy in thecombiner kernels. Note that in order to dynamically switchbetween the vector and non-vector modes, all threads in thewarp must be active for the entire code execution, makingredundant execution in non-vectorizable code sections nec-essary. This redundant execution comes without any side-effects, owing to the warp-level SIMD model. If neither thekey nor the value is an array, the compiler would generate acode where only a single thread per warp is active.

4.3 Host Code for OffloadingSince the GPU code execution is orchestrated by the host,

the compiler must generate the necessary host code. Fig. 1displays a flowchart for the structure of the generated code.First, the code copies the input fileSplit from HDFS into theGPU memory. A GPU kernel is then launched to collect andcount the records in the input. Next, necessary storage isallocated on the GPU for map and combine kernels. To allo-cate the global KV store, all available GPU memory is usedin the absence of the kvpairs clause. Otherwise, the globalKV store memory allocation can be reduced, because the to-

Page 7: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

Figure 1: Flowchart for the driver code on the hostCPU : Dark boxes indicate functions provided bythe runtime system

tal number of records is already known. Each GPU threadin the map kernel generates KV pairs in its own portion ofthe global KV store. Note that each thread may not com-pletely use its own portion of the global KV store, leadingto whitespaces, which are the empty slots in the global KVstore. This leads to a scattering of the KV pairs belong-ing to each partition. These KV pairs must be aggregatedby removing the whitespaces before they are sorted in thenext phase, reducing the sort size. The indirection array(indexArray) is useful for performing this aggregation as theKV pairs do not need to be shuffled directly. Note thatsmaller global KV store size results in better aggregation ef-ficiency. Next, sort, followed by the combine kernel, are runon each partition. The generated output is written back to alocal disk. For map-only jobs, the output is written directlyto the HDFS. Finally, the allocated memories are freed.

5. OVERALL EXECUTION ANDRUNTIME SYSTEM

This section describes the overall flow of the HeteroDoopframework and the runtime system. The runtime systemassists the compiler-optimized code to run on a GPU byproviding a number of library functions (Section 5.2), andhelps implement the MapReduce semantics (Section 5.3).

5.1 Overall ExecutionFig. 2 shows the overall workflow in HeteroDoop and the

roles of the runtime system. First, the user-written mapand combine functions are translated by the HeteroDoopsource-to-source compiler. Next, the generated CUDA codeis compiled by the nvcc compiler for the GPU executable.The original source is compiled by gcc for the CPU ex-ecutable. The executables are inserted into the HadoopStreaming mechanism. When the execution starts, the Job-Tracker sends tasks to TaskTrackers, which schedule themeither on the CPU cores, using native Hadoop Streaming, oron a GPU. The latter is coordinated by the GPU driver ofthe runtime system. The GPU driver fetches new tasks andinvokes the GPU executables to perform map, combine andother MapReduce semantic-specific operations. Upon com-pletion of a task, the GPU driver informs the TaskTracker

Figure 2: Execution Scheme of HeteroDoop :Dashed arrows represent the execution flow; solidarrows represent the compilation flow

about the task completion details, which comprise executiontime, task log, etc.

Hadoop Integration and Fault Tolerance: Fault tol-erance of HeteroDoop for GPU tasks manifests itself in twoways: i) a task failure is communicated to the Hadoop sched-uler so that it can reschedule the task; ii) the failed GPUis revived so that future tasks can still be issued to it. Toobtain this fault tolerance and to achieve issuing of tasksto GPUs, we have modified Hadoop’s MapTask class im-plementation to signal a new task to be run on the GPUto the GPU driver. TaskTrackers on each slave keep oneslot reserved per GPU. Note that these slots simply offloadthe tasks on GPUs; no CPU time is consumed. Schedul-ing decisions on the GPU are described in the next section.Internally, the driver runs one thread for each GPU on thenode. The driver assures that only a single task runs onthe GPU at a time. If a thread’s execution fails, the er-ror is communicated to Hadoop TaskTracker, and the driverrestarts the thread. In this manner, the GPU driver is madefault tolerant.

5.2 Assisting GPU ExecutionThe following runtime library functions help facilitate the

GPU kernel execution.File Handling: HDFS is not POSIX API compliant

for file handling; directly reading the input fileSplit froma C-code is not supported. The runtime system uses lib-HDFS [21], which provides a C/C++ function for fetchingthe input fileSplits from HDFS. The output of the map +combine execution is written to the local disk in a Hadoop-compatible binary format (SequenceFileFormat). For map-only jobs, the output is written directly to the HDFS.

Record Handling: The records in an input file mustbe pre-determined to support record stealing. The runtimesystem implements a GPU kernel that pre-determines andcounts the records in the input file. This kernel is exe-cuted before the map kernel. The runtime system provides athread-safe function, getRecord, which can be called by eachthread of the map kernel to fetch a new record.

5.3 Implementing MapReduce SemanticsPerforming Partition Aggregation: After the map

execution, KV pairs in each partition of the global KV store

Page 8: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

Figure 3: Key Idea of Tail Scheduling

must be compacted to get rid of the whitespaces, resulting inimproved sorting efficiency. The runtime system provides anaggregation function that operates in parallel. It utilizes thecount of the KV pairs emitted by each thread of the mapkernel and an indirection array to locate KV pairs in theglobal KV store. A fast, parallel scan method for GPUs [22]is used to compute the prefix sum of the emitted KV pairsfor each thread. From this information and the count of KVpairs emitted by each thread, a GPU kernel converts the oldindirection array into a new one, wherein the generated KVpairs are aggregated.

Intermediate Sort: Recall that the MapReduce modelsorts the KV pairs after the map operation. HeteroDoopuses a modified version of the efficient GPU merge sort im-plementation [23]. The original implementation operates onfixed, short-length integer keys and values, making efficientuse of the GPU shared memory. For long keys and values,this method would make inefficient use of the shared memoryand thus restrict the partial merge size. Our implementa-tion therefore forgoes direct key-based sorting. We insteadmodify the original method [23] to employ indirection foraccessing the KV pairs. The use of indirection avoids ex-pensive movements of the KV pairs in the device memory.We chose this implementation over an alternative merge sortapproach [24] for variable-length keys. This alternative ap-proach hashes the first few characters of the variable-lengthkeys, breaking ties by comparing the next characters. Wechose not to use this method, since hashing requires addi-tional memory on the GPU.

6. TAIL SCHEDULINGHadoop’s default scheduler does not take into account

the disparity in the computational capabilities of the pro-cessors. When employing both CPUs and GPUs, task ex-ecution times vary substantially, requiring more advancedscheduling decisions. This section describes the HeteroDoopscheduler, aiming at optimal execution on accelerator clus-ters where each node has a CPU and one or more GPUs.

6.1 GPU-First Execution vs Tail SchedulingA simplistic scheduler for CPUs and GPUs would act as

follows. Whenever a new task is issued on a node, the taskis scheduled on a GPU if such a device is free; otherwise, theCPU is chosen. We refer to this method as “GPU-first.”

GPU-first scheduling keeps all CPU and GPU cores busyuntil the final tasks show up. Consider the example scenarioin Fig. 3, which uses 1 GPU and a 2-core CPU for scheduling

a total of 19 tasks. The GPU slot is 6x faster than the CPUslot. The figure shows the sequence number of each task. InGPU-first scheduling, shown on the left, the 1st task wouldgo on the GPU and the 2nd and 3rd on the CPU. Sincethe GPU finishes early, the 4th task would go again on theGPU. Continuing in this manner, the 17th task goes on theGPU, while the 18th and 19th stay on the CPU. Since theCPU tasks are much slower, the faster GPU remains idle atthe end stage, which is sub-optimal. A smarter scheme, asshown on the right side of Fig. 3, would force tasks 18 and19 to execute on the GPU, saving on overall execution time.This is the key idea of tail scheduling.

A key challenge in realizing tail scheduling in HeteroDoopis for the slaves to know when their tails begin. As this infor-mation is not available to the slaves in the baseline Hadoop,in our algorithm, the JobTracker will estimate the numberof remaining tasks for each node and communicate this in-formation to the TaskTrackers, as described next.

Algorithm 2 Tail Scheduling on JobTracker and Task-Tracker

1: function TailScheduleOnJT(numGPUs, maxSpeedup,numSlaves) //this function executes on the JobTrackerbefore sending heartbeat response

2: jobTail = numGPUs × maxSpeedup × numSlaves3: if (jobTail < getNumRemainingMaps()) then

//identifying tail4: taskSet = scheduleNumGPUTasksAtMax() //assures

fairness5: else6: taskSet = useHadoopDefaultScheduling();7: end if8: numMapsRemainingPerNode =

getNumRemainingMaps()/numSlaves9: heartBeatResponse.add(numMapsRemainingPerNode,

taskSet)10: end function

//TaskTracker calculates GPU speedup over CPU for eachtask

11: function TailScheduleOnTT(task, numGPUs, aveSpeedup,numMapsRemainingPerNode)

12: taskTail = numGPUs × aveSpeedup13: if (taskTail <= numMapsRemainingPerNode) then14: forceGPUexecution(task) //tail is forced on GPU15: else16: useGPUFirst(task);17: end if18: end function

6.2 Tail Scheduling AlgorithmWe implement tail scheduling in HeteroDoop at two lev-

els: on the JobTracker and on the TaskTracker. Algorithm 2describes the mechanism. The TaskTracker continually cal-culates the average GPU slot speedup over the CPU slot andinforms the JobTracker. The Hadoop heartbeat mechanismhas been modified to carry this information. The JobTrackerremembers the maximum speedup from the TaskTrackers.For the TaskTracker, the tail size (taskTail) is the numberof GPU tasks that execute in the same amount of time as oneCPU task. It is computed as the number of GPUs on thecurrent node (numGPUs) multiplied by the average GPUspeedup (aveSpeedup) seen on that TaskTracker.

The JobTracker notifies each TaskTracker about the re-maining number of map tasks to be scheduled (numMaps-RemainingPerNode). As task scheduling on the JobTrackeris on a first-come-first-serve basis, it does not exactly knowhow many tasks would go on a given TaskTracker. The Job-Tracker estimates numMapsRemainingPerNode as the to-tal number of remaining tasks divided by the number of

Page 9: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

Table 2: Description of the Benchmarks UsedBenchmark %Exec. Time

Map + Com-bine are Active

Nature(Intensive-ness)

Combiner Total Reduce Tasks Total Map tasks Input Size (GB)

Cluster1 Cluster2 Cluster1 Cluster2 Cluster1 Cluster2Grep (GR) 69 IO Yes 16 16 7632 2880 902 340Histmovies (HS) 91 IO Yes 8 8 4800 640 1190 159Wordcount (WC) 91 IO Yes 48 32 5760 1024 844 151Histratings (HR) 92 Compute Yes 5 5 4800 2560 591 160Linear Regression (LR) 86 Compute Yes 16 16 2560 3840 714 356Kmeans (KM) 89 Compute No 16 16 4800 NA 923 NAClassification (CL) 92 Compute No 16 16 4800 3200 923 72BlackScholes (BS) 100 Compute No 0 0 3600 5120 890 210

slaves (line 8). Whenever taskTail is greater than numMaps-RemainingPerNode, GPU-first scheduling is used (lines 13–17). Otherwise, all the next tasks - across all slots of theTaskTracker - are forced for GPU execution. As all slots ona TaskTracker force their tasks on the GPU(s) once the task-Tail begins, queuing might occur on the GPU(s). To counterthis effect, the JobTracker only schedules at most numGPUstasks on a TaskTracker per heartbeat once the jobTail be-gins. JobTail is the total number of tasks all GPUs in thecluster can finish in the same amount of time as a single CPUcore. It is estimated as numGPUs × maxSpeedup × num-Slaves, where numSlaves is the total number of slave nodes inthe cluster and maxSpeedup is the maximum GPU speedupseen across the cluster. ScheduleNumGPUTasksAtMax re-places the original Hadoop scheduling scheme, which sched-ules tasks up to the number of empty GPU and CPU slotsper heartbeat (lines 3–7).

7. EVALUATIONThis section evaluates the HeteroDoop system. We

present the overall performance achieved on our heteroge-neous CPU-plus-GPU system versus a CPU-only scheme,i.e. baseline Hadoop. To analyze the performance in fur-ther detail, we show individual task speedups of GPU overCPU execution. We also present the performance benefitsof HeteroDoop compiler optimizations on individual kernels.

Table 3: Cluster Setups UsedCluster1 Cluster2

#nodes 48 (+1 master) 32 (+1 master)CPU Intel Xeon E5-

2680Intel Xeon X5560

#CPU cores 20 12GPU(s) Tesla K40 (Ke-

pler)3×Tesla M2090(Fermi)

RAM 256GB 24GBDisk 500GB noneCommunication FDR InfiniBand QDR InfiniBandHadoop Version Hadoop 1.2.1 Hadoop 1.2.1CUDA Version CUDA 6.0 CUDA 5.5HDFS Block Size 256MB 256MBHDFS Replication Factor 3 1Max. Map Slots Per Node 20 (+1 for GPU

runs)4 (+1/GPU forGPU runs)

Max. Reduce Slots PerNode

2 2

Speculative Execution Off Off% map tasks to finishbefore reduce starts

20 20

7.1 BenchmarksWe used eight benchmarks for our experiments. Grep,

wordcount, kmeans, classification, histmovies and histratingsare taken from the PUMA benchmark suite [25]. They rep-resent typical MapReduce applications, including both IO-intensive and compute-intensive ones. Apart from these, we

evaluate two scientific computation applications, blackSc-holes and linear regression, that have been shown to beamenable to MapReduce [12, 3]. Grep and wordCount arewell-known MapReduce applications, and both are IO inten-sive. Kmeans is an iterative clustering application. Whilekmeans is a centroid-based clustering algorithm, other vari-ants such as distribution-based or density-based clusteringare popular as well. All these clustering algorithms areknown to be compute intensive. Classification benchmark isderived from statistical classification algorithms. It is simi-lar to kmeans; however, there is no clustering involved. Theapplication ends after classifying the input dataset to respec-tive centroids in a single iteration. Histograms are commonin big-data applications. Histmovies and histratings are twosuch applications. Histmovies averages the review ratingsfor each movie in a given dataset and puts these averagesinto bins. Histratings directly bins each review rating forall movies in the dataset. Since the combiner receives largerdata to operate on, histratings becomes more compute in-tensive than histmovies. BlackScholes is a financial pricingmodel, with explicit parallelism. Other financial applica-tions with similar workloads include binomial options andSOBOL quasi random number generator. Linear regressionis commonly used in curve-fitting applications. For blackSc-holes, we ran 128 iterations per option. For linear regression,each input file contained data for 12 regressors, with 32 rowseach. The details of the benchmarks, data sizes and tasksare presented in table 2.

7.2 Cluster SetupsTable 3 shows the configurations of the two XSEDE [26]

clusters used in this evaluation. Cluster1 is our primaryplatform. Each node has one GPU, which is a newer gener-ation device. To evaluate the scalability of HeteroDoop ona multi-GPU system, we use Cluster2, which includes threeolder-type GPUs per node. Cluster2 is an in-memory sys-tem, i.e., there are no disks. Input, output, and temporarystorage exist in the main memory of the Cluster2 nodes.

7.3 Overall ImprovementsFig. 4 shows the overall performance of HeteroDoop over

the CPU-only Hadoop baseline. We ran each experimentthree times, and report the best run. The variation acrossruns was low (<5%). The benchmarks are arranged by in-creasing speedup. As expected, HeteroDoop greatly outper-forms CPU-only Hadoop on compute-intensive benchmarksowing to the GPU acceleration. Compute-intensive applica-tions are therefore ideal candidates for HeteroDoop. For IO-intensive benchmarks, the use of CPUs brings about mostof the achievable performance, indicating that execution onCPUs is essential in IO-intensive applications. In thesebenchmarks, HeteroDoop speedups are higher for Cluster2

Page 10: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

(a) Performance gains of HeteroDoop normalized toCPU-only Hadoop on Cluster1

(b) Performance gains of HeteroDoop normalized toCPU-only Hadoop on Cluster2

Figure 4: Performance evaluation of HeteroDoop

than for Cluster1 because i) Cluster1 uses more CPU coresthan Cluster2, and ii) Cluster2 is an in-memory system,reducing the IO-intensiveness of the applications. Fig. 4bshows multi-GPU scalability results of HeteroDoop. KM isnot shown, as the memory requirement exceeds the capac-ity of Cluster2. The figures also show the effectiveness oftail scheduling over GPU-first scheduling. Note that tailscheduling improves performance only if the GPU(s) go idleat the end of the execution. For LR on Cluster1, this im-balance does not arise. For the IO-intensive benchmarks,the GPU speedup over a CPU core is low, resulting in onlymarginal benefits of HeteroDoop and tail scheduling.

7.4 Detailed AnalysisWe present detailed analyses of the performance obtained

on our primary cluster, Cluster1. Cluster2 showed similartrends. Fig. 5 shows the speedups obtained by a GPU taskover a CPU task run by a single core, with the compiler-translated baseline code and with the optimizations. Theoptimizations include vectorization of map and combine ker-nels, using texture memory, record stealing and performingKV pair aggregation prior to sort. The GPU task com-prises input copy, record counting, map, aggregate, sort,combine, and output write operations. The benchmarksare sorted by increasing speedups of their tasks. We at-tribute the increasing speedup to higher compute intensive-ness. Each bar in the figure shows the speedup achieved bythe baseline-translated code and the additional benefit ofthe optimizations. In GR, KM, CL, and LR, the optimiza-tions gain substantial additional performance. Note thateven for IO-intensive applications, such as GR and HS, theGPU achieves speedups over a single CPU core, and there-fore, executing MapReduce tasks on GPUs along with CPUcores is still beneficial in these applications.

Fig. 6 breaks down the execution time for a single GPUtask on each benchmark. Input reading time is the timefor reading the HDFS file. Output writing time measuresthe time required for formatting the generated GPU out-put in Hadoop binary format, calculating the checksum,and writing the data on the HDFS/local disk, dependingon whether or not the application is map-only. Fig. 6 showsthat different stages of the MapReduce scheme can form abottleneck on the GPU for different benchmarks. Note thatthe partition aggregation times are negligible in all bench-marks. Both these experiments (fig. 5, Fig. 6) made use ofinput data-local map + combine tasks. These figures indi-cate that a single-task speedup can be as high as 47x for

Figure 5: Speedup of a single GPU task over aCPU task : Optimizations have high impact on fourbenchmarks

Figure 6: Execution time breakdown of a GPU task

BS, which is our most compute-intensive application. TheGPU task of BS spends 62% of its execution time writingthe output, which is up from 1% in a CPU task. Evidently,the high computational power of GPUs moves the bottle-neck from computation to disk write. CL and KM are bothmap-heavy operations. Wordcount shows an interesting casewhere most of the execution time is spent in sorting since itemits many long-length keys. HR and LR spend substantialexecution time in the combine operation.

Fig. 7 shows the effects of individual optimizations. Thefirst is the usage of texture memory, which can speed up themap kernel in KM and CL benchmarks by 2x (Fig. 7a). Vec-torized read and write of the KV pairs in the combine kernelcan improve this operation by 2.7x, as shown in Fig. 7b. Vec-torized read and write in the map kernel can yield gains ofup to 1.7x (Fig. 7c). A speedup of up to 1.36x is gainedby the record stealing scheme on map kernels as shown inFig. 7d. Aggregating KV pairs in each partition prior to

Page 11: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

(a) Effects of using TextureMemory

(b) Effects of Vectorized Read-Write oncombine kernels

(c) Effects of Vectorized Read-Write on map kernels

(d) Effects of Record Stealing on map kernels (e) Effects of KV Pair Aggregation on sort kernels

Figure 7: Effects of Optimizations : Only the benchmarks affected by the optimizations are shown

sorting can improve the sort kernel’s performance by up to7.6x (Fig. 7e).

8. RELATED WORKWe classify related work into the following five categories:Single Node Systems: MapReduce has been studied

as a programming model for several architectures. Two ap-proaches [27, 28] target multi-cores. Catanzero et. al. [4]proposed a MapReduce system for GPUs. Mars [2] usesMapReduce as a programming model for a single GPU node.MapCG [14] offers a MapRedue API that is portable acrossCPUs and GPUs. But all these systems lack a distributedframework, and do not scale across a cluster.

Distributed Systems: GPMR [3] uses CUDA + MPIas a platform for writing applications, but does not employCPUs in a cluster. As both CUDA and MPI require signif-icant programmer expertise to organize the parallelism andmanage communication, programmability of GPMR is low.MITHRA [12] is a system that shows the effectiveness ofusing Hadoop and GPUs together in two scientific applica-tions. Unlike HeteroDoop, MITHRA relies on the program-mer to write CUDA code for the application. It uses HadoopStreaming as well, and utilizes only the GPUs in a cluster.Glasswing [13] uses either CPUs or GPUs for MapReduce.It requires the input map and reduce kernels to be writtenwith OpenCL and offers a unified API to feed these kernelsto the framework.

Employing both CPUs and GPUs: HadoopCL [29]presents a Java-based approach, where the programmerwrites map and reduce functions in GPU-friendly classes.Java bytecode is translated into an OpenCL program. Akey difference between HadoopCL and HeteroDoop is theparallelism exploitation: for a given fileSplit of a task run-ning either on the CPU or GPU, HadoopCL launches manysmall, asynchronous OpenCL kernels. This leads to IO over-heads, as the input data chunk of the fileSplit for each kernelmust be copied separately into its buffer, and output must bewritten into a buffer which is shared across kernels, requir-ing serialization. Further, for multi-core CPUs, this strat-egy results in one task occupying all the CPU cores, and

would require significant changes in the Hadoop schedulerwhen jobs have to share a cluster. Architecture-specific op-timizations and CPU-GPU work partitioning decisions arenot considered.

Scheduling Hadoop Tasks on a CPU-GPU Cluster:Shirahata et. al. [30] present a strategy where map taskscan be run on both CPUs and GPUs. The programmer isexpected to provide the corresponding CPU/CUDA codes.This work employs a mathematical performance model for aheterogeneous scheduling strategy that minimizes the over-all execution time for the given constraints. However, suchminimization requires evaluation of the performance modelthroughout the job execution. By contrast, tail schedulingaffects only the final tasks, reducing the scheduling overhead.

GPU-Specific MapReduce Optimizations: Chen et.al. present a system [31] for running MapReduce on CPU-GPU coupled architectures, where the CPU and GPU sharethe system memory. Another approach [32] presents a sys-tem that makes better use of the on-chip shared memory ofthe GPU for running reduction-intensive applications. Boththese systems present important optimizations for MapRe-duce as a programming model on a single node. However,they are not directly applicable to distributed MapReducesince the map and reduce stages may run on different nodes.Also, GPU-shared-memory based optimizations for storingintermediate KV pairs would be precluded.

9. CONCLUSIONThis paper has presented HeteroDoop, a MapReduce

framework and system that employs both CPUs and GPUsin a cluster. It has introduced HeteroDoop directives, thatcan be placed on an existing sequential, CPU-only, MapRe-duce program for efficient execution on both the CPU andone or more GPUs on each node. The optimizing Het-eroDoop compiler translates the directive-augmented pro-grams into efficient GPU codes. HeteroDoop’s runtime sys-tem assists in carrying out the compiler optimizations andhandling MapReduce semantics on the GPUs. HeteroDoopis built on top of Hadoop, the state-of-the-art MapReduceframework for CPUs. The HeteroDoop runtime system plays

Page 12: HeteroDoop : A MapReduce Programming System for ... · CPU-only program, expressing MapReduce semantics; (ii) an optimizing compiler translates the directive-augmented program into

a vital role in this integration. A novel scheduling schemeoptimizes the execution times of jobs on CPU-GPU hetero-geneous clusters. Our experiments with HeteroDoop in eightbenchmarks demonstrate that using a single GPU per nodecan achieve speedups of up to 2.78x, with a geometric meanof 1.6x, compared to a cluster running CPU-only Hadoop on20-core CPUs. The proposed tail scheduling scheme workswell for intra-node heterogeneity. We leave handling of ex-treme inter-node heterogeneity to future work, where thetrade-off between data locality and execution speed will bean important additional consideration.

AcknowledgementsWe thank T. N. Vijaykumar and anonymous reviewers fortheir valuable feedback that helped improve this paper. Thiswork was supported, in part, by the National Science Foun-dation under grants no. 0916817-CCF and 1449258-ACI.This work used the Extreme Science and Engineering Dis-covery Environment (XSEDE), which is supported by Na-tional Science Foundation grant no. ACI-1053575. We alsothank NVIDIA corporation for the equipment grant.

10. REFERENCES[1] Apache, “Hadoop.” [Online]. Available:

http://hadoop.apache.org/. (accessed December 30, 2014).

[2] B. He, W. Fang, Q. Luo, N. K. Govindaraju, and T. Wang,““Mars: a MapReduce framework on graphics processors”,” inProceedings of the 17th international conference on Parallelarchitectures and compilation techniques, PACT ’08,pp. 260–269, ACM, 2008.

[3] J. Stuart and J. Owens, ““Multi-GPU MapReduce on GPUClusters”,” in Parallel Distributed Processing Symposium(IPDPS), 2011 IEEE International, pp. 1068 –1079, may 2011.

[4] B. Catanzaro, N. Sundaram, and K. Keutzer, “A map reduceframework for programming graphics processors,” in InWorkshop on Software Tools for MultiCore Systems, 2008.

[5] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, andI. Stoica, “Spark: Cluster computing with working sets,” inProceedings of the 2Nd USENIX Conference on Hot Topics inCloud Computing, HotCloud’10, (Berkeley, CA, USA),pp. 10–10, USENIX Association, 2010.

[6] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly, “Dryad:Distributed data-parallel programs from sequential buildingblocks,” in Proceedings of the 2Nd ACM SIGOPS/EuroSysEuropean Conference on Computer Systems 2007, EuroSys’07, (New York, NY, USA), pp. 59–72, ACM, 2007.

[7] Y. Yu, M. Isard, D. Fetterly, M. Budiu, U. Erlingsson, P. K.Gunda, and J. Currey, “Dryadlinq: A system forgeneral-purpose distributed data-parallel computing using ahigh-level language,” in Proceedings of the 8th USENIXConference on Operating Systems Design andImplementation, OSDI’08, (Berkeley, CA, USA), pp. 1–14,USENIX Association, 2008.

[8] NVIDIA, “CUDA.” [Online]. Available:http://bit.ly/1xnvPXr, 2015. (accessed January 5, 2015).

[9] OpenCL, “OpenCL.” [Online]. Available:http://www.khronos.org/opencl/. (accessed Jan. 4, 2015).

[10] OpenACC, “OpenACC: Directives for Accelerators.” [Online].Available: http://bit.ly/1Bx3pWk. (accessed June. 11, 2014).

[11] OpenMP, “OpenMP: Version 4.0.” [Online]. Available:http://bit.ly/1Bx3IjT, 2013. (accessed July 31, 2014).

[12] R. Farivar, A. Verma, E. Chan, and R. H. Campbell, “Mithra:Multiple data independent tasks on a heterogeneous resourcearchitecture.,” in CLUSTER, pp. 1–10, IEEE, 2009.

[13] I. El-Helw, R. Hofman, and H. E. Bal, “Scaling mapreducevertically and horizontally,” in Proceedings of the InternationalConference for High Performance Computing, Networking,Storage and Analysis, SC ’14, (Piscataway, NJ, USA),pp. 525–535, IEEE Press, 2014.

[14] C. Hong, D. Chen, W. Chen, W. Zheng, and H. Lin, “Mapcg:Writing parallel program portable between cpu and gpu,” inProceedings of the 19th International Conference on ParallelArchitectures and Compilation Techniques, PACT ’10, (NewYork, NY, USA), pp. 217–226, ACM, 2010.

[15] M. Zaharia, A. Konwinski, A. D. Joseph, R. Katz, andI. Stoica, “Improving mapreduce performance in heterogeneousenvironments,” in Proceedings of the 8th USENIX Conferenceon Operating Systems Design and Implementation, OSDI’08,(Berkeley, CA, USA), pp. 29–42, USENIX Association, 2008.

[16] F. Ahmad, S. T. Chakradhar, A. Raghunathan, and T. N.Vijaykumar, “Tarazu: Optimizing mapreduce on heterogeneousclusters,” SIGARCH Comput. Archit. News, vol. 40,pp. 61–74, Mar. 2012.

[17] J. Sanders and E. Kandrot, CUDA by Example: AnIntroduction to General-Purpose GPU Programming.Addison-Wesley Professional, 1st ed., 2010.

[18] K. Shvachko, H. Kuang, S. Radia, and R. Chansler, “Thehadoop distributed file system,” in Proceedings of the 2010IEEE 26th Symposium on Mass Storage Systems andTechnologies (MSST), MSST ’10, (Washington, DC, USA),pp. 1–10, IEEE Computer Society, 2010.

[19] “Hadoop streaming.” [Online]. Available:http://bit.ly/1Hc17Rg. (accessed January 02, 2015).

[20] C. Dave, H. Bae, S.-J. Min, S. Lee, R. Eigenmann, andS. Midkiff, “Cetus: A source-to-source compiler infrastructurefor multicores,” IEEE Computer, vol. 42, no. 12, pp. 36–42,2009.

[21] libHDFS, “API http://wiki.apache.org/hadoop/LibHDFS.”

[22] S. Sengupta, M. Harris, Y. Zhang, and J. D. Owens, “Scanprimitives for gpu computing,” in Proceedings of the 22NdACM SIGGRAPH/EUROGRAPHICS Symposium onGraphics Hardware, GH ’07, (Aire-la-Ville, Switzerland,Switzerland), pp. 97–106, Eurographics Association, 2007.

[23] N. Satish, M. Harris, and M. Garland, “Designing efficientsorting algorithms for manycore gpus,” in Parallel DistributedProcessing, 2009. IPDPS 2009. IEEE InternationalSymposium on, pp. 1–10, May 2009.

[24] A. Davidson, D. Tarjan, M. Garland, and J. Owens, “Efficientparallel merge sort for fixed and variable length keys,” inInnovative Parallel Computing (InPar), pp. 1–9, May 2012.

[25] F. Ahmad, S. Lee, M. Thottethodi, and T. N. Vijaykumar,“Puma: Purdue mapreduce benchmarks suite,” Tech. Rep.Paper 437, School of Electrical and Computer Engineering,Purdue University, West Lafayette, Indiana, October 2012.

[26] J. Towns, T. Cockerill, M. Dahan, I. Foster, K. Gaither,A. Grimshaw, V. Hazlewood, S. Lathrop, D. Lifka, G. D.Peterson, R. Roskies, J. R. Scott, and N. Wilkens-Diehr,“Xsede: Accelerating scientific discovery,” Computing inScience and Engineering, vol. 16, no. 5, pp. 62–74, 2014.

[27] W. Jiang, V. T. Ravi, and G. Agrawal, “A map-reduce systemwith an alternate api for multi-core environments,” inProceedings of the 10th IEEE/ACM International Conferenceon Cluster, Cloud and Grid Computing, CCGRID,(Washington, DC, USA), pp. 84–93, IEEE Computer Society,2010.

[28] C. Ranger, R. Raghuraman, A. Penmetsa, G. Bradski, andC. Kozyrakis, “Evaluating mapreduce for multi-core andmultiprocessor systems,” in Proceedings of the 2007 IEEE 13thInternational Symposium on High Performance ComputerArchitecture, HPCA ’07, (Washington, DC, USA), pp. 13–24,IEEE Computer Society, 2007.

[29] M. Grossman, M. Breternitz, and V. Sarkar, “HadoopCL:Mapreduce on distributed heterogeneous platforms throughseamless integration of hadoop and opencl,” in Proceedings ofthe 2013 IEEE 27th International Symposium on Paralleland Distributed Processing Workshops and PhD Forum,IPDPSW ’13, (Washington, DC, USA), pp. 1918–1927, IEEEComputer Society, 2013.

[30] K. Shirahata, H. Sato, and S. Matsuoka, “Hybrid map taskscheduling for gpu-based heterogeneous clusters,” 2013 IEEE5th International Conference on Cloud ComputingTechnology and Science, vol. 0, pp. 733–740, 2010.

[31] L. Chen, X. Huo, and G. Agrawal, “Accelerating mapreduce ona coupled cpu-gpu architecture,” in Proceedings of theInternational Conference on High Performance Computing,Networking, Storage and Analysis, SC ’12, (Los Alamitos, CA,USA), pp. 25:1–25:11, IEEE Computer Society Press, 2012.

[32] L. Chen and G. Agrawal, “Optimizing mapreduce for gpus witheffective shared memory usage,” in Proceedings of the 21stInternational Symposium on High-Performance Parallel andDistributed Computing, HPDC ’12, (New York, NY, USA),pp. 199–210, ACM, 2012.