
Reactive Resource Provisioning Heuristics for Dynamic Dataflows on Cloud Infrastructure

Alok Gautam Kumbhare, Student Member, IEEE, Yogesh Simmhan, Senior Member, IEEE, Marc Frincu, Member, IEEE, and Viktor K. Prasanna, Fellow, IEEE

Abstract—The need for low-latency analysis over high-velocity data streams motivates distributed continuous dataflow systems. Contemporary stream processing systems use simple techniques to scale on elastic cloud resources to handle variable data rates. However, application QoS is also impacted by the variability in resource performance exhibited by clouds, and this necessitates "dynamic dataflows", which utilize alternate tasks as additional control over the dataflow's cost and QoS. Further, we formalize an optimization problem to represent deployment and runtime resource provisioning that allows us to balance the application's QoS, value, and the resource cost. We propose two greedy heuristics, centralized and sharded, based on the variable-sized bin packing algorithm, and compare them against a Genetic Algorithm (GA) based heuristic that gives a near-optimal solution. A large-scale simulation study, using the Linear Road Benchmark and VM performance traces from the AWS public cloud, shows that while the GA-based heuristic provides a better quality schedule, the greedy heuristics are more practical and can intelligently utilize cloud elasticity to mitigate the effect of variability, both in input data rates and cloud resource performance, to meet the QoS of fast data applications.

Index Terms—Dataflows, stream processing, cloud, resource management, scheduling, high velocity data, runtime adaptation


1 INTRODUCTION

The expansion of ubiquitous virtual and physical sensors, leading up to the Internet of Things, has accelerated the rate and quantity of data being generated continuously. As a result, the need to manage and analyze such high velocity data in real-time forms one of the three dimensions of "Big Data", besides volume and variety [1]. While the past decade has seen sophisticated platforms for scalable offline analytics on large data volumes [2], Big Data systems for continuous analytics that adapt to the number, rate and variability of streams are relatively less well-studied.

There is a growing class of streaming applications in diverse domains: trend analysis and social network modeling for online advertising [3], real-time event processing to detect abnormal behavior in complex systems [4], and mission-critical use-cases such as smart traffic signaling [5] and demand-response in smart power grids [6]. These applications are characterized by the variety of input data streams, each with variable data rates. Further, data arrives at high velocity and needs to be analyzed with guaranteed low latency even in the presence of data rate fluctuations. Hence, such applications lie at the intersection of the velocity and variety dimensions of the Big Data landscape.

While run-time scalability and seamless fault-tolerance together are the key requirements for handling high velocity, variable-rate data streams, in this paper we emphasize the scalability of stream processing applications with increasing data velocity and their adaptation to fluctuating data rates. Existing stream processing (or continuous dataflow) systems (SPSs) [7], [8], [5], [9] allow users to compose applications as task graphs that consume and process continuous data, and execute on distributed commodity clusters and clouds. These systems support scalability with respect to high input data rates over static resource deployments, assuming the input rates are stable. When the input rates change, their static resource allocation causes over- or under-provisioning, resulting in wasted resources during low data rate periods and high processing latency during high data rate periods. Storm's rebalance [8] function allows users to monitor the incoming data rates and redeploy the application on-demand across a different set of resources, but requires the application to be paused. This can cause message loss or processing delays during the redeployment. As a result, such systems offer limited self-manageability to changing data rates, which we address in this article.

• A. Kumbhare is with the Department of Computer Science, and V. K. Prasanna and M. Frincu are with the Department of Electrical Engineering at the University of Southern California, Los Angeles, USA. Email: {kumbhare, prasanna, frincu}@usc.edu

• Y. Simmhan is with the Supercomputer, Education and Research Centre (SERC) at the Indian Institute of Science, Bangalore, India. Email: [email protected]

Recent SPSs such as Esc [10] and StreamCloud [11] have harnessed the cloud's elasticity to dynamically acquire and release resources based on application load. However, several works [12], [13], [14] have shown that the performance of public and private clouds itself varies: for different resources, across the data center, and over time. Such variability impacts low latency streaming applications, and causes adaptation algorithms that assume reliable resource performance to fail. Hence, another gap we address here is autonomic self-optimization in response to cloud performance variability for streaming applications. In addition, the pay-per-use cost model of commercial clouds requires intelligent resource management to minimize the real cost while satisfying the streaming application's quality of service (QoS) needs.

This motivates autonomic methods of provisioning elastic resources to support such applications on cloud infrastructure. We develop the concept of "dynamic dataflows", which use alternate task implementations as an additional point of control over the dataflow's cost and QoS.


In this article we push towards autonomic provisioning of continuous dataflow applications to enable scalable execution on clouds by leveraging cloud elasticity and addressing the following issues:

1) Autonomous runtime adaptations in response to fluctuations in both input data rates and cloud resource performance.

2) Offering flexible trade-offs to balance the monetary cost of cloud resources against the users' perceived application value.

This article extends our previous work [15], which introduced the notion of "dynamic dataflows" and proposed greedy reactive resource provisioning heuristics (§ 8) to exploit cloud elasticity and the flexibility offered by dynamic dataflows. Our contributions in this article are:

• Application Model and Optimization Problem: We develop the application model for dynamic dataflows (§ 2) as well as the infrastructure model to represent IaaS cloud characteristics (§ 3), and propose an optimization problem (§ 6) for resource provisioning that balances the resource cost, application throughput and the domain value based on user-defined constraints.

• Algorithms: We present a Genetic Algorithm (GA) based heuristic for deployment and runtime adaptation of continuous dataflows (§ 7) to solve the optimization problem. We also propose efficient greedy heuristics (centralized and sharded variants) that trade optimality for efficiency, which is critical for low latency streaming applications (§ 8).

• Evaluation: We extend the Linear Road Benchmark (LRB) [16] as a dynamic dataflow application that incorporates dynamic processing elements, to evaluate the reactive heuristics through large-scale simulations of LRB, scaling up to 8,000 msgs/sec, using VM and network performance traces from the Amazon AWS cloud service provider. Finally, we offer a comparative analysis of the greedy heuristics against the GA in terms of scalability, profit, and QoS (§ 9.2).

2 DYNAMIC DATAFLOW APPLICATION MODEL

We leverage the familiar Directed Acyclic Graph (DAG) model to define Continuous Dataflows (Def. 1). This allows users to compose loosely coupled applications from individual tasks, with data dependencies between them defined as streaming dataflow edges. While, in practice, this model can be extended to include more complex constructs like back flows/cycles, we limit our discussion to DAGs to keep the application model simple and the optimization problem tractable.

Def. 1: A continuous dataflow G is a quadruple G = 〈P, E, I, O〉, where P = {P_1, P_2, ..., P_n} is the set of Processing Elements (PEs) and E = {〈P_i, P_j〉 | P_i, P_j ∈ P} is a set of directed dataflow edges without cycles, such that data messages flow from P_i to P_j. I ≠ ∅ ⊂ P is the set of input PEs, which receive messages only from external data streams, and O ≠ ∅ ⊂ P is the set of output PEs, which emit output messages only to external entities.

Each PE represents a long-running, user-defined task which executes continuously, accepting and consuming messages from its incoming ports and producing messages on the outgoing ports. A directed edge between two PEs connects an output port of the source PE to an input port of the sink PE, and represents a flow of messages between the two. Without loss of generality, we assume and-split semantics for edges originating from the same output port of a PE (i.e., output messages on a port are duplicated on all outgoing edges) and multi-merge semantics [17] for edges terminating at an input port of another PE (i.e., input messages from all incoming edges on a port are interleaved).

We define Dynamic Dataflows (Def. 2) as an extension of continuous dataflows that incorporates the concept of dynamic PEs [15]. A dynamic PE consists of one or more user-defined alternative implementations (alternates) of the given PE, any one of which may be selected as the active alternate at run-time. Each alternate may possess different performance characteristics, resource requirements and domain-perceived functional quality (value). Heterogeneous computing [18] and (batch processing) workflows [19] incorporate a similar notion, where the active alternates are decided once at deployment time but thereafter remain fixed during execution. We extend this to continuous dataflows, where alternate selection is an on-going process at runtime. This allows the execution framework to perform autonomic adaptations by dynamically altering the active alternates for an application to meet its QoS needs based on current conditions.

Def. 2 (Dynamic Dataflow): A Dynamic Dataflow D = 〈P, E, I, O〉 is a continuous dataflow where each PE P_i ∈ P has a set of alternates P_i = {p_i^1, p_i^2, ..., p_i^j} with j ≥ 1, where p_i^j = 〈γ_i^j, c_i^j, s_i^j〉. Here γ_i^j, c_i^j, and s_i^j denote the relative value, the processing cost per message, and the selectivity of the alternate p_i^j of PE P_i, respectively.

Selectivity, s_i^j, is the ratio of the number of output messages produced to the number of input messages consumed by the alternate p_i^j to complete a logical unit of operation. It helps determine the outgoing data rate of a PE relative to its input data rate, and thereby its cascading impact on downstream PEs in the dataflow.

Each alternate has associated cost and value functions to assist with alternate selection and resource provisioning decisions. The relative value, 0 < γ_i^j ≤ 1, for an alternate p_i^j is:

    γ_i^j = f(p_i^j) / max_j f(p_i^j)        (1)

where f : P_i → ℝ is a user-defined value function for the alternates. It quantifies the relative domain benefit to the user of picking that alternate. For example, a Classification PE that classifies its input tuples into different classes may use the F1 score¹ as the quality of that algorithm to the domain, and the F1 can then be used to calculate the relative value for the alternates of the PE.

1. F1 = 2 × (precision × recall) / (precision + recall) is a measure of the classifier's labeling accuracy.


<dataflow>
  <PE id="parser">
    <in id="tweet" type="string" />
    <out id="cleaned" type="string" />
    <alternate impl="Parser.TweetParser" />
  </PE>
  <PE id="classifier">
    <in id="cleaned_twt" type="string" />
    <out id="tpc_twt" type="string" />
    <alternate impl="Classifier.Bayes" value="0.65" />
    <alternate impl="Classifier.LDA" value="0.70" />
    <alternate impl="Classifier.MWE" value="1.00" />
  </PE>
  <edge source="parser:cleaned" sink="classifier:cleaned_twt" />
</dataflow>

Fig. 1: Sample declarative representation of a Dynamic Dataflow using XML. An equivalent visual representation is in Fig. 2(a).

Finally, the processing cost per message, c_i^j, is the time (in seconds) required to process one message on a "reference" CPU core (§ 3) for the alternate p_i^j. The processing needs of an alternate determine the resources required to process incoming data streams at a desired rate.

The concept of dynamic PEs and alternates provides a powerful abstraction and an additional point of control to the user. A sample dynamic dataflow is shown in Fig. 1 using a generic XML representation, with a visual equivalent shown in Fig. 2(a). Any existing dataflow representation, such as declarative [20], functional [8] or imperative [9], may also be used. The sample dataflow continuously parses incoming tweets and classifies them into different topics. It consists of two PEs, parser and classifier, connected by a dataflow edge. While the parser PE has only one implementation, the classifier PE has three alternates, using the Bayes, Latent Dirichlet Allocation (LDA) and Multi-Word Enhancement (MWE) to LDA algorithms, respectively. Each alternate varies in classification accuracy and hence has a different value to the domain; these are normalized relative to the best among the three. The three alternates are available for dynamic selection at runtime. For brevity, we omit a more detailed discussion of the dynamic dataflow programming abstraction.

The execution and scalability of a dynamic dataflow application depend on the capabilities of the underlying infrastructure. Hence, we develop an infrastructure model to abstract the characteristics that impact the application execution, and use it to define the resource provisioning optimization problem (§ 6).

3 CLOUD INFRASTRUCTURE MODEL

We assume an Infrastructure as a Service (IaaS) cloud that provides access to virtual machines (VMs) and a shared network. In IaaS clouds, a user has no control over the VM placement on physical hardware, the multi-tenancy, or the network behavior between VMs. The cloud environment provides a set of VM resource classes C = {C_1, C_2, ..., C_n} that differ in the number of available virtual CPU cores N, their rated core speed π, and their rated network bandwidth β. In this article, we focus on CPU-bound PEs that operate on incoming data streams from the network. As a result, we ignore memory and disk characteristics and only use the VM's CPU and network behavior in the infrastructure performance model used by our adaptation heuristics.

As CPU core speeds may vary across VM classes, we define the normalized processing power π_i of a resource class C_i's CPU core as the ratio of its processing power to that of a reference VM core. Naïvely, this may be the ratio of their clock speeds, but it could also be obtained by running application benchmarks on different sets of VMs and comparing them against a defined reference VM, or by using cloud providers' "ratings" such as Amazon's Elastic Compute Units (ECUs).

The set of VM resources acquired till time t is denoted by R(t) = {r_1, r_2, ..., r_n}. Each VM is described by r_i = 〈C_j, t_start^i, t_stop^i〉, where C_j is the resource class to which the VM belongs, and t_start^i and t_stop^i are the times at which the VM was acquired and released, respectively. t_stop^i = ∞ for an active VM.

The peer-to-peer network characteristics between a pair of VMs, r_i and r_j, are given by λ_{i×j} and β_{i×j}, where λ_{i×j} is the network latency between VMs r_i and r_j and β_{i×j} is their available bandwidth.

VMs are typically charged at whole VM-hours by current cloud providers. The user is billed for the entire hour even if a VM is released before an hour boundary. The total accumulated monetary cost for the VM r_i at time t is then calculated as:

    µ_i(t) = ⌈ (min(t_stop^i, t) − t_start^i) / 60 ⌉ × cost per VM-hour        (2)

where min(t_stop^i, t) − t_start^i is the duration in minutes for which the VM has been active.
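A minimal sketch of Eqn. 2's whole-hour billing, assuming timestamps are given in minutes; the ceiling means a VM active for 61 minutes is billed for two full hours.

import math

def vm_cost(t_start, t_stop, t_now, price_per_hour):
    # Eqn. 2: active minutes, rounded up to whole VM-hours.
    active_minutes = min(t_stop, t_now) - t_start
    return math.ceil(active_minutes / 60) * price_per_hour

print(vm_cost(0, float("inf"), 61, 0.10))  # 0.2: active VM, 61 min -> 2 billed hours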

We gauge the on-going performance of virtualized cloud resources, and their variability relative to the rated capability, using a presumed monitoring framework. This periodically probes the compute and network performance of VMs using standard benchmarks. The normalized processing power of a VM r_i observed at time t is given by π_i(t), and the network latency and bandwidth between pairs of active VMs r_i and r_j are λ_{i×j}(t) and β_{i×j}(t), respectively. To minimize overhead, we only monitor the network characteristics between VMs that host neighboring PEs in the DAG, to assess their impact on dataflow throughput. We assume that the rated network performance as defined by the provider is maintained for other VM pairs. Two PEs collocated in the same VM are assumed to transfer messages in-memory, i.e., λ_{i×i} → 0 and β_{i×i} → ∞.

4 DEPLOYMENT AND ADAPTATION APPROACH

Based on the dynamic dataflow and cloud infrastructure models, we propose a deployment and autonomic runtime adaptation approach that attempts to balance simplicity, realistic cloud characteristics (e.g., billing model, elasticity), and user flexibility (e.g., dynamic PEs). Later, we formally define a meaningful yet tractable optimization problem for the deployment and runtime adaptation strategies (§ 6).


[Fig. 2 shows the sample dynamic dataflow: (a) the Parser PE and Classifier PE connected by an edge, the classifier with alternates of value γ = 0.65, 0.7 and 1.0; (b) the active alternates e_1^1 and e_2^2 with reference core requirements of 4 and 9; (c) the deployment onto Large (4 cores, π = 2), Medium (2 cores, π = 2) and Small (1 core, π = 1) VMs.]

Fig. 2: (a) A sample dynamic dataflow. (b) Dataflow with selected alternates (e_1^1, e_2^2) and their initial core requirements. (c) A deployment of the dataflow onto VMs.

We make several practical assumptions about the continuous dataflow processing framework, reflecting features available in existing systems [21], [8]:

1) The dataflow application is deployed on distributed machines, with a set of instances of each PE (ϕ(t) at time t) running in parallel across them. Incoming messages are actively load-balanced across the different PE instances based on the processing power of the CPU cores they run on and the length of the pending input queue. This allows us to easily scale the application by increasing the number of PE instances as the incoming load increases.
2) Within a single multi-core machine, multiple instances of different PEs run in parallel, isolated on separate cores. A core is exclusively allocated to one PE instance. We assume that there is minimal interference from system processes.
3) Running n data-parallel instances of a PE on n CPU cores, each with processing power π = 1, is equivalent to running 1 instance of the PE on 1 CPU core with π = n.
4) The active alternate for a PE does not depend on the active alternate of any other PE, since all alternates of a given PE follow the same input/output format. This allows us to independently switch the active alternate for different PEs during runtime.
5) The framework can spin up or shut down cloud VMs on-demand (with an associated startup/shutdown latency) and can periodically monitor the VM characteristics, such as CPU performance and network bandwidth.

Given these assumptions, we define the following deployment model. When a dynamic dataflow, such as Fig. 2(a), is submitted for execution, the scheduler for the stream processing framework needs to make several decisions: alternate selection for each dynamic PE, acquisition of VMs, mapping of these PEs to the acquired VMs, and deciding the number of data-parallel instances per PE. These activities are divided into two phases: deployment time and runtime strategies.

Deployment time strategies select the initial active alternate for each PE, and determine their CPU core requirements (relative to the "reference" core) based on estimated initial message data rates and rated VM performance. Fig. 2(b) shows the outcome of selecting alternates, picking e_1^1 and e_2^2 for PEs E1 and E2 with their respective core requirements. Further, they determine the VMs of particular resource classes that are instantiated, and the mapping M from the data-parallel instances of each PE (ϕ(t)) to the active VMs (R(t)), after which the dataflow execution starts. Fig. 2(c) shows multiple data-parallel instances of these PEs deployed on a set of VMs of different types. Note that the number of PE instances in Fig. 2(c) – ϕ(t) is 2 and 5 for e_1^1 and e_2^2 – is not equal to the core requirements in Fig. 2(b) – 4 and 9 cores – since some instances run on faster CPU cores (π > 1).
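The instance counts in Fig. 2(c) follow from spreading a PE's reference-core requirement over cores of normalized speed π (assumption 3 above). A sketch, simplifying to the case where all of a PE's instances land on cores of the same speed:

import math

def instances_needed(ref_cores, pi):
    # One instance per core; a core of speed pi covers pi reference cores.
    return math.ceil(ref_cores / pi)

print(instances_needed(4, 2))  # 2 instances of e_1^1 on pi = 2 cores
print(instances_needed(9, 2))  # 5 instances of e_2^2; the last core is only half used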

Runtime strategies, on the other hand, are responsible for periodic adaptations of the application deployment in response to the variability in the input data rates and the VM performance obtained from the monitoring framework. These active decisions are determined by the runtime heuristics, which can decide to switch the active alternate for a PE, or change the resources allocated to a PE within or across VMs. The acquisition and release of VMs are also tied to these decisions as they determine the actual cost paid to the cloud service provider. A formal definition of the optimization problem and these control strategies is presented in § 6.

5 METRICS FOR QUALITY OF SERVICE

In § 2, we captured the value and processing requirements for individual PEs and their alternates using the metrics: relative value (γ_i^j), alternate processing cost (c_i^j), and selectivity (s_i^j). In this section, we expand these QoS metrics to the entire dataflow application.

We define an optimization period T for which the dataflow is executed. This optimization period is divided into time intervals T = {t_0, t_1, ..., t_n}. We assume these interval lengths are constant, Δt = t_{i+1} − t_i. For brevity we omit the suffix i while referring to the time interval t_i, unless necessary for disambiguation.

A dataflow is initially deployed with a particular configuration of alternates, which can later be switched during runtime to meet the application's QoS. To keep the problem tractable and avoid repetitive switches, these changes are only made at the start of each interval t_i. This means that during a time interval t, only one specific alternate of a PE P_i is active. The value of the PE P_i during the time interval t is thus:

    Γ_i(t) = ∑_{p_i^j ∈ P_i} A_i^j(t) · γ_i^j,    where A_i^j(t) = 1 if alternate p_i^j is active at time t, and 0 otherwise.

Since value can be perceived as an additive property [22] over the dataflow DAG, we aggregate the individual values of the active alternates to obtain the value for the entire dynamic dataflow during the time interval t.

Def. 3 (Normalized Application Value): The normal-ized application value, 0 < Γ(t) ≤ 1, for a dynamicdataflow D during the time interval t is:

Γ(t) =

∑Pi∈P Γi(t)

|P|(3)


where |P| is the number of PEs in the dataflow. The application's value thus obtained gives an indication of its overall quality from the domain's perspective, and can be considered as one QoS dimension.

Another QoS criterion, particularly in the context of continuous dataflows, is the observed application throughput. However, raw application throughput is not meaningful on its own because it is a function of the input data rates during that time interval. Instead, we define the relative application throughput, Ω, built up from the relative throughput 0 < Ω_i(t) ≤ 1 of individual PEs P_i during the interval t. This is defined as the ratio of the PE's current output data rate (absolute throughput) o_i(t) to the maximum achievable output data rate o_i^max(t): Ω_i(t) = o_i(t) / o_i^max(t).

The output data rate for a PE depends on the selectivity of the active alternate, and is bound by the total resources available to the PE to process the inputs, given the processing cost of the active alternate. The actual output data rate during the interval t is:

    o_i(t) = min( q_i(t) + i_i(t)·Δt , (φ_i·Δt) / c_i^j ) × s_i^j / Δt        (4)

where q_i(t) is the number of pending input messages in the queue of PE P_i, i_i(t) is the input data rate of the PE in the time interval t, φ_i is the total core allocation of the PE (φ_i = ∑_k π_k), and c_i^j and s_i^j are the processing cost per message and the selectivity of the active alternate p_i^j. The maximum output throughput is achieved when there are enough resources for P_i to process all incoming data messages, including messages pending in the queue at the start of the interval t. This is given by:

    o_i^max(t) = ( q_i(t) + i_i(t)·Δt ) × s_i^j / Δt

While the input data rate for the source PEs is determined externally, the input rate for other PEs can be characterized as follows. The flow of messages between consecutive PEs is limited by the bandwidth, during the interval t, between the VMs on which the PEs are deployed. We define the flow f_{i,j}(t) from P_i to P_j as:

    f_{i,j}(t) = min( o_i(t), β_{i,j}(t)·Δt / m )  if 〈P_i, P_j〉 ∈ E;  0 otherwise        (5)

where β_{i,j}(t) is the available cumulative bandwidth between all instances of P_i and P_j at time t, and m is the average output message size.

Given the multi-merge semantics for incoming edges, the input data rate for P_k during time t is:

    i_k(t) = ∑_j f_{j,k}(t)        (6)

Unlike the application's value, its relative throughput is not additive, as it depends on the critical processing path of the PEs during that interval. The relative throughput for the entire application is the ratio of the observed cumulative outgoing data rate from the output PEs, O, to the maximum achievable output rate for those output PEs at the current input data rate.

Def. 4 (Relative Application Throughput): The relative application throughput, 0 < Ω(t) ≤ 1, for a dynamic dataflow D = 〈P, E, I, O〉 during the time interval t is:

    Ω(t) = ( ∑_{P_i ∈ O} Ω_i(t) ) / |O|        (7)

The output data rate of the output PEs is obtained by calculating the output data rate of individual PEs (Eqn. 4) followed by the flow f_{i,j} between consecutive PEs (Eqn. 5), repeatedly, in a breadth-first scan of the DAG starting from its input PEs, until the input and output data rates for the output PEs are obtained.
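The sketch below illustrates this scan, a hedged rendition of Eqns. 4–7 over dictionary-based PE and edge descriptions (the field names are ours, not the framework's). It assumes a DAG, a common Δt, and an average message size m for all edges.

from collections import deque

def relative_app_throughput(pes, edges, ext_rates, dt, m):
    """pes: {name: {"queue", "cores", "cost", "sel", "is_output"}}
    edges: {(src, dst): bandwidth}; ext_rates: input rates of the input PEs."""
    preds = {p: [s for (s, d) in edges if d == p] for p in pes}
    ready = deque(p for p in pes if not preds[p])    # input PEs first
    out, out_max, in_rate, done = {}, {}, dict(ext_rates), set()
    while ready:
        p = ready.popleft()
        if p in done or any(s not in done for s in preds[p]):
            continue                                 # wait until all inputs are known
        cfg, i = pes[p], in_rate.get(p, 0.0)
        arrived = cfg["queue"] + i * dt
        processed = min(arrived, cfg["cores"] * dt / cfg["cost"])
        out[p] = processed * cfg["sel"] / dt         # Eqn. 4
        out_max[p] = arrived * cfg["sel"] / dt       # max achievable output rate
        done.add(p)
        for (s, d), bw in edges.items():
            if s == p:                               # Eqn. 5: flow capped by bandwidth
                in_rate[d] = in_rate.get(d, 0.0) + min(out[p], bw * dt / m)  # Eqn. 6
                ready.append(d)
    outs = [p for p, c in pes.items() if c["is_output"]]
    return sum(out[p] / out_max[p] if out_max[p] else 1.0 for p in outs) / len(outs)  # Eqn. 7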

Normalized Application Value, Γ(t), and Relative Application Throughput, Ω(t), together provide complementary QoS metrics to assess the overall application execution. We use them in the next section to define an optimization problem that balances these QoS metrics based on user-defined constraints.

6 PROBLEM FORMULATION

We formulate the optimization problem as a constrained utility maximization problem during the period T for which the dataflow is executed. The constraint ensures that the average relative application throughput meets a user-defined threshold, Ω̄ ≥ Ω̂; the utility to be maximized is a function of the normalized application value, Γ, and the cost of cloud resources, µ, during the optimization period T.

For a dynamic dataflow D = 〈P, E, I, O〉, the estimated input data rates, I(t_0) = {i_j(t_0)}, at each input PE, P_j ∈ I, at the initial time t_0, are given. During each subsequent time interval t_i, based on the observations of the monitoring framework during t_{i−1}, we have the following: the observed input data rates, I(t) = {i_j(t)}; the set of active VMs, R(t) = {r_1, r_2, ..., r_m}; the normalized processing power per core for each VM r_j, π(t) = {π_j(t)}; and the network latency and bandwidth between pairs of VMs r_i, r_j ∈ R(t) hosting neighboring PEs, λ(t) = {λ_{i×j}(t)} and β(t) = {β_{i×j}(t)}, respectively.

At any time interval t, we can calculate the relative application throughput Ω(t) (Eqn. 7), the normalized application value Γ(t) (Eqn. 3), and the cumulative monetary cost µ(t) till time t (Eqn. 2).

The average relative application throughput (Ω̄), the average relative application value (Γ̄), and the total resource cost (µ) for the entire optimization period T = {t_0, t_1, ..., t_n} are:

    Ω̄ = ( ∑_{t ∈ T} Ω(t) ) / |T|        Γ̄ = ( ∑_{t ∈ T} Γ(t) ) / |T|        µ = µ(t_n)

We define the combined utility as a function of both the total resource cost (µ) and the average application value (Γ̄). To help users trade off between cost and value, we allow them to define the expected maximum cost at which they break even at the two extremes of application value, i.e., the values obtained by selecting the best alternates for all PEs, on one end, and by selecting the worst alternates, on the other. For simplicity, we assume a linear function to derive the expected maximum resource cost at an intermediate application value, as shown in Fig. 3.

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOLUME, 2015

Page 6: Reactive Resource Provisioning Heuristics for Dynamic ... Resource... · uate the reactive heuristics through large-scale sim-ulations of LRB, scaling up to 8,000 msgs/sec, using


Fig. 3: A sample linear function for the trade-off between cost (µ) and value (Γ). The line denotes break-even, with slope σ.

If the actual resource cost lies below this break-even line, we consider it a profit, while if it lies above, we consider it a loss. This can be captured using the following objective function, Θ, which is to be maximized over the optimization period under the constraint Ω̄ ≥ Ω̂:

    Θ = Γ̄ − σ · ( µ − C_max^{Γ_max} ) + 1        (8)

where σ is an equivalence coefficient between cost and value, given by the slope:

    σ = ( Γ_max − Γ_min ) / ( C_max^{Γ_max} − C_max^{Γ_min} )        (9)

Γ_max and Γ_min are the maximum and minimum possible relative application values, obtained by picking the alternates with the best and worst values for each PE, respectively, while C_max^{Γ_max} and C_max^{Γ_min} are the user-defined break-even resource costs at Γ_max and Γ_min.

Given the deployment approach (§ 4), the above objective function Θ can be maximized by choosing appropriate values for the following control parameters at the start of each interval t_i during the optimization period:
• A_i^j(t), the active alternate j for each PE P_i;
• R(t) = {r_j(t)}, the set of active VMs;
• ϕ(t) = {ϕ_j(t)}, the set of data-parallel instances for each PE P_j; and
• M(t) = {ϕ_j(t) → M_{j×k}(t)}, the mapping of the data-parallel instances ϕ_j of PE P_j to the actual VMs r_k.

Optimally solving the objective function Θ with the Ω̂ constraint is NP-Hard. The proof is outside the scope of this article; a sketch is presented in our earlier work [15]. While techniques like integer programming and branch-and-bound have been used to optimally solve some NP-hard problems [23], these do not adequately translate to low-latency solutions for continuous adaptation decisions. The dynamic nature of the application and the infrastructure, as well as the tightly-bound decision making interval, mean that fast heuristics performed repeatedly are better than slow optimal solutions. We thus propose simplified heuristics to provide an approximate solution to the objective function.
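A small sketch of Eqns. 8 and 9; the break-even figures below are invented for illustration (value 1.0 worth $100, value 0.65 worth $60), not taken from the paper's experiments.

def sigma(gamma_max, gamma_min, c_max_hi, c_max_lo):
    # Eqn. 9: equivalence coefficient between cost and value.
    return (gamma_max - gamma_min) / (c_max_hi - c_max_lo)

def theta(gamma_avg, mu, gamma_max, gamma_min, c_max_hi, c_max_lo):
    # Eqn. 8: utility; a cost below the break-even at Gamma_max raises Theta.
    s = sigma(gamma_max, gamma_min, c_max_hi, c_max_lo)
    return gamma_avg - s * (mu - c_max_hi) + 1

print(theta(gamma_avg=0.8, mu=70, gamma_max=1.0, gamma_min=0.65,
            c_max_hi=100, c_max_lo=60))  # 2.0625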

Other approximate procedures such as gradient descent are not directly applicable to the problem at hand, since the optimization problem presents a non-differentiable, non-continuous function. However, nature-inspired search algorithms such as Genetic Algorithms (GAs), ant-colony optimization, and particle-swarm optimization, which follow a guided randomized search, are sometimes more effective than traditional heuristics in solving similar single- and multi-objective workflow scheduling problems [24], [25], [26], [27]. In this article, we also explore GAs for finding an approximate solution to the optimization problem. While GAs are usually slow, and depend on the population size and the complexity of the operators used, they provide a good baseline against which to compare our greedy heuristics. They are discussed in the next section, followed by the greedy deployment and adaptation heuristics in § 8.

[Fig. 4 illustrates one GA iteration over double-stranded chromosomes: (1) select parent pairs from the population, (2) cross over genes using a random bit mask, (3) mutate the offspring, and (4) let the fittest survive.]

Fig. 4: Sample iteration of the GA heuristic.

7 GENETIC ALGORITHM-BASED SCHEDULING

A GA [28] is a meta-heuristic used in optimization and combinatorial problems. It facilitates the exploration of a large search space by iteratively evolving a number of candidate solutions towards the global optimum. The GA meta-heuristic abstracts the structure of the solution for a given problem in terms of a chromosome made up of several genes. It then explores the solution space by evolving a set of chromosomes (potential solutions) over a number of generations.

The GA initially generates a random population of chromosomes, which acts as the seed for the search. The algorithm then performs genetic operations such as crossovers and mutations to iteratively obtain successive generations of these chromosomes. The crossover operator takes a pair of parent chromosomes and generates an offspring chromosome by crossing over individual genes from each parent. This can combine partial solutions from the parents into a single offspring. Further, the mutation operator randomly alters some parts of a given chromosome, advancing the search while helping it avoid getting stuck in a local optimum. The algorithm then applies a selection operator, which keeps the best chromosomes from the entire population based on their fitness values and eliminates the rest. This process is repeated until a stopping criterion is met, such as a certain number of iterations or the convergence of the fitness value.

We adapt the GA to our optimization problem by defining the following domain-specific data structures and operators.

Chromosome: The solution to the optimization problem (Eqn. 8) requires (1) determining the best alternate to activate for each PE, and (2) the type and number of VMs and the mapping of data-parallel PE instances to these VMs.

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOLUME, 2015

Page 7: Reactive Resource Provisioning Heuristics for Dynamic ... Resource... · uate the reactive heuristics through large-scale sim-ulations of LRB, scaling up to 8,000 msgs/sec, using

We capture both of these aspects of the deployment configuration using a double-stranded chromosome (Fig. 4). Each chromosome represents one possible deployment configuration, and the goal is to find the chromosome with the optimal configuration. The first strand (S1) represents the PEs and their active alternates, with each gene in the strand representing a PE P_i in the dataflow. The value of the i-th gene holds the index j of the active alternate p_i^j for the corresponding PE. The second strand (S2) contains genes that represent the list of available VMs along with the list of PE instances running on them. The value of the i-th gene in strand S2 holds the list of index values j of the PEs P_j mapped to the VM r_i. The chromosome size (i.e., the number of genes) is fixed based on the available resource budget.
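A sketch of this two-strand encoding in code, using plain lists (our own representation, mirroring Fig. 4's chromosome E1 described next):

from dataclasses import dataclass

@dataclass
class Chromosome:
    # strand1[i]: index j of the active alternate p_i^j for PE P_i.
    # strand2[k]: indices of the PEs with an instance on VM r_k.
    strand1: list
    strand2: list

# Chromosome E1 of Fig. 4: alternates p_0^0 and p_1^0 active; r_0 hosts one
# instance of P_0 and two of P_1, while r_1 hosts two instances of P_1.
e1 = Chromosome(strand1=[0, 0], strand2=[[0, 1, 1], [1, 1]])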

For example, in Fig. 4, the chromosome E1 represents a deployment where strand S1 has two PEs, P_0 and P_1, with active alternates p_0^0 and p_1^0, respectively. Strand S2 identifies two VMs, r_0 and r_1, with r_0 running one instance of PE P_0 and two instances of PE P_1, while r_1 has two instances of PE P_1 running on it.

Fitness Function: The objective function Θ acts as the fitness function for a chromosome, and depends on both strands of the chromosome. The throughput constraint Ω̄ ≥ Ω̂ is incorporated using a weighted penalty function that penalizes the fitness value if the constraint is violated. This ensures that while the chromosome is penalized, it is not discarded immediately and is given a chance to recover through mutations. In addition, during runtime, the penalty function also considers the number of changes induced on the deployment, to reduce the overhead of frequent changes to the system.

    penalty = −α · |Ω̂ − Ω| · it                      at deployment
    penalty = −α · |Ω̂ − Ω| · it + α′ · v · it        at runtime

where α and α′ are constants, v is the number of deployment changes in the chromosome compared to the previous deployment, and it is the GA's iteration count. The factor it ensures that the penalty increases as the chromosome survives over multiple iterations, and hence allows the removal of unfit chromosomes.

Crossover: Each parent chromosome is first selected from the population using a probabilistic ranked model (similar to the Roulette wheel) [29], while also retaining the top 5% of the chromosomes with the best fitness values. Next, the parent chromosomes are paired randomly, and a random bit mask is generated to choose a gene from either parent to produce the offspring chromosome. For example, in Fig. 4, parents E1 (white) and E2 (gray) are selected for crossover, and a random bit mask decides whether the gene from the first parent (bit is 0) or the second parent (bit is 1) is retained in the offspring OF1.

Mutation: We allow independent mutation of the two chromosome strands. For the first strand, with PE alternates, the mutation involves randomly switching the active alternate. For the second strand, with VM instances and their mapping, we probabilistically decide whether to remove or add a PE instance for each VM. For example, in Fig. 4, the offspring OF1 undergoes mutation by switching the active alternate of P_1 from p_1^2 → p_1^1, and by adding an instance of P_0 to the second VM. Mutated genes are shown with a dotted pattern.
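A hedged sketch of the two operators over the Chromosome encoding above; a per-gene coin flip plays the role of the random bit mask, and the mutation probabilities are placeholders.

import random

def crossover(p1, p2):
    # Random bit mask: keep the gene of parent 1 (bit 0) or parent 2 (bit 1).
    s1 = [random.choice(pair) for pair in zip(p1.strand1, p2.strand1)]
    s2 = [random.choice(pair) for pair in zip(p1.strand2, p2.strand2)]
    return Chromosome(s1, s2)

def mutate(chrom, n_alternates, p=0.1):
    for i in range(len(chrom.strand1)):       # strand 1: switch active alternates
        if random.random() < p:
            chrom.strand1[i] = random.randrange(n_alternates[i])
    for vm in chrom.strand2:                  # strand 2: add/remove PE instances
        if vm and random.random() < p:
            if random.random() < 0.5:
                vm.pop()                      # remove an instance
            else:
                vm.append(random.choice(vm))  # add another instance of a hosted PE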

While GAs explore a wide range of solutions and tend to give near-optimal solutions, their convergence performance becomes a bottleneck during runtime adaptation. Hence we design sub-optimal greedy heuristics that trade optimality for speed, making them better suited for streaming applications.

8 GREEDY DEPLOYMENT & ADAPTATION HEURISTICS

In this section, we propose greedy heuristics to find an approximate solution to the optimization problem. As before, the algorithm is divided into an initial deployment phase and a runtime adaptation phase.

For the proposed heuristic, we provide sharded (SH) and centralized (CE) variants that differ in the quanta of information needed and the execution pattern of the scheduler. The sharded version uses one scheduler per PE, and all data-parallel instances of a PE communicate with their scheduler, potentially across VMs. However, schedulers for different PEs do not communicate; hence each scheduler only has access to its own PE instances. In the centralized version, a single scheduler gathers information about the entire dataflow and hence has a global view of the execution of all PEs. As we show (§ 9.2), while the SH scheduler is inherently decentralized and reduces the transfer of monitoring data during execution, the CE variant, due to its global view, is more responsive to changes in the execution environment.

8.1 Initial Deployment Heuristic

The initial deployment algorithm (Alg. 1) is divided into two stages: alternate selection (lines 2–11) and resource allocation (lines 12–25). These stages are identical for the SH and CE schedulers; however, their costing functions differ (Table 1).

The alternate selection stage ranks each PE alternate by the ratio of its value to its estimated cost (line 4), and chooses the one with the highest ratio. Since we do not know the actual cost of the alternates until resource allocation, the heuristic uses the estimated processing requirement (c_P^A) as an approximation. The GETCOSTOFALTERNATE function varies between the SH and CE versions. The SH strategy calculates an alternate's cost based only on its own processing requirements, while the CE strategy calculates the cost of the alternate as the sum of both its own processing needs and those of its downstream PEs. Intuitively, if an upstream PE has more resources allocated, its output message rate increases, and this has a cascading impact on the input rate of the succeeding PEs. Also, an upstream PE with higher selectivity will further increase the successors' cost, since they will have to process more messages. This cost is calculated using a dynamic programming algorithm by traversing


Algorithm 1 Initial Deployment Heuristic
1: procedure INITIALDEPLOYMENT(Dataflow D)
        ▷ Alternate Selection Stage
2:     for PE P ∈ D do
3:         for Alternate A ∈ P do
4:             c_P^A ← GETCOSTOFALTERNATE(A)
5:             γ ← A.Value
6:             if γ / c_P^A ≥ best then
7:                 best ← γ / c_P^A
8:                 selected ← A
9:             end if
10:        end for
11:    end for
        ▷ Resource Allocation Stage
12:    while Ω ≤ Ω̂ do
13:        if VM.isAvailable = false then
14:            VM ← INITIALIZEVM(LargestVMClass)
15:        end if
16:        P ← GETNEXTPE
17:        CurrentAlloc ← ALLOCATENEWCORE(P, VM)
18:        Ω ← GETESTIMATEDTHRUPUT(D, CurrentAlloc)
19:    end while
20:    for PE P ∈ D do
21:        if ISOVERPROVISIONED(P) then
22:            REPACKPEINSTANCES(P)    ▷ Move PE instances to a VM with lower core capacity
23:        end if
24:    end for
25:    REPACKFREEVMS    ▷ Repack PEs in VMs with free cores onto VMs with fewer cores
26: end procedure

the dataflow graph in reverse BFS order rooted at theoutput PEs.

This is followed by the resource allocation stage (lines 12–25), which operates similar to the variable-sized bin packing (VBP) problem [30]. For the initial deployment, in the absence of running VMs, we assume that each VM of a resource class behaves ideally, as per its rated performance. The algorithm picks PEs (objects) in an order given by GETNEXTPE, and puts an instance of each PE in the largest VM (bin) (line 17), creating a new VM (bin) if required. It then calculates the estimated relative throughput of the application given the current allocation (line 18) using Eqn. 7, and repeats the procedure if the application constraint is not met. It should be noted that the GETESTIMATEDTHRUPUT function considers both the allocated cores and the available bandwidth to calculate the relative throughput, and hence scales out when either becomes a bottleneck.

The intuition behind GETNEXTPE is to choose PEs in an order that not only increases VM utilization but also limits the message transfer latency between PEs, by collocating neighboring PEs in the dataflow within the same VM. We order the PEs using a forward DFS traversal rooted at the input PEs, and allocate resources to them in that order, so as to increase the probability of collocating neighboring PEs. Note that the CPU cores required by the individual PEs are not known in advance, as the resource requirements depend on the current load, which in turn depends on the resource requirements of the preceding PEs. Hence, after assigning at least one CPU core to each PE (INCREMENTALLOCATION), the deployment algorithm chooses PEs in the order of the largest bottlenecks in the dataflow, i.e., lowest relative PE throughput (Ω_i). This

TABLE 1: Functions used in Initial Deployment Strategies

Function           | Sharded (SH) | Centralized (CE)
GETCOSTOFALTERNATE | A.cost       | A.cost + S_i × ∑ successor.cost
GETNEXTPE          | (both) if all PEs are assigned then return argmin_{P_j ∈ P}(Ω_t^j); else return the next PE in DFS order
REPACKPEINSTANCES  | N/A          | Move the PE instance to the smallest VM big enough for the required core-secs
REPACKFREEVMS      | N/A          | Iterative repacking [30]

ensures that PEs needing more resources are chosen first for allocation. This in turn may increase the input rate (and processing load) on the succeeding PEs, making them the bottlenecks. As a result, we end up with an iterative approach that incrementally allocates CPU cores to PEs using the VBP heuristic until the throughput constraint is met. Since a resource allocation only impacts downstream PEs, this algorithm is bound to converge. We leave a theoretical proof to future work.
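A compact sketch of Table 1's GETNEXTPE ordering, assuming a dict of per-PE core allocations and observed relative throughputs (names are ours):

def get_next_pe(cores, dfs_order, omega):
    # First pass: hand every PE at least one core, in forward DFS order,
    # which favors collocating neighboring PEs.
    for p in dfs_order:
        if cores[p] == 0:
            return p
    # Afterwards: pick the current bottleneck, argmin over Omega_i.
    return min(omega, key=omega.get)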

At the end, the algorithm performs two levels of repacking. After a solution is obtained using VMs from just the largest resource class, we first move one instance of every over-provisioned PE to the smallest resource class large enough to accommodate that PE instance (best fit, using REPACKPEINSTANCES). This may free up capacity on the VMs, and hence we then use iterative repacking [30] (REPACKFREEVMS) to repack all the VMs with spare capacity, to minimize wasted cores. During this process, we may sacrifice instance collocation in favor of reduced resource cost. Our evaluation, however, shows that this is an acceptable trade-off toward maximizing the objective function. Note that these algorithms all run off-line, and the actual deployment is carried out only after these decisions are finalized.

Both the order in which PEs are chosen and the repacking strategy affect the quality of the heuristic. While the sharded strategy SH uses a local approach and does not perform any repacking, the centralized strategy CE repacks individual PEs and VMs, as shown in Table 1.

8.2 Runtime Adaptation Heuristic

The runtime adaptation kicks in periodically over the lifetime of the application execution. Alg. 2 considers the current state of the dataflow and the cloud resources – available through monitoring – in adapting the alternate and resource selection. The monitoring gives a more accurate estimate of the data rates, and hence of the resource requirements and their cost.

As before, the algorithm is divided into two stages: alternate selection and resource allocation. However, unlike the deployment heuristic, we do not run both stages at the same time interval. Instead, the alternates are selected every m intervals and the resources are reallocated every n intervals. The former tries to switch alternates to achieve the throughput constraint given


the existing allocated resources, while the latter tries to balance the resources (i.e., provision new VMs or shut down existing ones) given the alternates that are active at that time. Separating these stages serves two goals. First, it makes the algorithm for each stage more deterministic and faster, since one of the parameters is fixed. Second, it reduces the number of retractions of deployment decisions occurring in the system. For example, if a decision to add a new VM leads to over-provisioning at a later time (but before the hourly boundary), then instead of shutting down the VM, the alternate selection stage can potentially switch to an alternate with a higher value, thus utilizing the extra resources available and, in the process, increasing the application's value.

During the alternate selection stage, given the current data rate and resource performance, we first calculate the resources needed by each PE alternate (line 6). We then create a list of "feasible" alternates for a given PE, based on whether the current relative throughput is lower or higher than the expected throughput Ω̂. Finally, we sort the feasible alternates in decreasing order of their value-to-cost ratio, and select the first alternate that can be accommodated using the existing resource allocation. After this phase the overall value either increases or decreases, depending on whether the application was over-provisioned or under-provisioned to begin with, respectively.

The RESOURCEREDEPLOY procedure is used to allocate or de-allocate resources to maintain the required relative throughput while minimizing the overall cost. If Ω ≤ Ω̂ − ε, the algorithm proceeds similar to the initial deployment algorithm: it incrementally allocates additional resources to the bottlenecks observed in the system and repacks the VMs. However, if Ω ≥ Ω̂ + ε, the system must scale in to avoid resource wastage, and has two decisions to make: first, which PE is to be scaled in, and second, which instance of that PE is to be removed, thus freeing its CPU cores. The over-provisioned PE selected for scale-in is the one with the maximum relative throughput (Ω_i). Once the over-provisioned PE is determined, to decide which of its instances should be terminated, we get the list of VMs on which these instances are running and weigh these VMs using the following "weight" function (Eqn. 10). Finally, a PE instance executing on the least-weighted VM is selected for removal.

    VM_Weight(r_i) = T_c(r_i) × ( FreeCores(r_i) / TotalCores(r_i) ) × ( 1 − ϕ_{r_i} / ϕ ) × ( TotalCores(r_i) × π / Cost per VM-hour )        (10)

where T_c is the time remaining in the current cost cycle (i.e., the time till the next hourly boundary), ϕ_{r_i} is the number of instances of the over-provisioned PE on the VM r_i, and ϕ is the total number of instances of that PE across all VMs. The VM Weight is lower for VMs with less time left in their hourly cycle, which are thus preferred for removal. This increases temporal utilization. Similarly, VMs with fewer cores used are prioritized for removal. Further, VMs with a higher cost per normalized core have a lower weight, so that they are selected first for shutdown. Hence the VM Weight metric helps us pick the

Algorithm 2 Runtime Adaptation Heuristic
1: procedure ALTERNATEREDEPLOY(Dataflow D, Ω_t)    ▷ Ω_t is the observed relative throughput
2:     for PE P ∈ D do    ▷ Alternate selection phase
3:         alloc ← CURRALLOCATEDRES(P)    ▷ Get the currently allocated resources (accounting for infrastructure variability)
4:         requiredC ← REQUIREDRES(P.activeAlternate)    ▷ Get the required resources for the selected alternate
5:         for Alternate A ∈ P do
6:             requiredA ← ACTUALRESREQUIREMENTS(A)
7:             if Ω_t ≤ Ω̂ − ε then
8:                 if requiredA ≤ requiredC then    ▷ Select alternates with lower requirements
9:                     feasible.ADD(A)
10:                end if
11:            else if Ω_t ≥ Ω̂ + ε then
12:                if requiredA ≥ requiredC then    ▷ Select alternates with higher requirements
13:                    feasible.ADD(A)
14:                end if
15:            end if
16:        end for
17:        SORT(feasible)    ▷ Decreasing order of value/cost
18:        for feasible Alternate A do
19:            if requiredA < alloc then
20:                SWITCHALTERNATE(A)
21:                break
22:            end if
23:        end for
24:    end for
25: end procedure

1: procedure RESOURCEREDEPLOY(Dataflow D, Ω_t)
2:     if Ω_t ≤ Ω̂ − ε then
3:         Same procedure as the initial deployment
4:     else if Ω_t ≥ Ω̂ + ε then
5:         while Ω ≥ Ω̂ do
6:             PE ← overProvisionedPE
7:             instance ← SELECTINSTANCETOKILL(PE)
8:             newAlloc ← REMOVEPEINSTANCE(instance)
9:             Ω ← GETESTIMATEDTHRUPUT(D, newAlloc)
10:        end while
11:        REPACKFREEVMS()    ▷ Repack PEs in the VMs with free cores onto smaller VMs, with collocation
12:    end if
13: end procedure

PE instances in a manner that can help free up costlier,under-utilized VMs that can be shutdown at the end oftheir cost cycle to effectively reduce the resource cost.
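As a worked illustration of Eqn. 10 and the SELECTINSTANCETOKILL step, the sketch below computes the weight and picks an instance from the least-weighted VM; the field names and per-VM bookkeeping are illustrative assumptions, not the system's actual data structures.

    def vm_weight(vm, phi_vm, phi_total, pi, secs_into_hour):
        """Eqn. 10: a lower weight marks a better candidate VM for removal.

        vm carries illustrative free_cores, total_cores and cost_per_hour
        fields; pi is the normalized core performance of the VM class.
        """
        t_c = 3600 - secs_into_hour                    # time left in the hourly cost cycle
        free_frac = vm.free_cores / vm.total_cores     # idle fraction of the VM's cores
        inst_frac = 1 - phi_vm / phi_total             # small when many of the PE's instances run here
        capacity_per_cost = vm.total_cores * pi / vm.cost_per_hour  # normalized cores per dollar
        return t_c * free_frac * inst_frac * capacity_per_cost

    def select_instance_to_kill(hosting, pi, secs_into_hour):
        """hosting maps each VM to the list of the PE's instances it runs."""
        phi_total = sum(len(insts) for insts in hosting.values())
        vm = min(hosting, key=lambda v: vm_weight(v, len(hosting[v]),
                                                  phi_total, pi, secs_into_hour))
        return hosting[vm][0]                          # any instance on the least-weighted VM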

9 EVALUATION

We evaluate the proposed heuristics through a simulation study. To emulate real-world cloud characteristics, we extend the CloudSim [31] simulator into IaaSSim, which incorporates temporal and spatial performance variability using VM and network performance traces collected from IaaS cloud VMs². Further, FloeSim simulates the execution of the dynamic dataflows [21] on top of IaaSSim, with support for dataflows, alternates, distributed deployment, runtime scaling, and plugins for different schedulers. To simulate data rate variations, the given data rate is treated as an average value, and the instantaneous data rate is obtained using a random walk within ±50% of that value. However, to enable comparisons between different simulation runs, we generate this data trace once and use the same trace across all simulation runs.

2. IaaSSim and the performance traces are available at http://github.com/usc-cloud/IaaSSimulator
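A minimal sketch of such a reproducible trace generator is shown below. The paper only specifies a random walk bounded within ±50% of the average rate, generated once and reused; the per-step size and seed handling are our own assumptions.

    import random

    def data_rate_trace(avg_rate, num_steps, max_dev=0.5, step_frac=0.05, seed=42):
        """Bounded random walk around avg_rate, reproducible via a fixed seed."""
        rng = random.Random(seed)                 # same seed => same trace across runs
        lo, hi = avg_rate * (1 - max_dev), avg_rate * (1 + max_dev)
        rate, trace = float(avg_rate), []
        for _ in range(num_steps):
            rate += rng.uniform(-1.0, 1.0) * avg_rate * step_frac  # small random step
            rate = max(lo, min(hi, rate))         # clamp to within +/-50% of the average
            trace.append(rate)
        return trace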


Fig. 5: Dynamic Continuous Dataflow for LRB, with PEs P0–P7: Weather Updates (P0), Report Parser (P1), Congestion Estimation (P2), Accident Detector (P3), Toll Calculator (P4), Toll Assessment (P5), Balance Queries (P6), and Travel Time Estimator (P7). Alternates for P2 and P7 have value (γ), cost (c) and selectivity (s):

P2 alternates: p2^0 (γ = 1.0, c = 1.2, s = 1.0); p2^1 (γ = 0.8, c = 0.8, s = 1.0); p2^2 (γ = 0.5, c = 0.8, s = 0.5)
P7 alternates: p7^0 (γ = 0.6, c = 0.3, s = 1.0); p7^1 (γ = 1.0, c = 0.5, s = 1.0)

For each experiment, we deploy the Linear Road Benchmark (LRB) [16] as a dynamic dataflow using FloeSim and run it for 12 simulated hours (T = 12 hrs) on simulated VMs whose performance traces were obtained from real Amazon EC2 VMs. Each experiment is repeated at least three times and average values are reported. We use a timestep duration of t = 5 mins, at the start of which adaptation decisions are made.

9.1 Linear Road Benchmark (LRB)

The Linear Road Benchmark [16] is used to compare the performance of data stream management systems and has been adopted for general purpose stream processing systems [32]. Using this as a base, we develop a dynamic continuous dataflow application to evaluate the proposed scheduling heuristics. LRB models a road toll network within a confined area (e.g., 100 sq. miles), in which the toll depends on several factors including the time of day, current traffic congestion levels, and proximity to accidents. It continuously ingests “position reports” from different vehicles on the road and is responsible for (i) detecting average speed and traffic congestion for a section, (ii) detecting accidents, (iii) providing toll notifications to vehicles whenever they enter a new section, (iv) answering account balance queries and toll assessments, and (v) estimating travel times between two sections. The goal is to support the highest number of expressways while satisfying the desired latency constraints. To simulate realistic road conditions, the data rate varies from around 10 msgs/sec to around 2,000 msgs/sec per expressway.

Fig. 5 shows the LRB benchmark implemented as a dynamic continuous dataflow. The Weather Updates (P0) and Report Parser (P1) PEs act as the input PEs for the dataflow. While the former receives low frequency weather updates, the latter receives extremely high frequency position reports from individual vehicles (each car sends a position report every 30 secs) and exhibits variable data rates based on the current traffic conditions. The Congestion Estimation PE (P2) estimates current as well as near-future traffic conditions for different sections of all the monitored expressways. This PE may have several alternates using different machine learning techniques that predict traffic with different accuracy and future horizons. We simulate three alternates with different value (γ), cost (c) and selectivity (s) values, as shown in the tables in Fig. 5. The Accident Detector PE (P3) detects accidents based on the position reports, which are forwarded to the Toll Calculator (P4) and Travel Time Estimator (P7) PEs. The former notifies toll values (P5) and account balances (P6) to the vehicles periodically. The latter (P7) provides travel time estimates and has several alternates based on different forecasting models. For the simulations, we use two alternates: (1) a decision/regression tree model which takes several historical factors into account, and (2) a time series model which predicts using only the recent past traffic conditions.

9.2 Results

We compare the proposed centralized and sharded heuristics (CE and SH) and the GA algorithm with a brute force approach (BR) that explores the search tree but uses intelligent pruning to avoid searching sub-optimal or redundant sub-trees. We evaluate their overall profit achieved, overall relative throughput, and the monetary cost of execution over the optimization period of T = 12 hrs. An algorithm is better than another if it meets the necessary relative application throughput constraint, Ω ≥ Ω − ε, and has a higher value for the objective function Θ (Eqn. 8). Note that the necessary constraint on Ω must be met, but higher values beyond the constraint do not indicate a better algorithm. Similarly, an algorithm with a higher Θ value is not better unless it also meets the Ω constraint.
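This comparison rule can be stated compactly in code; a sketch, assuming each schedule carries its observed Ω and objective Θ as attributes:

    def is_better(a, b, omega_bar=0.8, eps=0.05):
        """True if schedule a beats schedule b: the throughput constraint is
        a gate, and only once both pass does a higher objective Theta decide."""
        a_meets = a.omega >= omega_bar - eps
        b_meets = b.omega >= omega_bar - eps
        if a_meets != b_meets:
            return a_meets            # the schedule meeting the constraint wins
        return a.theta > b.theta      # both meet (or both miss): higher profit wins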

For all the experiments, we define the relative throughput threshold as Ω = 0.8 with a tolerance of ε = 0.05. We calculate σ for the LRB dataflow using Eqn. 9 by setting C^Γmin_max = (0.5 × T × DataRate) / 10 and C^Γmax_max = (1.0 × T × DataRate) / 10. We empirically arrive at these bounds by observing the actual break-even cost for executing the workflow using a brute force static deployment model.
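For instance, plugging the 50 msgs/sec configuration into these bounds (treating T = 12 hrs and DataRate = 50 as the dimensionless quantities used above, an assumption on our part) gives C^Γmin_max = (0.5 × 12 × 50) / 10 = 30 and C^Γmax_max = (1.0 × 12 × 50) / 10 = 60.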

1) Effect of Variability: Table 2 shows the overall profit and the relative throughput for a static deployment of LRB using different scheduling algorithms with an average input data rate of 50 msgs/sec. The overall profit, which is a function of application value and resource cost, remains constant due to the static deployment (without runtime adaptation). However, the relative throughput varies as we introduce infrastructure and data variability. In the absence of any variability, the brute force (BR) approach gives the best overall profit and also meets the throughput constraint (Ω ≥ 0.8). Further, the GA approaches a near-optimal solution, with ΘGA → ΘBR when neither infrastructure nor input data rates vary, and is within the tolerance limit (Ω − ε < Ω = 0.79 < Ω + ε). The SH and CE heuristics meet the throughput constraint but give a lower profit when there is no variability.

TABLE 2: Effect of variability of infrastructure performance and input data rate on relative output throughput using different scheduling algorithms. Static LRB deployment with an average input rate of 50 msgs/sec, Ω = 0.8.

                     Relative Application Throughput (Ω)
Algo.  Profit (Θ)   Neither   Infra. Perf.   Data Rate   Both
BR     0.67         0.80      0.68           0.59        0.44
GA     0.65         0.79      0.67           0.48        0.37
SH     0.45         0.81      0.60           0.40        0.29
CE     0.58         0.81      0.66           0.42        0.31


Fig. 6: Effect of infrastructure and data rate variability on Static Deployment and Runtime Adaptation, as input data rate rises. (a) Ω with Static Deployment (BR and GA, with and without variability); (b) Ω with Runtime Adaptation (SH, CE, GA); (c) Θ with Runtime Adaptation (SH, CE, GA). Each panel plots against avg. data rates of 10–2,000 msg/sec, with the Ω = 0.8 threshold marked in (a) and (b).

However, when running simulations with infrastructure and/or data variability, none of the approaches meet the desired throughput constraint, with Ω values ranging from 0.29 to 0.68, much less than the goal of Ω = 0.8. Since these experiments do not change the deployment at runtime, the initial deployment is based on the assumption of constant data rates and infrastructure performance. Even the two best static deployment strategies in the absence of variability, BR and GA, rapidly degrade in output throughput when both infrastructure and data variability are present. This is more pronounced as the average input data rate increases from 10 msgs/sec to 2,000 msgs/sec in Fig. 6(a). Due to the explosion in state space, we could not run the BR algorithm beyond a 50 msgs/sec input rate. This analysis motivates the need for autonomic adaptation of the application deployment.

2) Improvements with Runtime Adaptation: Figs. 6(b) and 6(c) show the improvements in relative throughput and overall application profit from the different runtime adaptation techniques in the presence of both infrastructure and data variability. We use GA as the upper bound (instead of BR) since BR is prohibitively slow for runtime adaptation and, as shown in Table 2, GA approaches the optimal in many scenarios. However, as discussed later (Fig. 7(a)), even GA becomes prohibitive for data rates ≥ 500 msg/sec, hence the missing entries in Figs. 6(b) and 6(c).

We observe that with dynamic adaptation both GA and the greedy heuristics (SH and CE) achieve the desired throughput constraint (Ω ≥ Ω = 0.8) for all the input data rates tested. This allows us to compare their achieved application profit (Fig. 6(c)) and make several key observations. First, the profit from GA is consistently higher than that of the SH and CE greedy heuristics. Second, CE achieves a better profit than SH, reaching between 60% and 80% of GA's profit; in fact, SH gives a negative profit (loss) in some cases. Understandably, the CE scheduler, having a global view, performs significantly better than SH, which performs local optimizations. However, CE has a higher overhead due to centralized collection of monitoring data (the exact overhead is not available from simulations). Lastly, comparing the static and dynamic deployments from Table 2 and Fig. 6(c) for a data rate of 50 msg/sec under variable conditions, we do see a drop in profit using runtime adaptation as it tries to achieve the Ω constraint. For GA and CE, the profits for adaptive deployment drop from 0.65 → 0.34 and 0.58 → 0.29, respectively, but are still well above the break-even point of 0. The static deployments, however, violate the throughput constraint by a large margin, which makes their higher profits meaningless.

3) Scalability of Algorithms: Figs. 7(a) and 7(b) show algorithm scalability with respect to algorithm runtime and the number of cores used by the initial deployment algorithm as the incoming data rate increases. While the BR and GA algorithms provide (near) optimal solutions for smaller data rates, their overhead is prohibitively large for higher data rates (Fig. 7(a)): > 10,000 secs for BR at 50 msg/sec, and > 1,000 secs for GA at 8,000 msg/sec. This is due to the search space explosion with the increase in the number of required VMs, as shown in Fig. 7(b). On the other hand, both the CE and SH greedy heuristics take only ∼2.5 secs to compute at 8,000 msg/sec, and scale linearly (O(|ϕ| + |R|)) with the number of PE instances (|ϕ|) and the number of virtual machines (|R|). Further, we see that the SH algorithm leads to higher resource wastage (more cores) as data rates increase, while CE and GA show a linear relation to the data rates in Fig. 7(b). Similar results are seen for the adaptation stage of the SH and CE algorithms, but the plots are omitted due to space constraints.

4) Benefit of using Alternates: We study the reduction in the monetary cost of running the continuous dataflow due to the use of alternates, as opposed to a dataflow deployed with only a single implementation for each PE; we choose the implementation with the highest value (Γ = 1). Fig. 7(c) shows the cost (US$) of resources required to execute the LRB dataflow for the optimization interval T = 12 hrs using the greedy heuristics with runtime adaptation, in the presence of both infrastructure and data variability. We use AWS EC2 prices for the m1.* generation of VMs to calculate the monetary cost.

We see that the use of alternates by runtime adaptation reduces the total monetary cost by 6.9% to 27.5%, relative to the non-alternate dataflow; alternates with different processing requirements provide an extra dimension of control to the scheduler. In addition, the benefit of alternates increases with higher data rates: as the input data rate, and hence the resource requirement, increases, even small fluctuations in data rate or infrastructure performance cause new VMs to be acquired to meet the Ω constraint, and acquiring VMs has a higher overhead (e.g., the hourly cost cycle and startup overheads) than switching between alternates.
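As a rough worked example with a hypothetical $0.10/hr VM price (the actual m1.* rates differ), a brief data-rate spike that forces a scale-out still incurs a full hourly charge plus VM startup latency, whereas switching P2 from alternate p2^0 (c = 1.2) to p2^1 (c = 0.8) instead sheds a third of that PE's processing demand immediately, at no acquisition cost.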

Fig. 7: Algorithm scalability (a,b) and advantage of alternates (c). (a) Algorithm time (sec, log scale) vs. avg. data rate (msg/sec, log scale) for BR, GA, CE and SH; (b) number of cores vs. avg. data rate for the same algorithms; (c) monetary cost (US$) of CE and SH, with and without alternates, vs. avg. data rate.

10 RELATED WORK

Scientific workflows [33], continuous dataflow systems [7], [8], [5], [9] and similar large-scale distributed

programming frameworks [34], [35] have garnered renewed research focus due to the recent explosion in the amount of data, both archived and real-time, and the need for large-scale analysis of this “Big Data”. Our work builds upon stream processing and continuous dataflow systems that provide a task-graph based programming model to execute long-running continuous applications which process incoming data streams in near-real time. Other related work includes flexible workflows and heterogeneous computing, service oriented architecture (SOA), and autonomic provisioning and resource management in clouds. We discuss the state-of-the-art in each of these research areas.

Continuous dataflow systems have their roots in Data Stream Management Systems, which process continuous queries over tuple streams composed of well-defined operators [36]. This allows operator-specific query optimizations, such as operator split and merge, to be performed [37]. General-purpose continuous dataflow systems such as S4 [7], Storm [8], and Spark [34], on the other hand, allow user-defined processing elements, making it necessary to find generic auto-scaling solutions such as operator scaling and data-parallel operations [38]. Several solutions, including one leveraging cloud elasticity to support auto-scaling, have been proposed [11]. However, these systems [10], [39] only consider data variability as a factor in auto-scaling decisions and assume that the underlying infrastructure offers the same performance over time. Our work shows that this assumption does not hold in virtualized clouds.

Autonomic provisioning for workload resource management on clouds has been proposed, using performance monitoring and model-based approaches [40], [41]. We use a similar approach and propose heuristics for dynamic continuous dataflows that handle not only data rate variations but also changes in the underlying infrastructure performance. Recent work [32] integrates elastic scale-out and fault tolerance for stateful stream processing, but adopts a local-only policy based on CPU utilization for scaling. In this article, we assume stateless PEs, and fault tolerance is beyond the scope of this work. Our results do show that local scale-out strategies which ignore the dataflow structure under-perform, which motivates heuristics with a global view.

Flexible workflows [42], [19] and service selection in SOA [43] allow workflow compositions to be transformed at runtime. This provides a powerful compositional tool for the developer to define business-rule based generic workflows that can be specialized at runtime depending on the environmental characteristics. The notion of “alternates” we propose is similar in that it offers flexibility to the developer and a choice of execution implementations at runtime. However, unlike flexible workflows, where the decision about task specialization is made exactly once based on certain deterministic parameters, in continuous dataflows this decision has to be re-evaluated regularly due to their continuous execution model and the dynamic nature of the data streams.

To exploit a heterogeneous computing environment, an application task may be composed of several sub-tasks that have different requirements and performance characteristics. Various dynamic and static task matching and scheduling techniques have been proposed for such scenarios [18], [44], [45]. The concept of alternates in dynamic dataflows is similar. However, we currently do not allow heterogeneous computing requirements for these alternates, though they may vary in processing requirements. Even with this restriction, the concept of alternates provides a powerful programming abstraction that allows us to switch between them at runtime to maximize the overall utility of the system in response to changing data rates or infrastructure performance.

Several studies have compared the performance of virtualized environments against bare-metal hardware and shown that their average performance is within an acceptable tolerance of each other. However, these studies focused on average performance characteristics and not on variations in performance. Recent analyses of public cloud infrastructure [12], [46], [47], [14], [48] demonstrate high fluctuations in various cloud services, including cloud storage, VM startup and shutdown times, as well as virtual machine core performance and virtual networking. The degree of performance fluctuation varies across private and different public cloud providers [13]. Reasons for this include multi-tenancy of VMs on the same physical host, use of commodity hardware, co-location of faults, and roll-out of software patches to the cloud fabric. Our own studies confirm these findings. On this basis, we develop an abstraction of the IaaS cloud that incorporates infrastructure variability and include it in our IaaS simulator.

Meta-heuristics have been widely used to address the task scheduling problem [24], [25], [26], [27]. Most approaches are nature-inspired and rely on GA [28], ant colony optimization [49], particle swarm optimization [50], or simulated annealing [51] techniques to search for sub-optimal solutions. Studies analyzing the efficiency of meta-heuristics [24] show that in certain scenarios GAs can outperform greedy heuristics. More generally, Zamfirache et al. [27] show that population-based GA meta-heuristics built on classical greedy approaches provide, through mutations, solutions that improve on their classical versions. Recently, a comparison of ant colony optimization and particle swarm optimization with a GA for scheduling DAGs on clouds was proposed [26]. None of these task scheduling algorithms or meta-heuristic solutions take into account a dynamic list of task instances. Our own prior work [25] uses a GA to elastically adapt the number of task instances in a workflow to incoming web traffic, but does not consider alternates or performance variability.

11 CONCLUSION

In this article, we have motivated the need for online monitoring and adaptation of continuous dataflow applications to meet their QoS constraints in the presence of data and infrastructure variability. To this end, we introduce the notion of dynamic dataflows, with support for alternate implementations of dataflow tasks. This not only gives users flexibility in application composition, but also provides an additional dimension of control for the scheduler to meet the application constraints while maximizing its value.

Our experimental results show that continuous adaptation heuristics which make use of this application dynamism can reduce the execution cost by up to 27.5% on clouds while also meeting the QoS constraints. We have also studied the feasibility of a GA-based approach for optimizing the execution of dynamic dataflows, and show that, although it gives near-optimal solutions, its time complexity grows with the input data rate, making it unsuitable for high velocity applications. A hybrid approach which uses the GA for initial deployment and the CE greedy heuristic for runtime adaptation may be more suitable; this is to be investigated as future work.

In addition, we plan to extend the concept of dynamic tasks to allow alternate implementations at a coarser granularity, such as “alternate paths”, and to provide end users with more sophisticated controls. Further, we plan to extend the resource mapping heuristics to an ensemble of dataflows with a shared budget, and address issues such as fairness in addition to the throughput constraint and application value.

ACKNOWLEDGMENTS

This material is based on research sponsored by DARPA and the Air Force Research Laboratory under agreement number FA8750-12-2-0319. The views and conclusions contained herein are those of the authors and should not be interpreted as necessarily representing the official policies or endorsements, either expressed or implied, of DARPA, the Air Force Research Laboratory, or the U.S. Government.

REFERENCES

[1] L. Douglas, “3D data management: Controlling data volume, velocity, and variety,” Gartner, Tech. Rep., 2001.

[2] J. Dean and S. Ghemawat, “MapReduce: Simplified data processing on large clusters,” Communications of the ACM, vol. 51, no. 1, pp. 107–113, 2008.

[3] M. Gatti, P. Cavalin, S. B. Neto, C. Pinhanez, C. dos Santos, D. Gribel, and A. P. Appel, “Large-scale multi-agent-based modeling and simulation of microblogging-based online social network,” in Multi-Agent-Based Simulation XIV. Springer, 2014, pp. 17–33.

[4] I. Fronza, A. Sillitti, G. Succi, M. Terho, and J. Vlasenko, “Failure prediction based on log files using random indexing and support vector machines,” Journal of Systems and Software, vol. 86, no. 1, pp. 2–11, 2013.

[5] A. Biem, E. Bouillet, H. Feng, A. Ranganathan, A. Riabov, O. Verscheure, H. Koutsopoulos, and C. Moran, “IBM InfoSphere Streams for scalable, real-time, intelligent transportation services,” in ACM SIGMOD International Conference on Management of Data, 2010, pp. 1093–1104.

[6] Y. Simmhan, S. Aman, A. Kumbhare, R. Liu, S. Stevens, Q. Zhou, and V. Prasanna, “Cloud-based software platform for big data analytics in smart grids,” Computing in Science & Engineering, vol. 15, no. 4, pp. 38–47, 2013.

[7] L. Neumeyer, B. Robbins, A. Nair, and A. Kesari, “S4: Distributed stream computing platform,” in IEEE International Conference on Data Mining Workshops (ICDMW), 2010.

[8] “Storm: Distributed and fault-tolerant realtime computation,” http://storm.incubator.apache.org/, accessed: 2014-06-30.

[9] M. Zaharia, T. Das, H. Li, S. Shenker, and I. Stoica, “Discretized streams: An efficient and fault-tolerant model for stream processing on large clusters,” in USENIX Conference on Hot Topics in Cloud Computing, 2012.

[10] B. Satzger, W. Hummer, P. Leitner, and S. Dustdar, “Esc: Towards an elastic stream computing platform for the cloud,” in IEEE International Conference on Cloud Computing (CLOUD), 2011, pp. 348–355.

[11] V. Gulisano, R. Jimenez-Peris, M. Patino-Martinez, C. Soriente, and P. Valduriez, “StreamCloud: An elastic and scalable data streaming system,” IEEE Transactions on Parallel and Distributed Systems, vol. 23, no. 12, pp. 2351–2365, 2012.

[12] A. Iosup, N. Yigitbasi, and D. Epema, “On the performance variability of production cloud services,” in IEEE/ACM International Symposium on Cluster, Cloud and Grid Computing (CCGrid), 2011.

[13] A. Li, X. Yang, S. Kandula, and M. Zhang, “CloudCmp: Comparing public cloud providers,” in ACM SIGCOMM Conference on Internet Measurement, 2010, pp. 1–14.

[14] I. Moreno, P. Garraghan, P. Townend, and J. Xu, “Analysis, modeling and simulation of workload patterns in a large-scale utility cloud,” IEEE Transactions on Cloud Computing, vol. 2, no. 2, pp. 208–221, 2014.

[15] A. Kumbhare, Y. Simmhan, and V. K. Prasanna, “Exploiting application dynamism and cloud elasticity for continuous dataflows,” in International Conference for High Performance Computing, Networking, Storage and Analysis. ACM, 2013, p. 57.

[16] A. Arasu, M. Cherniack, E. Galvez, D. Maier, A. S. Maskey, E. Ryvkina, M. Stonebraker, and R. Tibbetts, “Linear Road: A stream data management benchmark,” in International Conference on Very Large Data Bases (VLDB), 2004, pp. 480–491.

[17] W. M. van der Aalst, A. H. ter Hofstede, B. Kiepuszewski, and A. P. Barros, “Workflow patterns,” Distributed and Parallel Databases, vol. 14, no. 1, pp. 5–51, 2003.

[18] M. Maheswaran, S. Ali, H. J. Siegel, D. Hensgen, and R. F. Freund, “Dynamic mapping of a class of independent tasks onto heterogeneous computing systems,” Journal of Parallel and Distributed Computing, vol. 59, no. 2, pp. 107–131, 1999.

[19] J. Wainer and F. de Lima Bezerra, “Constraint-based flexible workflows,” in Groupware: Design, Implementation, and Use. Springer, 2003, pp. 151–158.

[20] C. Ouyang, E. Verbeek, W. M. van der Aalst, S. Breutel, M. Dumas, and A. H. ter Hofstede, “Formal semantics and analysis of control flow in WS-BPEL,” Science of Computer Programming, vol. 67, no. 2, pp. 162–198, 2007.

[21] Y. Simmhan and A. G. Kumbhare, “Floe: A continuous dataflow framework for dynamic cloud applications,” CoRR, vol. abs/1406.5977, 2014.

[22] M. C. Jaeger, G. Rojec-Goldmann, and G. Muhl, “QoS aggregation for web service composition using workflow patterns,” in IEEE International Conference on Enterprise Distributed Object Computing, 2004, pp. 149–159.

[23] G. J. Woeginger, “Exact algorithms for NP-hard problems: A survey,” in Combinatorial Optimization–Eureka, You Shrink! Springer, 2003, pp. 185–207.

[24] T. D. Braun, H. J. Siegel, N. Beck, L. L. Boloni, M. Maheswaran, A. I. Reuther, J. P. Robertson, M. D. Theys, B. Yao, D. Hensgen, and R. F. Freund, “A comparison of eleven static heuristics for mapping a class of independent tasks onto heterogeneous distributed computing systems,” Journal of Parallel and Distributed Computing, vol. 61, no. 6, pp. 810–837, 2001.

[25] M. E. Frincu, “Scheduling highly available applications on cloud environments,” Future Generation Computer Systems, vol. 32, pp. 138–153, 2014.

[26] Z. Wu, X. Liu, Z. Ni, D. Yuan, and Y. Yang, “A market-oriented hierarchical scheduling strategy in cloud workflow systems,” The Journal of Supercomputing, vol. 63, no. 1, pp. 256–293, 2013.

[27] F. Zamfirache, M. Frincu, and D. Zaharie, “Population-based metaheuristics for tasks scheduling in heterogeneous distributed systems,” in Numerical Methods and Applications. Springer, 2011, vol. 6046, pp. 321–328.

[28] J. H. Holland, Adaptation in Natural and Artificial Systems. Cambridge, MA, USA: MIT Press, 1992.

[29] D. E. Goldberg and K. Deb, “A comparative analysis of selection schemes used in genetic algorithms,” in Foundations of Genetic Algorithms, 1990, pp. 69–93.

[30] J. Kang and S. Park, “Algorithms for the variable sized bin packing problem,” European Journal of Operational Research, vol. 147, no. 2, pp. 365–372, 2003.

[31] R. N. Calheiros, R. Ranjan, A. Beloglazov, C. A. De Rose, and R. Buyya, “CloudSim: A toolkit for modeling and simulation of cloud computing environments and evaluation of resource provisioning algorithms,” Software: Practice and Experience, vol. 41, no. 1, pp. 23–50, 2011.

[32] R. Castro Fernandez, M. Migliavacca, E. Kalyvianaki, and P. Pietzuch, “Integrating scale out and fault tolerance in stream processing using operator state management,” in ACM SIGMOD International Conference on Management of Data, 2013, pp. 725–736.

[33] J. Yu and R. Buyya, “A taxonomy of scientific workflow systems for grid computing,” ACM SIGMOD Record, vol. 34, no. 3, p. 44, 2005.

[34] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: Cluster computing with working sets,” in USENIX Conference on Hot Topics in Cloud Computing, 2010.

[35] S. Pallickara, J. Ekanayake, and G. Fox, “Granules: A lightweight, streaming runtime for cloud computing with support for map-reduce,” in IEEE International Conference on Cluster Computing, 2009.

[36] S. Babu and J. Widom, “Continuous queries over data streams,” ACM SIGMOD Record, vol. 30, no. 3, pp. 109–120, 2001.

[37] M. A. Shah, J. M. Hellerstein, S. Chandrasekaran, and M. J. Franklin, “Flux: An adaptive partitioning operator for continuous query systems,” in IEEE International Conference on Data Engineering, 2003, pp. 25–36.

[38] Y. Zhou, B. C. Ooi, K.-L. Tan, and J. Wu, “Efficient dynamic operator placement in a locally distributed continuous query system,” in On the Move to Meaningful Internet Systems: CoopIS, DOA, GADA, and ODBASE. Springer, 2006, pp. 54–71.

[39] R. Tolosana-Calasanz, J. A. Banares, C. Pham, and O. Rana, “End-to-end QoS on shared clouds for highly dynamic, large-scale sensing data streams,” in IEEE/ACM International Symposium on Cluster, Cloud and Grid Computing, 2012, pp. 904–911.

[40] A. Quiroz, H. Kim, M. Parashar, N. Gnanasambandam, and N. Sharma, “Towards autonomic workload provisioning for enterprise grids and clouds,” in IEEE/ACM International Conference on Grid Computing, 2009, pp. 50–57.

[41] Y. Hu, J. Wong, G. Iszlai, and M. Litoiu, “Resource provisioning for cloud computing,” in Conference of the Center for Advanced Studies on Collaborative Research. IBM Corp., 2009, pp. 101–111.

[42] G. J. Nutt, “The evolution towards flexible workflow systems,” Distributed Systems Engineering, vol. 3, no. 4, 1996.

[43] D. Guinard, V. Trifa, S. Karnouskos, P. Spiess, and D. Savio, “Interacting with the SOA-based Internet of Things: Discovery, query, selection, and on-demand provisioning of web services,” IEEE Transactions on Services Computing, vol. 3, no. 3, pp. 223–235, 2010.

[44] L. Wang, H. J. Siegel, V. P. Roychowdhury, and A. A. Maciejewski, “Task matching and scheduling in heterogeneous computing environments using a genetic-algorithm-based approach,” Journal of Parallel and Distributed Computing, vol. 47, no. 1, pp. 8–22, 1997.

[45] H. Topcuoglu, S. Hariri, and M.-Y. Wu, “Performance-effective and low-complexity task scheduling for heterogeneous computing,” IEEE Transactions on Parallel and Distributed Systems, vol. 13, no. 3, pp. 260–274, 2002.

[46] A. Iosup, S. Ostermann, M. N. Yigitbasi, R. Prodan, T. Fahringer, and D. H. Epema, “Performance analysis of cloud computing services for many-tasks scientific computing,” IEEE Transactions on Parallel and Distributed Systems, vol. 22, no. 6, pp. 931–945, 2011.

[47] K. R. Jackson, L. Ramakrishnan, K. Muriki, S. Canon, S. Cholia, J. Shalf, H. J. Wasserman, and N. J. Wright, “Performance analysis of high performance computing applications on the Amazon Web Services cloud,” in IEEE International Conference on Cloud Computing Technology and Science (CloudCom), 2010, pp. 159–168.

[48] Z. Ou, H. Zhuang, A. Lukyanenko, J. Nurminen, P. Hui, V. Mazalov, and A. Yla-Jaaski, “Is the same instance type created equal? Exploiting heterogeneity of public clouds,” IEEE Transactions on Cloud Computing, vol. 1, no. 2, pp. 201–214, 2013.

[49] A. Colorni, M. Dorigo, and V. Maniezzo, “Distributed optimization by ant colonies,” in European Conference on Artificial Life, 1991, pp. 134–142.

[50] J. Kennedy and R. Eberhart, “Particle swarm optimization,” in IEEE International Conference on Neural Networks, vol. 4, 1995, pp. 1942–1948.

[51] N. Metropolis, A. W. Rosenbluth, M. N. Rosenbluth, A. H. Teller, and E. Teller, “Equation of state calculations by fast computing machines,” The Journal of Chemical Physics, vol. 21, no. 6, pp. 1087–1092, 1953.

Alok Gautam Kumbhare is a PhD candidate in Computer Science at the University of Southern California and a Research Assistant on the DARPA XDATA project. His research interests are in the areas of resource management and fault tolerance for workflows and dataflows in distributed environments such as clouds.

Yogesh Simmhan is an Assistant Professor at the Indian Institute of Science. Previously, he was a Research Assistant Professor in Electrical Engineering at the University of Southern California and a postdoc at Microsoft Research. His research explores scalable abstractions, algorithms and applications for distributed platforms, advancing fundamental knowledge while offering a practitioner's insight. He has a Ph.D. in Computer Science from Indiana University. He is a Senior Member of ACM and IEEE.

Marc Frincu received his Ph.D. from the West University of Timisoara, Romania, in 2011. During his Ph.D. he also worked as a junior researcher at the e-Austria Research Institute, Romania. In 2012 he joined the University of Strasbourg, France, as a postdoctoral researcher focusing on cloud scheduling. In 2013 he took a postdoctoral research associate position at the University of Southern California, working on cloud computing and smart grid systems.

Viktor K. Prasanna is the Powell Chair in Engineering, Professor of Electrical Engineering and Computer Science, and Director of the Center for Energy Informatics at the University of Southern California. His research interests include HPC, reconfigurable computing, and embedded systems. He received his MS from the School of Automation, Indian Institute of Science, and his PhD in Computer Science from Penn State. He is a Fellow of IEEE, ACM and AAAS.
