
Online Scheduling and Interference Alleviation for Low-latency, High-throughput Processing of Data Streams

Thilina Buddhika, Ryan Stern, Kira Lindburg, Kathleen Ericson, and Shrideep Pallickara, Members, IEEE

Abstract—Data streams occur naturally in several observational settings and often need to be processed with a low latency. Streams pose unique challenges: they have no preset lifetimes, the traffic on these streams may be bursty, and data arrival rates on these streams can be quite high. Furthermore, stream processing computations are generally stateful, where the outcome of processing a data stream packet depends on the state that builds up within the computation over multiple, successive rounds of execution. As the number of streams increases, stream processing computations need to be orchestrated over a collection of machines. Achieving timeliness and high throughput in such settings is a challenge. Optimal scheduling of stream processing computations is an instance of the resource constrained scheduling problem, and depending on the precise formulation of the problem can be characterized as either NP-Complete or NP-Hard. We have designed an algorithm for online scheduling of stream processing computations. Our algorithm focuses on reducing interference that adversely impacts the performance of stream processing computations. Our measure of interference is based on stream packet arrivals at a particular machine, the accompanying resource utilization encompassing CPU, memory, and network utilization, and the resource utilization at machines comprising the cluster. Our algorithm performs continuous, incremental detection of the interference experienced by computations and performs migrations to alleviate it.

Index Terms—low-latency stream processing; online scheduling; data intensive computing


1 INTRODUCTION

Falling costs, improved network connectivity, and the ability to report measurements at increased precision have led to a proliferation of sensors. These devices generate data streams, a set of correlated packets, reporting their measurements. The nature of these streams poses unique processing challenges. Streams may have no preset lifetimes or data production rates and can be voluminous. Furthermore, stream processing computations are long running since processing is often tied to the stream lifetimes. Computations also need to retain state and build context that is critical when processing individual stream packets. In this study, the computations that operate on these streams must take into account what has happened in the past. This means the computations are stateful, i.e., they build and retain state during execution, and the outcome of processing any stream packet depends on the built up state.

Stateful computations are commonly used in several domains; for example, in health stream processing. In the case of health sensors, vital signs data are reported as physiological streams. These vital signs include blood pressure, thorax extensions, electroencephalograms (EEGs), electrocardiograms (ECGs or EKGs), and pulse oximetry data among others. Stateful computations in health stream processing allow performing trend analysis on the physiological streams prior to determining if an alert needs to be issued. In the case of ECG processing, a stateful computation that retains information about past, recurring, and systemic abnormalities in the ECG waveform may identify an impending health emergency.

The per-packet processing overhead for stream processing may be in the order of a few milliseconds, but packets continually arrive at high rates and must be processed with a low latency. The devices may be configured to report measurements at increased frequencies. For example, in health stream processing settings, this is done to enhance patient surveillance to improve clinical deterioration detection capabilities. If not managed carefully, processing delays can become insidious, resulting in queue buildups at a processing node alongside subsequent buffer overflows, and exhausted memory conditions. Given the data volumes involved, processing must be orchestrated over a collection of machines.

Scheduling processing tasks at scale over a collection of machines is a challenge. Scheduling a single stream processing task is easy; so is scheduling multiple tasks if there are an unlimited number of available machines. For the problem we consider, we have a limited number of machines and a large number of stream processing tasks, with each stream packet representing a unit of work that needs to be completed. The number of tasks will significantly outnumber the number of machines available. This is an instance of the resource-constrained scheduling problem – given a set of tasks, a set of machines, and a performance measure, the objective is to assign tasks to machines such that the desired performance measure is maximized. Additional constraints may be specified on the performance measure, such as a bound on the completion time. Depending on the precise formulation of the problem, some have characterized this as NP-Complete and some as NP-Hard [1], [2], [3], [4]. There is no known algorithm for finding an optimal solution in polynomial time; when the problem size grows with increases in machines or tasks, or when additional constraints are imposed, finding the optimal solution is computationally intractable. Stream scheduling must be online, encompassing continuous, incremental scheduling decisions that account for changes in the stream packet arrivals and cluster resource utilizations.

Challenges: There are several challenges in accomplishing low-latency, high-throughput processing of streams at scale.

1) Stateful computations: Stateless computations are easier to scale, since they can simply be replicated on multiple machines with stream packets being processed in a round-robin fashion and in parallel. In stateful computations, the outcome depends on the built up state.

2) Stream computations outnumber machines: The number of stream computations will be multiple orders of magnitude larger than the number of machines available for data processing, especially in Internet-of-Things settings such as smart cities, necessitating the use of a horizontally scalable computing platform [5], [6]. This requires multiple stream processing computations to be interleaved on the same machine. This brings to the fore the notion of interference either from collocated computations within the same process (internal/endogenous interference) or from collocated external processes on the same machine (external/exogenous interference). Interference leads to increased resource contentions that can preclude high throughput processing. In extreme cases, failing to keep pace with data rates results in queue buildups and processing delays.

3) Initial placement of computations may become ineffective: This is due to variability in the resource utilization and the stream processing workload. Variability in the resource utilization profile may be due to external interference and maintenance activities [7]. The stream processing environment may itself contribute to this flux via the removal or addition of stream processing tasks, data induced load imbalance, etc.

4) Minimizing resource utilization imbalances: Stream processing over a collection of machines should not introduce imbalances. Specifically, we wish to avoid situations where some machines are overloaded, whereas others are underutilized. Such imbalances may lead to increased interference between computations and will result in lower throughput since a heavily used machine experiences higher context switches, memory pressure, and increased network contention.

Research Questions: Achieving low-latency, high-throughput processing of streams at scale requires us to account for stream packet arrivals and resource utilizations, track interference between computations, and leverage these to inform scheduling decisions. Specific research questions that we explore include:

RQ-1: How can we account for stream packet arrivals and their accompanying resource footprints to ensure high-throughput processing? How can we achieve this while ensuring low latencies per packet? (§ 4.1.1, § 4.1.2)

RQ-2: How can we effectively account for both internal and external interference that impact performance – both latency and throughput – of stream processing computations? Specifically, how can we quantify this interference, and how can these interference scores be used to inform online scheduling of stream processing computations? (§ 4.1.2, § 4.1.3)

RQ-3: How can we ensure system stability that minimizes oscillations and cascading migrations of computations across machines within the cluster while minimizing utilization imbalances? Since online scheduling involves migration of computations to alleviate performance bottlenecks, care must be taken so that these migrations do not induce performance problems. We also wish to avoid resource utilization skews where some machines are heavily utilized while others are idling. (§ 4.2)

Approach Summary: In this paper we describe our algorithms and an accompanying implementation for online stream scheduling at scale with low latency per packet while achieving high throughput. Our methodology focuses on reducing interference between stream processing computations in the presence of variability in processing workloads and system conditions. This is achieved through a series of proactive, continuous, and incremental scheduling decisions where computations are migrated to machines with less interference. These migrations help reduce resource utilization imbalances within the cluster while alleviating performance hotspots, both of which improve performance. We have implemented these algorithms in the context of our stream processing engine, Neptune [8].

At the core of our online scheduling algorithm is a data structure called prediction rings that encapsulates a set of footprint vectors. Prediction rings are used to track the expected resource utilization of a stream computation in the future. Each element in the vector represents the expected resource utilization of a stream computation for a particular time granularity. Resource utilizations in the near future are captured at a fine-grained level while the resource utilizations further out into the future are captured at a coarse-grained level. Prediction rings are populated based on expected stream arrival rates forecast using time-series analysis.

To inform online scheduling of computations, we introduce the notion of an interference score. The interference score is a normalized score that quantifies the expected interference for a stream computation when placed on a particular machine. It is calculated using the prediction ring of a particular computation and the prediction rings of the collocated computations, accounting for both the internal and external interference. A computation will be migrated to another machine if there is a significant reduction in interference at the new location compared to its current location. Our migration protocol handles both stateful and stateless computations and ensures the correctness of a stream processing job. The methodology includes mechanisms to counteract oscillations, cascading migrations, and frequent inefficient migrations.

We profile the efficiency of our algorithm based on extensive benchmarks with health stream computations for thorax and ECG processing. Our evaluation metrics include throughput, 99th percentile of latency, variance in latency, and resource utilization imbalance within the cluster.

Paper Contributions: Contributions of our methodology include the following:

1) Proactive circumvention of internal and external interference by accounting for variability in resource utilization and stream processing workloads.

2) Our prediction rings data structure is compact, memory-resident, and effectively captures data arrivals and the accompanying resource utilization patterns for a computation. These prediction rings can be aggregated to generate compound footprints encompassing the collection of computations at a machine.

3) Prediction rings and interference score calculations allow us to effectively alleviate interference for computations by identifying the most suitable machine to migrate impacted computations to.

4) The online stream scheduling algorithm continually tracks resource utilizations at machines and ensures targeted, incremental, and proactive alleviation of performance hotspots via migrations.

5) Our algorithm minimizes resource utilization imbalances. The processing load is dispersed such that each machine is hosting computations that are less likely to interfere with each other. This prevents performance hotspots and allows us to achieve high-throughput processing of streams.

Paper Organization: The remainder of this paper is organized as follows. In section 2, we motivate the necessity of an online scheduling scheme like ours for stream processing. The methodology of our work is presented in section 3. Our online scheduling algorithm is discussed in section 4. The results of our empirical evaluation are presented in section 5. Related work is reviewed in section 6. Section 7 outlines conclusions and future work.

2 BACKGROUND AND PROBLEM STATEMENT

A stream processing job is usually modeled as a directed acyclic graph of stream operators. A stream operator continuously transforms the data items in a data stream [9]. A stream operator can either be a stream ingestion operator or a stream computation. There may be one or more stream ingestion operators that ingest data streams into the system from external sources. Stream computations implement parts of the stream processing logic. Stream operators are connected through streams that form the edges of the stream processing graph. Sink operators are a special type of stream computation that does not have any outgoing streams; streams flowing into sink operators are called terminal streams [10]. Sink operators make the results of a stream processing job available to external systems such as a visualization system, a persistent storage system, or even another stream processing job.

During the deployment of a stream processing job, multiple instances of a stream operator may be deployed to exploit the parallelism provided by multicore processor architectures as well as distributed computing clusters. This enables data-parallel processing of streams [9]. Streams between operators need to be partitioned to ensure that each instance receives a proportional share of the input streams. The choice of the stream partitioning function depends on the nature of the processing performed at each operator and how the use case is mapped into a stream processing graph in general. Each of these operator instances is executed as a stream processing task.

These tasks need to be deployed within a set of distributed machines for execution, where the number of tasks is two orders of magnitude greater than the number of machines. In addition to satisfying the resource matching requirement, the placement plan is further governed by constraints such as collocation and quality of service requirements such as upper bounds on response times. This problem is considered an NP-Hard/NP-Complete problem [1], [2], [3], [4]. Generating the initial placement of stream processing tasks in a distributed setup is a well-studied problem [10], [11], [12], [13], [14]. A naive approach would be to distribute the tasks among the available processes in a round-robin manner, as sketched below. For instance, the default scheduler of Apache Storm [15] follows this approach. A heuristic based scheme would be to estimate the resource requirements of each of the tasks and use a variation of the bin-packing problem to generate the initial placement of the tasks [11]. For instance, Apache Storm's ResourceAwareScheduler [16] follows this approach by allowing users to augment the stream processing job specification with resource requirements for stream operators. Existing work has also relied on the properties of the stream processing graph itself, such as the communication patterns between tasks, in order to derive a more efficient placement plan. One example is to collocate tasks with a higher amount of pairwise communication within a single process [12].
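To make the naive round-robin strategy concrete, the sketch below assigns a list of task identifiers to worker processes in cyclic order. The class and identifiers are hypothetical; this illustrates only the general idea, not the actual scheduler of Neptune or Apache Storm.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative round-robin initial placement: task i is assigned to worker (i mod |workers|).
final class RoundRobinPlacement {
    static Map<String, List<String>> place(List<String> taskIds, List<String> workerIds) {
        Map<String, List<String>> plan = new HashMap<>();
        for (String worker : workerIds) {
            plan.put(worker, new ArrayList<>());
        }
        for (int i = 0; i < taskIds.size(); i++) {
            plan.get(workerIds.get(i % workerIds.size())).add(taskIds.get(i));
        }
        return plan;
    }
}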

Regardless of the scheme used to generate the initial placement plan, placements are bound to become inefficient over time. This is due to the inherent variability in system conditions and stream processing workloads. The system conditions can vary due to activities of the other applications sharing the resources (e.g., CPU, memory, network bandwidth, etc.), contending for global resources (e.g., network switches), power limits (e.g., CPUs mitigating thermal effects by throttling down), energy management (e.g., power saving modes), and periodic maintenance activities (e.g., periodic log compaction, reindexing of distributed file systems) [7]. Changes in the stream processing workloads occur due to the deployment and termination of stream processing jobs and operators switching between active and dormant phases due to data availability or the satisfiability of other conditions. Data induced load imbalances can also create fluctuations in the computation workloads [7]. For instance, a particular stream partition may be accounting for a majority of the stream over time even though the partitions were initially assumed to be balanced (e.g., a monitoring device attached to a patient in a critical condition is configured to take measurements more frequently, or flash-crowds responding to certain events via social networks). Additional processing may be triggered at an operator based on characteristics of the data items within a stream (e.g., triggering an alarm upon detecting an anomaly). These are categorized as medium and long-term load fluctuations as opposed to short-term load fluctuations that happen due to the event-based aperiodic nature of stream sources and the transient inconsistencies of the performance of the networking infrastructure [14].

The changes in the workload and system conditions influence a stream computation's level of contention for shared resources with other collocated computations and external processes. Contention for shared resources causes interference in the execution of a stream computation.


Fig. 1. Bandwidth consumption and processing latency observed at a single machine against the number of collocated computations.

When caused by collocated computations within the stream processing engine, it is called internal interference. When caused by collocated external processes on the same machine, it is called external interference. A stream processing computation will experience both internal and external interference in varying degrees throughout its lifetime. Though the motivation behind most heuristic initial placement schemes is to ensure minimal interference across the cluster, it is hard to maintain this property over time due to changes in workload and system conditions as discussed above. Increased contention for shared resources beyond their available capacity can degrade the performance of a stream processing computation, both throughput and latency, because these computations have to wait longer for their share of the shared resource and/or receive a reduced share of resources. Variability in the workload and system conditions can affect individual machines in varying degrees, causing resource utilization imbalances that result in different levels of interference experienced by computations. Computations placed on underutilized machines may experience lower interference levels whereas computations on overutilized machines may experience higher interference levels.

To understand the impact of interference on the performance of a stream processing system, we measured the cumulative performance of a set of stream processing computations when subjected to varying degrees of internal interference. Thorax extension processing computations, explained in § 5.2.1, were used for this experiment. The machines running the stream ingestion operators were adequately provisioned to ensure that they did not become a bottleneck during this scalability test. Stream ingestion and acknowledgment operators were dispersed over a group of 10 machines, whereas the thorax extension processing computations were all collocated on a single machine. The number of concurrent stream processing jobs was increased, which in turn increased the number of collocated thorax extension processing computations; that produced increased internal interference. Figure 1 depicts the bandwidth utilization and the processing latency observed at the machine which hosted the thorax extension processing computations. Incoming traffic that encapsulates thorax readings dominates the bandwidth consumption, which is also a measure of cumulative throughput. Even though the cumulative throughput is expected to increase with the number of stream processing computations, it does not continue to do so beyond a certain point. This is because the node has reached the maximum possible bandwidth utilization at this point; any additional computation placed on the node will interfere with collocated computations with respect to network bandwidth, thereby degrading throughput at individual computations. Average end-to-end latency of the cluster also increased mainly due to increased data transfer times between data ingestion operators and data processing computations. Even though the network bandwidth was the first resource to exhaust its capacity for this particular use case, it is possible that any other resource or a combination of resources can become the limiting factor for other use cases. Horizontal scaling [9], [17] and load shedding [9] are two well-studied solutions that are often used in such scenarios. But we argue we must attempt to effectively utilize the available resources before provisioning more resources (horizontal scaling) or enabling graceful degradation (load shedding).

This necessitates an online scheduling scheme that is able to continuously adjust the placement of tasks based on the changing workload and system conditions. Such an online scheduling scheme can alleviate hotspots in the cluster and reduce the imbalances in resource utilization. Accomplishing hotspot alleviation and imbalance reduction will improve the performance (throughput and latency) of the stream processing system and reduce the performance variability, especially for latency related metrics. We have implemented such an online scheduling scheme to reduce the interference experienced by computations. Our online scheduling algorithm continuously and incrementally migrates computations to reduce the interference experienced by computations. It ends up moving computations away from overutilized nodes towards the under-utilized nodes that have spare capacity to host additional computations while also alleviating hotspots within the cluster. Our algorithm complements any existing initial placement algorithms and focuses on the dynamic online scheduling of stream processing jobs under varying loads and system conditions. Even though our dynamic online scheduling scheme preserves the necessary QoS guarantees through improved resource utilization, the system may need to scale out if the workload demands more system capacity. Also, for certain applications, load shedding is a viable alternative where the input streams are sampled. Even though this impacts the accuracy of the results (hence considered unsafe), it is acceptable for certain types of applications [9]. If the online scheduling algorithm repeatedly fails to reduce the interference experienced by a computation, either of those schemes can be triggered.

3 SYSTEM OVERVIEW

We have validated our methodology (§ 4) for online scheduling in the context of our stream processing system, Neptune [8]. Here, we discuss the key components of the system, their responsibilities, and interactions with each other.

A Neptune cluster comprises a set of worker nodes running on a cluster of interconnected physical or virtual machines. Each worker node is an independent JVM process that concurrently executes a set of stream processing computations. Each worker has a node manager that supervises execution of the computations assigned to the node. Initial placement and online scheduling of tasks in the cluster is managed by a separate process called the orchestration manager. These entities are depicted in Figure 2. Node managers send periodic status updates to the orchestration manager incorporating both node and task status using a data structure called a Prediction Ring. At the orchestration manager, these periodic updates are used to infer the global state that informs scheduling decisions to migrate tasks and mitigate resource imbalances. The orchestration manager coordinates a migration (as explained in § 4.2) by communicating with node managers at the new node, current node, and upstream nodes to redirect data streams through the exchange of control messages. The control plane is logically independent from the data plane that carries the data streams that are input to stream processing computations. This helps to reduce the end-to-end latency involved in processing control messages without being affected by queuing delays and backpressure if the same channel is used for both types of traffic.

Fig. 2. System architecture depicting key components and interactions.

To ensure failure resiliency, a secondary instance of the orchestration manager is run in parallel in the active replication mode. The secondary replica is actively synchronized with computation placement information after deploying new jobs or at the end of a migration, in addition to a job's physical deployment plan: tasks and the data flow between tasks. The remainder of the state can be reconstructed through periodic updates from node managers within a time interval less than or equal to the periodicity of state update messages after the secondary orchestration manager has taken over the role of the primary. Neptune uses Zookeeper [18] for metadata management. We leveraged the same Zookeeper ensemble for leader election of orchestration manager nodes to appoint and discover the primary orchestration manager.
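The paper does not describe the election mechanism beyond its use of the shared Zookeeper ensemble. One common way to implement such leader election on top of Zookeeper is Apache Curator's LeaderLatch recipe, sketched below; the connection string and latch path are placeholders, and this is not necessarily how Neptune implements it.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public final class OrchestrationManagerElection {
    public static void main(String[] args) throws Exception {
        // Connect to the same Zookeeper ensemble already used for metadata management.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181,zk2:2181,zk3:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Both orchestration manager instances contend on the same latch path; the winner
        // becomes the primary, the other remains the synchronized secondary.
        String instanceId = args.length > 0 ? args[0] : "orchestration-manager-1";
        try (LeaderLatch latch = new LeaderLatch(client, "/neptune/orchestration-manager", instanceId)) {
            latch.start();
            latch.await();   // blocks until this instance is elected primary
            System.out.println(instanceId + " is now the primary orchestration manager");
            // ... perform primary duties; node managers can discover the leader via the latch path
        }
    }
}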

Using its global knowledge of the entire system, the centralized orchestration manager can make efficient scheduling decisions. Alternatively, it is possible to use a more decentralized approach such as peer-to-peer or cluster-to-cluster, where nodes will arrange themselves as a virtual network [19]. By making scheduling decisions based on local knowledge encompassing a subset of the nodes, this provides a more scalable and failure resilient model at the expense of efficient resource utilization. Such an approach facilitates work-stealing [20] as opposed to the work-pushing approach employed by our methodology, which is more stable and introduces lower communication overhead.

4 ONLINE SCHEDULING ALGORITHM

Our algorithm closely resembles the MAPE loop used in autonomous systems, which is widely adapted for implementing elastic and dynamic systems [21]. It comprises four phases: monitoring (M), analysis (A), planning (P), and execution (E). We use prediction rings during the analysis and planning phases.
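A skeletal view of one scheduling cycle organized along the four MAPE phases is sketched below. The interfaces and class names are placeholders used purely to show how the phases map onto the monitoring, analysis, planning, and execution responsibilities described in the rest of this section.

/** One scheduling cycle organized as a MAPE loop; all types here are illustrative placeholders. */
public final class SchedulingCycle {
    interface Monitor  { Observations collect(); }               // M: periodic status updates
    interface Analyzer { Analysis analyze(Observations o); }     // A: prediction rings, interference scores
    interface Planner  { MigrationPlan plan(Analysis a); }       // P: pick computation and target node
    interface Enactor  { void execute(MigrationPlan p); }        // E: run the migration protocol

    static final class Observations {}
    static final class Analysis {}
    static final class MigrationPlan {}

    private final Monitor monitor;
    private final Analyzer analyzer;
    private final Planner planner;
    private final Enactor enactor;

    SchedulingCycle(Monitor m, Analyzer a, Planner p, Enactor e) {
        monitor = m; analyzer = a; planner = p; enactor = e;
    }

    void runOnce() {
        Observations obs = monitor.collect();
        Analysis analysis = analyzer.analyze(obs);
        MigrationPlan plan = planner.plan(analysis);
        if (plan != null) {
            enactor.execute(plan);    // only migrate when the planner found a worthwhile move
        }
    }
}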

4.1 Prediction Rings

Prediction rings track data stream arrivals for a given stream processing computation. The data structure is then used to track and predict a computation's expected resource utilization. We use prediction rings to compute an interference score that quantifies the impact of placing an additional stream computation alongside other collocated computations on a machine.

4.1.1 Data Structure

The prediction rings data structure consists of multiple footprint vector "rings", each implemented as a circular buffer. Each element of a ring represents the expected resource utilization during a given discrete time window. The value stored in each time window can vary by the goals of the application, provided it is some metric indicating the amount of resources required by the computation. For instance, the rings may be biased towards memory utilization, penalizing higher memory consumption. In this study the prediction rings are biased towards tracking CPU and network bandwidth consumption.

The number of rings and the resolution of the time windows within each ring are configurable. Having a multiring data structure allows us to capture arrival patterns (and expected resource utilization) at both fine and coarse-grained levels. Each ring radiating outwards represents progressively increasing time frames; the ring is bigger, with larger sectors each of which is at a coarser grained resolution. The innermost rings capture expected packet arrival rates at fine-grained resolutions. During an update to the prediction ring, the current window pointer is set to the current time, and each following clockwise window indicates the expected utilization at an increasingly distant future time. Furthermore, moving from the inner rings to the outer rings represents moving further out into the future. As such, each ring has a static offset equal to the total time capacity of the preceding inner rings. The window offset for any additional outer rings begins immediately after the last offset of the preceding inner ring. As wall time progresses, the current window pointer advances to a future clockwise window. Previous windows become invalid and are filled with a new prediction for the far future.

A conceptual view of a prediction ring with 3 rings is depicted in Figure 3. The innermost ring contains 16 1250ms windows accounting for the next 20s from the current time t′. Similarly, the middle ring accounts for the time period of [t′ + 20s, t′ + 41s) using 12 windows of 1750ms resolution, and the outermost ring accounts for [t′ + 41s, t′ + 63.75s) using 7 windows of 3250ms resolution. In our reference implementation we have used a prediction ring with 3 rings. Each ring from the innermost ring to the outermost ring contained 30 windows with window lengths of 1s, 2s, and 3s respectively. Together, these three rings account for the next 3 minutes from the current time.

Fig. 3. A conceptual view of a prediction ring. The ring tracks how many packets are expected in a given time window. The arrow points at the current window. Clockwise windows represent the future. The number of rings as well as the resolution is configurable.

Prediction rings are designed to be compared against each other. Furthermore, the data structure is amenable to aggregation; i.e., we can take a collection of prediction rings and aggregate them into a single combined footprint vector. This is done by summing the window contents of each individual ring. The combined vector can then be part of a pairwise interference calculation with another computation, eliminating the need to individually check for interference with each existing computation. During aggregation operations, prediction rings are aligned with each other by shifting their current window pointers, which indicate the time each prediction ring was last updated, to the current timestamp.
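A minimal Java sketch of the data structure follows. It assumes fixed per-ring window resolutions and an equal window count per ring (matching the reference configuration above); the actual Neptune implementation is not published here, so the field names and the advance/aggregation logic are simplified assumptions.

import java.util.List;

/** Simplified multi-resolution prediction ring: per-ring circular buffers of expected usage. */
public final class PredictionRing {
    private final long[] windowMillis;   // window resolution per ring, e.g. {1000, 2000, 3000}
    private final double[][] windows;    // expected (normalized) resource usage per window
    private final int[] current;         // current-window pointer per ring
    private long lastUpdateMillis;       // wall-clock time of the last advance

    public PredictionRing(long[] windowMillis, int windowsPerRing, long nowMillis) {
        this.windowMillis = windowMillis.clone();
        this.windows = new double[windowMillis.length][windowsPerRing];
        this.current = new int[windowMillis.length];
        this.lastUpdateMillis = nowMillis;
    }

    /** Advance the current-window pointers as wall time progresses; expired windows are cleared. */
    public void advanceTo(long nowMillis) {
        long elapsed = nowMillis - lastUpdateMillis;
        for (int r = 0; r < windows.length; r++) {
            long steps = Math.min(elapsed / windowMillis[r], windows[r].length);
            for (long s = 0; s < steps; s++) {
                windows[r][current[r]] = 0.0;                       // invalidate the expired window
                current[r] = (current[r] + 1) % windows[r].length;
            }
        }
        lastUpdateMillis = nowMillis;
    }

    /** Record the predicted usage for the window `offset` slots ahead of the current one in `ring`. */
    public void setPrediction(int ring, int offset, double usage) {
        windows[ring][(current[ring] + offset) % windows[ring].length] = usage;
    }

    /** Sum several prediction rings (e.g. all computations on a node) into one combined footprint. */
    public static PredictionRing aggregate(List<PredictionRing> rings, long nowMillis) {
        PredictionRing first = rings.get(0);
        PredictionRing sum = new PredictionRing(first.windowMillis, first.windows[0].length, nowMillis);
        for (PredictionRing pr : rings) {
            pr.advanceTo(nowMillis);                                // align current-window pointers
            for (int r = 0; r < pr.windows.length; r++) {
                for (int w = 0; w < pr.windows[r].length; w++) {
                    int src = (pr.current[r] + w) % pr.windows[r].length;
                    int dst = (sum.current[r] + w) % sum.windows[r].length;
                    sum.windows[r][dst] += pr.windows[r][src];      // window-wise sum
                }
            }
        }
        return sum;
    }
}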

4.1.2 Populating Prediction Rings

A prediction ring is updated periodically to reflect operating conditions. These updates invalidate past windows. It is also possible that current values of future windows have become obsolete due to changes in the workload and system conditions. Updates to a prediction ring are performed in two steps:

1) Initial value assignment with predicted message arrivals

2) Projecting the normalized resource consumption using the predicted input rates

For the first step, we use exponential smoothing, a time-series prediction model, to predict the message arrival rate for a computation. Exponential smoothing relies on the entire set of past observations but assigns exponentially decaying weights for older values. This is different from other moving average models where an equal weight is assigned to every past observation [21]. We use the triple exponential smoothing method that has a seasonal component (β) in addition to a smoothing constant (α) and a trend component (γ) [22]. More specifically, we use the Holt-Winters method of exponential smoothing for predicting the message arrival rates for a computation based on prior arrival rates. The average message arrival rate calculated over a sliding window is used as the input for building the time series model because it eliminates short-term variations in arrival rates arising due to shared, overloaded network resources and other optimizations such as application-level buffering.

Fig. 4. Predicted message arrival rate using Holt-Winters model vs. actual message arrival rate for a single computation.

There are two challenges when using the time series prediction model mentioned above. The triple exponential smoothing model requires data gathered from at least two seasons in order to produce predictions with higher accuracy. This creates a cold start problem due to the unavailability of observations at the beginning of the execution of a stream computation. Possible solutions to this problem would be to feed the model with observations gathered offline or to collect data during the execution of a computation and start predictions only when a sufficient number of observations are collected. Using data gathered offline can be error prone because the input rates observed by a computation (which may be different from the rate at which its upstream computation emits data) are heavily dependent on its operating environment. So we opted for the latter approach to address the cold start problem. Until enough observations are recorded for the time series prediction model, we use a simple moving average model over a sliding window of past observations to approximate the next value. The second challenge when using the Holt-Winters model is the necessity to adjust the smoothing parameters (α, γ, and β) over time if there is a significant change in the environment that invalidates the current model [23]. Due to changes in the system conditions (e.g., changes in the number of collocated computations), the time-series model may become inefficient. We address this issue by recalculating the smoothing parameters using only the latest observations if the prediction error exceeds a certain threshold for a consecutive number of prediction cycles. Figure 4 shows the predicted value vs. actual value of message arrival rates for a single computation. The message rate of the simulated input stream follows a recurring pattern that is effectively captured by our time-series model.
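For reference, a compact sketch of additive triple exponential smoothing is shown below. Parameter names follow the common level/trend/seasonal convention (note that the paragraph above attaches β to the seasonal component and γ to the trend). Initialization, the sliding-window averaging of the input, and the cold-start fallback to a simple moving average are omitted, so treat this as an illustration rather than the exact model used in Neptune.

/** Additive Holt-Winters (triple exponential smoothing) over per-window arrival rates. */
public final class HoltWintersForecaster {
    private final double levelFactor;     // smoothing constant for the level
    private final double trendFactor;     // smoothing constant for the trend
    private final double seasonalFactor;  // smoothing constant for the seasonal indices
    private final double[] seasonal;      // one seasonal index per position within the season
    private double level;
    private double trend;
    private int t;                        // number of observations consumed so far

    public HoltWintersForecaster(double levelFactor, double trendFactor, double seasonalFactor,
                                 double initialLevel, double initialTrend, double[] initialSeasonal) {
        this.levelFactor = levelFactor;
        this.trendFactor = trendFactor;
        this.seasonalFactor = seasonalFactor;
        this.level = initialLevel;
        this.trend = initialTrend;
        this.seasonal = initialSeasonal.clone();
    }

    /** Feed one observed arrival rate (e.g. the sliding-window average) into the model. */
    public void observe(double y) {
        int s = t % seasonal.length;
        double previousLevel = level;
        level = levelFactor * (y - seasonal[s]) + (1 - levelFactor) * (previousLevel + trend);
        trend = trendFactor * (level - previousLevel) + (1 - trendFactor) * trend;
        seasonal[s] = seasonalFactor * (y - level) + (1 - seasonalFactor) * seasonal[s];
        t++;
    }

    /** Forecast the arrival rate h windows ahead; used to seed future prediction-ring windows. */
    public double forecast(int h) {
        return level + h * trend + seasonal[(t + h - 1) % seasonal.length];
    }
}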


Our methodology does not preclude the use of other time-series prediction schemes such as ARIMA [24], artificial neural networks, or genetic algorithms [23]. Artificial neural networks and genetic algorithms provide very accurate predictions in highly dynamic systems but require prolonged training times. We chose exponential smoothing because it satisfies several of our requirements: fast training, accuracy, compactness, and quick evaluations.

The predicted input rates are then transformed to reflect the predicted resource utilization of the computation. A prediction ring will undergo a series of manipulations during this process to ensure that the calculated resource utilization values are normalized. Normalizing computations and host machines is necessary for fair comparisons to find a better location for a computation. We now discuss the rationale behind the multipliers used for normalizing prediction rings of computations.

• Processing time per message: This is a measure of a computation's CPU requirements. This also accounts for the heterogeneity of computations and data induced additional load.

• Message size: This is a measure of the computation's bandwidth requirements when used together with its input rate.

Once the prediction rings of individual computations are transformed, they are summed to generate the prediction ring for the resource. Then a series of multiplications are performed on the resulting aggregated ring in order to reflect the resource utilization of the node.

• Normalized load average of the host machine: This is calculated by dividing the load average of the last minute by the number of CPU cores as a measure of how saturated the host is.

• Excess bandwidth utilization: This reflects the bandwidth consumed in excess of the preferred upper limit.

• 1 - fraction of load average caused by the node: This is a measure of the CPU-wise external interference on the process.

• 1 - fraction of bandwidth utilization incurred by the node: This is a measure of the network bandwidth-wise external interference on the process.

Normalizing the prediction ring of the machine is more involved than normalizing prediction rings of individual computations. The original prediction ring of the machine, calculated by summing up the prediction rings of the individual components, is preserved for aggregations with prediction rings of computations in order to calculate interference scores (as explained in the next section). The resulting prediction ring from the aggregation operation is then normalized using the multipliers discussed above. These multipliers, captured using various monitoring tools, are valid only for a short duration of time because they are dependent on the stream rates and the load profile of external processes. So instead of multiplying the values in windows of every ring, the multiplication operations are applied only to the windows of the innermost ring.

4.1.3 Using Prediction Rings to Quantify Interference

TABLE 1
Notation used in the interference score calculation algorithm.

score       Interference score
ring        Current ring number
ringCount   Number of rings in the prediction ring
dist        Distance to the window pointed to by the window pointer
ringOffset  Offset to the first window in a ring
p           Window pointer
ringSz      Number of windows in a ring
ru          Resource usage score of a window in the node's prediction ring without the computation
rw          The value in a window in the node's prediction ring
rs          Weighted resource usage score of a window in the node's prediction ring without the computation
n           Interference score difference amplifier
cpuFrac     Fraction of the available processor cores
bwFrac      Fraction of the available bandwidth
ringRes     Resolution of the window
cu          Resource usage score of a window in the node's prediction ring with the computation
cw          The value in a window in the computation's prediction ring
cs          Weighted resource usage score of a window in the node's prediction ring with the computation
totalDist   Total length of time represented by the entire prediction ring

We use the notion of interference scores to inform migration decisions. The interference score is a floating point value that indicates the degree of interference between computations. The larger the score, the greater the degree of interference. There are two main properties that we wanted in our interference score.

• Property-1: Identifying how soon an interference is likely to occur - We accomplish this by assigning less weight to more distant interferences. This counteracts prediction errors too far out into the future.

• Property-2: Ability to reflect the load on a given node - This is to ensure that computations contribute only slightly to the score if the node is lightly loaded for a given window and contribute much more significantly if the load exceeds the available resource capacity.

Fig. 5 depicts pseudocode for our interference score algorithm. The notation used in the algorithm is defined in Table 1. The usage score for each window is computed with and without the existence of the computation being considered for migration. In order to calculate the prediction ring of the node without the computation, we subtract the computation's prediction ring from the node's prediction ring. We use the non-normalized version of the node's prediction ring in this subtraction. The difference is then normalized as explained in § 4.1.2. The difference between these weighted usage scores is added to the final interference score so that the score effectively reports only the impact of placing the new computation on the resource and is not unduly weighted by previously placed computations. This also means that the interference score operation is asymmetric. Checking how much a single computation interferes with all computations on a resource will produce a different score than checking how much the many computations interfere with the single computation, which is logical behavior for an interference scoring algorithm. The parameters cpuFrac and bwFrac in the interference score calculation take into account the slack we set aside to accommodate bursty traffic. By setting these upper resource consumption thresholds, we are trying to achieve both an efficient and a safe resource consumption across the cluster, similar to most dynamic systems that support workload migrations and horizontal scaling.

score ← 0
for ring = 0 to ringCount do
    dist ← ringOffset
    for p = 0 to ringSz do
        ru ← 0.5 × rw[p−1] + rw[p] + 0.5 × rw[p+1]
        rs ← ru^n / (cpuFrac × bwFrac)^n / ringRes
        cu ← ru + 0.5 × cw[p−1] + cw[p] + 0.5 × cw[p+1]
        cs ← cu^n / (cpuFrac × bwFrac)^n / ringRes
        score ← score + (cs − rs) × (1 − dist/totalDist)
        dist ← dist + ringRes
    end for
end for
return score

Fig. 5. Pseudocode for computing interference scores.

The dist indicates how many milliseconds into the future the window at position p is, growing as more windows are processed. Dividing by the total millisecond capacity of all rings, totalDist, allows the result to be scaled down based on the distance into the future. For each window in the prediction rings, the utilization reported by that window is scaled and contributes to the final score, thus satisfying Property-1 that is expected in the calculated interference score.

Property-2 is achieved by exponentiating the score components with an integer greater than 1 – this exponent is called the interference score difference amplifier (n). This allows the score contribution to grow quickly as more computations are assigned to a time window. This, in turn, differentiates between placements resulting in collisions involving two computations versus collisions involving three computations, with fewer collisions being more desirable. Another reason arises from the tendency for computations with high arrival rates to unnecessarily produce high interference scores, even if few collisions occur. Exponentiating allows the many windows without collisions to contribute only slightly, while allowing the occasional collision to contribute appropriately based on the severity of the collision.
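For readers who prefer code to pseudocode, the following is a direct Java rendering of Fig. 5, assuming each ring has been flattened into an array that starts at the current window. Wrapping the p−1 and p+1 neighbor lookups around the ring boundary, and the placement of the amplifier exponent n, reflect our reading of the pseudocode rather than details stated explicitly in the paper.

/** Java rendering of the interference score pseudocode in Fig. 5 (see Table 1 for the notation). */
static double interferenceScore(double[][] nodeRings,        // rw: node's rings without the computation
                                double[][] computationRings, // cw: the candidate computation's rings
                                long[] ringResolutionMillis, // ringRes for each ring
                                long[] ringOffsetMillis,     // static time offset of each ring
                                long totalDistMillis,        // time span of the entire prediction ring
                                double cpuFrac, double bwFrac, int n) {
    double score = 0.0;
    for (int ring = 0; ring < nodeRings.length; ring++) {
        double[] rw = nodeRings[ring];
        double[] cw = computationRings[ring];
        long ringRes = ringResolutionMillis[ring];
        long dist = ringOffsetMillis[ring];
        int ringSz = rw.length;
        for (int p = 0; p < ringSz; p++) {
            int prev = (p - 1 + ringSz) % ringSz;                 // wrap-around neighbor (assumption)
            int next = (p + 1) % ringSz;
            double ru = 0.5 * rw[prev] + rw[p] + 0.5 * rw[next];
            double rs = Math.pow(ru, n) / Math.pow(cpuFrac * bwFrac, n) / ringRes;
            double cu = ru + 0.5 * cw[prev] + cw[p] + 0.5 * cw[next];
            double cs = Math.pow(cu, n) / Math.pow(cpuFrac * bwFrac, n) / ringRes;
            score += (cs - rs) * (1.0 - (double) dist / totalDistMillis);  // weight near-future windows more
            dist += ringRes;
        }
    }
    return score;
}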

4.2 Migrating Computations Using Interference Scores

Prediction rings and interference scores are eventually used for online scheduling where computations are migrated to nodes where they are subject to less interference and improved performance. The steps involved in the migration of a computation are depicted in Figure 6 in chronological order. There are three periodic tasks every node manager executes:

1) Update prediction rings of individual computations
2) Calculate the prediction ring for the node and send it to the orchestration manager
3) Calculate interference scores for individual computations

During the third task, the computation that records the highest interference against the rest of the computations — exceeding a predefined threshold in consecutive evaluations — is chosen for the next migration attempt. If there is such a computation, the node manager sends a migration request to the orchestration manager. Migration requests are piggybacked with the periodic prediction ring update messages sent by the node manager. A migration request contains the prediction ring of the computation chosen for migration and the interference score it recorded against the rest of the computations (Ic).

Upon receiving a migration request, the orchestration manager identifies the best possible node for the computation. The orchestration manager calculates interference scores individually for each node (except for its current host node) using their prediction rings and the prediction ring of the computation. If the minimum resulting interference score (In) is significantly lower than the interference score reported at its current location (Ic), then a migration is initiated. Otherwise a rejection response to the migration attempt is sent back to the current node; this can also be used to inform provisioning of new nodes or horizontal scaling in a cloud setting. To decide whether to initiate the migration, the percentage reduction in interference for the impacted computation (∆I) is calculated as follows.

∆I = ((Ic − In) / Ic) × 100%

If the percentage reduction is greater than a configurable threshold, a migration is initiated. For example, the default threshold in our implementation was set to 5%.
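The decision rule reduces to a few lines; the helper below uses the paper's default 5% threshold as an example. For instance, Ic = 8.0 at the current node and In = 7.0 at the best candidate node give ∆I = 12.5%, which exceeds 5%, so the migration would be initiated.

/** Returns true when the relative interference reduction justifies a migration. */
static boolean shouldMigrate(double currentScore, double bestCandidateScore, double thresholdPercent) {
    double reductionPercent = (currentScore - bestCandidateScore) / currentScore * 100.0;
    return reductionPercent > thresholdPercent;
}

// Example: shouldMigrate(8.0, 7.0, 5.0) evaluates to true because the reduction is 12.5%.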

The first step of the migration is to deploy an empty instance of the computation, i.e., without any state, in the new location. Once the deployment of the empty instance is complete, the upstream computation needs to be paused until the current state of the computation is successfully migrated to the newly deployed instance. Instead of completely pausing the entire upstream computation, it temporarily stops emitting messages to the stream connected to the computation being migrated while continuing to emit messages to other streams. The messages destined to the paused stream are buffered in memory. If the memory consumed by the buffered messages exceeds a certain threshold, the upstream computation completely pauses to ensure that the performance of the other collocated computations is not affected due to increased garbage collection activities. Pausing the stream from the upstream computation ensures safety, in other words preserving the correctness of the stream processing job during the migration [9]. Pausing is required because we have used point-to-point communications in Neptune to optimize for high throughput settings [8]. Switching to Neptune's publisher-subscriber communication mode can completely eliminate the need for pausing of the stream albeit at reduced throughput. The pub-sub mode will make the stream packet flow asynchronous and improve the mobility of stream computations as advocated in [25], [26]. After pausing, in its acknowledgment to the orchestration manager, the upstream computation includes the sequence number of the last message emitted to the paused stream.


Fig. 6. Sequence diagram depicting a computation migration.

This information is relayed back to the stream computation by the orchestration manager. The stream computation will wait until it has completed processing this particular message before moving to the next phase, ensuring that no messages are left unprocessed during the migration. Next, the computation being migrated will serialize its state and send it over to the fresh instance of the computation placed at the new location. Upon processing this message, the new instance of the computation restores its state. Once the new instance is ready to process messages, the upstream computation will first play the buffered messages and resume its regular operations. The computation at the old location is then terminated permanently.
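The control messages exchanged during this protocol, as shown in Fig. 6, can be summarized as the enumeration below. The names and the sender/receiver annotations are paraphrased from the sequence diagram and the description above; they are not identifiers from the Neptune codebase.

/** Control messages of the migration protocol, in the order they appear in Fig. 6. */
enum MigrationControlMessage {
    MIGRATION_REQUEST,         // current node -> orchestration manager (piggybacked on a status update)
    DEPLOY_NEW_INSTANCE,       // orchestration manager -> new node
    DEPLOYMENT_ACK,            // new node -> orchestration manager
    PAUSE_STREAM,              // orchestration manager -> upstream node
    PAUSE_STREAM_ACK,          // upstream node -> orchestration manager, carries the last emitted message id
    TERMINATE_PROCESSING,      // orchestration manager -> current node, relays the last emitted message id
    STATE_TRANSFER,            // current node -> new node, serialized computation state
    STATE_TRANSFER_ACK,        // new node -> current node
    TERMINATE_PROCESSING_ACK,  // current node -> orchestration manager
    RESUME_STREAM              // orchestration manager -> upstream node, buffered messages are replayed
}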

4.2.1 Ensuring System Stability

Migrations incur a significant overhead mainly because they interrupt the regular operation of the upstream computations and pause the processing of a subgraph for the duration of the migration. The throughput of the stream processing job drops for a while, and the latency will show a sudden spike when processing the buffered messages. Hence triggering a migration should be done only if the expected performance gains outweigh this temporary degradation in performance. There are some built-in measures in our implementation to reduce unnecessary migrations.

Using a dynamic threshold function at the orchestration manager is one such measure. As discussed before, a migration is triggered only if the expected percentage reduction of interference (∆I) is greater than a certain threshold. This threshold value is dynamically adjusted based on the state of the system: if there is a significant variation in resource utilization within the cluster, it is set to a lower value and vice versa. This dynamic threshold function encourages migrations when there is a significant resource imbalance in the cluster, even if there is a small improvement in interference. In our implementation, we have used a simple step function that sets different threshold values based on the variance in the interference scores reported for a computation (targeted for migration) against every node.
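The paper states only that a simple step function over the variance of the per-node interference scores is used; a sketch of such a function is shown below, with entirely illustrative breakpoints and threshold values.

/** Illustrative step function: higher score variance (more imbalance) lowers the migration threshold. */
static double migrationThresholdPercent(double interferenceScoreVariance) {
    if (interferenceScoreVariance > 0.5) {
        return 2.0;    // pronounced imbalance: accept even small interference reductions
    } else if (interferenceScoreVariance > 0.1) {
        return 5.0;    // moderate imbalance: the default threshold
    }
    return 10.0;       // well-balanced cluster: demand a substantial improvement before migrating
}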

Another stability measure that we leverage is to force a cooling-down period [21] on nodes after they have participated in a migration, either as the source or the destination. During this period, such nodes are neither allowed to trigger any migrations nor considered as candidates to receive computations from other nodes. The cooling-down period also gives the monitoring system time to capture the changes that occurred during the previous migration, allowing the time series models to stabilize and recalibrate if necessary. This also reduces the number of migrations triggered due to unreliable prediction rings.

Since the computation with the highest interference score at a node is picked for the next migration attempt, the orchestration manager may not be able to find a better node for it. The migration request will be rejected and will possibly continue to be rejected in successive attempts. Such successive rejections prevent the node from making any progress towards alleviating the hotspot. As a countermeasure, if a migration request for a particular computation is rejected, that computation is not scheduled for migration again for some time. Moving computations with less interference to better alternative locations in the meantime may reduce the interference at the current node. If this does not reduce the interference of the original computation as expected, it is an indication that the system either needs to be scaled out horizontally or should start load shedding.
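The cooling-down period and the rejection backoff can be thought of as simple eligibility checks consulted before a migration is attempted. The sketch below illustrates one way to track them; the bookkeeping structure and the durations are assumptions made for this example.

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the stability safeguards described above: nodes that recently
 * took part in a migration are "cooling down", and computations whose
 * migration requests were rejected are backed off for a while. The
 * durations and data structures here are hypothetical.
 */
public final class StabilityGuard {
    private static final long COOL_DOWN_MS = 2 * 60 * 1000;      // assumed cooling-down period
    private static final long REJECT_BACKOFF_MS = 5 * 60 * 1000; // assumed backoff after a rejection

    private final Map<String, Long> nodeCoolDownUntil = new HashMap<>();
    private final Map<String, Long> computationBackoffUntil = new HashMap<>();

    /** Record that a node participated in a migration as source or destination. */
    void recordMigration(String nodeId) {
        nodeCoolDownUntil.put(nodeId, System.currentTimeMillis() + COOL_DOWN_MS);
    }

    /** Record that a migration request for a computation was rejected. */
    void recordRejection(String computationId) {
        computationBackoffUntil.put(computationId, System.currentTimeMillis() + REJECT_BACKOFF_MS);
    }

    /** A node may trigger or receive migrations only after its cooling-down period. */
    boolean nodeEligible(String nodeId) {
        return System.currentTimeMillis() >= nodeCoolDownUntil.getOrDefault(nodeId, 0L);
    }

    /** A rejected computation is skipped until its backoff expires. */
    boolean computationEligible(String computationId) {
        return System.currentTimeMillis() >= computationBackoffUntil.getOrDefault(computationId, 0L);
    }
}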

5 EMPIRICAL EVALUATION

5.1 Experimental Setup

The benchmarks reported here were performed in a cluster comprising 54 physical machines connected over a 1 Gbps LAN. Each machine is an HP DL60 server (Xeon E5-2620 CPU and 16 GB RAM) running Fedora 23 and Oracle Java 1.8.0_65. Primary and secondary instances of the orchestration manager were running on dedicated machines. A three-node Zookeeper ensemble, with each Zookeeper server running on a dedicated machine, was used.

[Figure 7: cumulative throughput (messages/s in millions) vs. time since activating dormant computations (minutes), comparing prediction rings based online scheduling, no online scheduling, and the best achievable performance.]

Fig. 7. Cumulative throughput of the cluster over time with variable input rates under internal interference. When migrations are enabled, the throughput gradually increases as hotspots are alleviated. Its peak performance is approximately equal to the near-ideal performance.

Stream ingestion operators were scheduled to run in an adequately provisioned setting with 33 dedicated machines, ensuring that ingestion operators do not become a bottleneck during the experiments. Stream computations were scheduled to run on a group of 15 dedicated physical machines that did not overlap with the machines allocated for stream ingestion operators. Each physical machine was running a single Neptune process. A central statistics server was used to gather various cluster-wide benchmark metrics: stream processing performance, resource utilization at individual nodes, and migration activity data.


Having a centralized statistics server helped us accurately analyze how different metrics varied over time across the cluster.

5.2 Stream Processing Use Cases

Our empirical evaluations are performed with two use cases from the health stream processing domain: thorax extension processing and ECG processing using the Pan-Tompkins algorithm. In order to simulate high throughput streams, the records from these datasets were ingested at much faster rates than the actual message rates. This did not affect the correctness of the stream computations, because the timestamp value encoded in the record itself was used for processing instead of the ingestion timestamp or the wall clock time.

Thorax extension processing was more CPU intensive than ECG processing. Due to their high throughput, ECG processing computations placed more strain on the network bandwidth. Due to their smaller in-memory state, neither caused significant memory pressure. The stream processing graphs of both use cases have the same structure. A single stream processing job contains a stream ingestion operator, a stream computation operator that implements the thorax extension or ECG processing logic, and a sink operator that is used primarily for measuring end-to-end latencies. The stream ingestion operator and the sink operator were collocated on the same Neptune node, allowing us to measure the end-to-end latency without being affected by clock synchronization and skew issues. The workloads for these benchmarks comprised a 1:1 mixture of these stream processing jobs. Though real-world health stream processing use cases would only be executed on dedicated clusters due to their critical nature, we have used them to evaluate the efficacy of our online scheduling scheme in both shared and dedicated cluster setups.

5.2.1 Thorax Extension Processing

The thorax monitoring computation we use here is designed to act as a backend for a visual monitoring application. It retains the last 10 seconds of chest expansion and contraction data for 6 patients in memory, while also maintaining the running average, minimum, and maximum values seen. The thorax extension dataset we use was gathered by Dr. J. Rittweger at the Institute for Physiology, Free University of Berlin [27].

5.2.2 ECG Processing with the Pan-Tompkins Algorithm

Our ECG computations process ECG waveform data from 10 ICU patients that is available as part of the MIMIC dataset from physionet.org [28], [29]. An ECG monitors the heart's electrical activity, which drives the expansion and contraction of the heart muscles based on the generated polarity.

We preprocessed the ECG waveforms using the well-known Pan-Tompkins algorithm to detect the QRS complex [30]: this includes bandpass filtering, differentiation, and integration. Since the frequency components of interest in the ECG waveform lie within the 5-15 Hz range, the waveform is bandpass filtered to remove undesired frequency components, then differentiated to attenuate the higher variations, and squared to remove negative components.

[Figure 8: throughput (msgs/s in millions) and number of migrations per 2-minute period vs. time since activating dormant computations (minutes), for amplifier values n = 2 through n = 6.]

Fig. 8. Determining the interference score amplifier (n).

We then used integration to identify the peaks of the squared wave. The integrated signal peak points are used to find the QRS locations, and hence the distance between two QRS complexes; the amplitude of the QRS is the same as that of the bandpassed wave.

5.3 Determining Parameters for the Interference Score Calculation Algorithm

For our benchmarks, the values for bwFrac and cpuFrac were assigned based on widely used upper thresholds reported in the research literature, allowing a ∼30% slack to handle possible load spikes. For network bandwidth utilization, 70% was used as the upper threshold [31]. Similarly, 66.6% was used for cpuFrac, considering the most commonly used range of 50% - 80% for CPU consumption [32].

The interference score difference amplifier (n) was determined empirically. We simulated an uneven workload across the cluster by activating a set of dormant computations on a select subset of nodes. We deployed 2250 stream processing computations across 15 machines (150 computations per node). Only 20% of those computations (450) were active at the beginning. After a while, we activated the remaining 80% of the computations on 5 of those machines (600 computations), which created a total of 1050 active computations across the cluster. After the dormant computations became active, there was (as intended) a significant imbalance in the cluster where 33% of the nodes became performance hotspots. We evaluated different values of n based on two metrics: the time to reach the best achievable performance and the stability of the system. As shown in Figure 8, when n was set to 3, we were able to achieve the best achievable throughput within the shortest amount of time while keeping the number of migrations low over time, achieving a relatively higher stability compared to other practical values. Furthermore, exponents less than 3 did not provide enough amplification to trigger any migrations, whereas exponents greater than 3 triggered a large number of less effective migrations, causing a longer time to reach the best possible performance and a lower system stability over time.
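As an illustration of how these parameters fit together, the snippet below records the chosen constants and shows the amplifier applied as an exponent, which is how an amplifier of n = 3 is read above; the helper function and the exact placement of the exponent within the interference score calculation are assumptions, and the full calculation is defined earlier in the paper.

/**
 * Parameter values used in the benchmarks (Section 5.3). Applying the
 * amplifier as an exponent on a score difference is shown only as an
 * illustration; the complete interference score calculation is defined
 * earlier in the paper.
 */
public final class InterferenceParams {
    static final double BW_FRAC = 0.70;    // upper threshold for bandwidth utilization [31]
    static final double CPU_FRAC = 0.666;  // upper threshold for CPU utilization [32]
    static final int AMPLIFIER_N = 3;      // empirically determined amplifier (Fig. 8)

    /** Hypothetical helper: amplify a (non-negative) interference score difference. */
    static double amplify(double scoreDifference) {
        return Math.pow(scoreDifference, AMPLIFIER_N);
    }
}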


[Figure 9: three panels sharing the x-axis (time since activating dormant computations, minutes): cumulative throughput (msgs/s in millions), 99th percentile latency (ms), and standard deviation of latency (ms), comparing prediction rings based online scheduling, no online scheduling, and the best achievable performance.]

Fig. 9. Variation of cumulative throughput, 99th percentile of latency, and standard deviation of latency of the cluster over time with fixed rate streams under internal interference. The performance of the online scheduling algorithm is compared with the near-ideal performance (computations are evenly distributed across the cluster) and the initial performance (the setup without the scheduling algorithm).

5.4 Internal Interference: Alleviating Resource Imbalances

The objective of this set of experiments is to profile how effectively our prediction ring based online scheduling algorithm alleviates resource imbalances caused by internal interference. Such situations can arise when workloads are unevenly distributed among nodes.

We used the same setup discussed in § 5.3 to simulate an uneven workload. To contrast with the achievable near-ideal performance, we also ran a configuration where the stream processing workload was evenly distributed within the cluster: 1050 computations were placed evenly across the cluster, with each node executing 70 active stream computations from the very beginning. This represents the best possible placement, where the workload is evenly distributed and there are no imbalances in resource utilization. Over time, our online scheduling algorithm should be able to achieve a placement close to this near-ideal distribution through migrations, even when the placements are initially highly imbalanced.

Our evaluation metrics include: cumulative throughput of the cluster, the 99th percentile and standard deviation of the end-to-end latency, and resource utilization of the cluster.

The experiment was conducted for both fixed rate and variable rate input streams. In order to generate a stream with a variable message rate, the stream ingestion operator employs a load profile that defines the message emission rate over an outgoing stream at any given time. The load profile is a function that takes the time elapsed since it was activated as the input variable and calculates the stream emission rate. Fixed rate input streams were used primarily for assessing latency related metrics. This is because the end-to-end latency of a stream packet is also governed by the message rate on that stream. If the message rate is high, latencies tend to be higher due to prolonged queuing delays while awaiting access to resources such as CPU time and network buffers.
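A load profile of this kind can be expressed as a simple function of elapsed time. The sketch below uses a sinusoid around a base rate purely as an example; the functional form and constants are assumptions and not the profiles used in these benchmarks.

/**
 * Illustrative load profile for a variable rate stream: given the time
 * elapsed since the ingestion operator was activated, return the message
 * emission rate. The sinusoidal shape and the constants are hypothetical.
 */
public final class SinusoidalLoadProfile {
    private final double baseRateMsgsPerSec;
    private final double amplitudeMsgsPerSec;
    private final double periodSeconds;

    public SinusoidalLoadProfile(double baseRate, double amplitude, double periodSeconds) {
        this.baseRateMsgsPerSec = baseRate;
        this.amplitudeMsgsPerSec = amplitude;
        this.periodSeconds = periodSeconds;
    }

    /** Emission rate (messages/s) as a function of elapsed time since activation. */
    public double rateAt(double elapsedSeconds) {
        return baseRateMsgsPerSec
                + amplitudeMsgsPerSec * Math.sin(2 * Math.PI * elapsedSeconds / periodSeconds);
    }
}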

Figure 7 depicts the cumulative throughput of the cluster observed over time with variable rate input streams. This experiment relies on the time-series models to predict the stream rates and migrate computations accordingly. Due to the bandwidth-bound nature of the stream processing use cases, computations at crowded nodes were underperforming mainly due to heavy bandwidth interference from collocated computations. As our online scheduling moves computations over to nodes with less interference, they individually start to perform better, resulting in increased cumulative throughput. The cumulative throughput improved by 48.89% compared to the original peak throughput values, and near-ideal performance was achieved after alleviating the hotspots. Next, we repeated the same benchmark with fixed rate streams in order to understand the impact of our scheduling scheme on end-to-end latency. Figure 9 depicts the performance of the system over time with and without our prediction ring based online scheduling, alongside a comparison with the near-ideal performance achievable. Using the 99th percentile, we evaluated how the long-tail latency improved as computations were moved away from overutilized nodes. The predictability of the measured latencies is evaluated using the standard deviation, which is a measure of the variability in the recorded latency values. The latency related metrics for even scheduling (near-ideal performance) and for the setup without the online scheduling algorithm demonstrate a relatively steady series of observations. The latencies exhibit a high initial variation when the online scheduling algorithm is running, especially when there is a large number of migrations taking place due to message buffering at upstream computations.

TABLE 2
Summary of performance improvements provided by our online scheduling under internal interference. For throughput, positive is better whereas for latency metrics, negative is desirable.

Use case: Stream Computations with Variable Rate Streams
  Metric                  Deviation from Initial Perf.   Deviation from Near-Ideal Perf.
  Throughput              +48.891%                       -0.001%

Use case: Stream Computations with Fixed Rate Streams
  Metric                  Deviation from Initial Perf.   Deviation from Near-Ideal Perf.
  Throughput              +83.932%                       -0.413%
  Latency - 99th perc.    -5.241%                        +4.768%
  Latency - Std. Dev.     -15.845%                       +12.672%


However, once the system reaches a steady state (with a smaller number of migrations), we observe improved latency with respect to both the 99th percentile and the standard deviation. We observed a 5.24% improvement in the 99th percentile and a 15.85% improvement in the standard deviation of the end-to-end latency, mainly due to the reduced communication delays experienced by packets when processing is moved to nodes with less saturated links. Similar to the previous benchmark with variable rate streams, there is a significant improvement in throughput of 83.93% compared to the setting without online scheduling. Table 2 summarizes the performance improvements for different metrics with respect to the near-ideal and initial performance for the aforementioned benchmarks.

Using the data gathered by monitoring individual nodes during the benchmark with variable rate streams, we evaluated how the resource utilization of individual nodes changed over time. The objective of this evaluation is to observe how well our scheduling algorithm can alleviate resource imbalances present in the cluster. Resource utilization was measured based on the normalized CPU load average of the process (provided by the OperatingSystemMXBean class of Java 8) and bandwidth utilization as a percentage of the available bandwidth. Memory utilization was not considered as part of the resource utilization because our stream processing use cases did not introduce significant memory pressure; however, our methodology facilitates incorporating memory utilization when appropriate. Figure 10 shows how the resource utilization at individual nodes changed over time after the dormant computations were activated. Table 3 lists the mean and standard deviation for the bandwidth utilization percentage and CPU load average at different points in time. Initially, the nodes where the dormant computations were activated show significantly higher resource utilization compared to the rest of the nodes, which results in a resource imbalance within the cluster. As the online scheduling algorithm moved computations away from these hotspots, the resource utilization across the cluster gradually became more consistent and even. This can be clearly observed in how the standard deviation reduced over time. The average resource utilization also increased as our online scheduling algorithm attempted to spread the workload evenly. This also improved the system throughput significantly, as evident from our previous benchmarks (Figure 7, Figure 9, Table 2).
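The CPU measurement above relies on the JMX OperatingSystemMXBean; the snippet below shows one plausible way to sample it using getProcessCpuLoad() from the com.sun.management extension available on the Oracle JVM used in these benchmarks. The wrapper class is hypothetical, and bandwidth utilization (collected separately) is not shown.

import java.lang.management.ManagementFactory;

/**
 * Samples the per-process CPU load via the JMX OperatingSystemMXBean, as
 * one plausible reading of the "normalized CPU load average of the
 * process" described above. getProcessCpuLoad() is part of the
 * com.sun.management extension on Oracle/HotSpot JVMs and returns a value
 * in [0.0, 1.0] (or a negative value if the metric is unavailable).
 */
public final class CpuLoadSampler {
    private final com.sun.management.OperatingSystemMXBean osBean =
            (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    /** Returns the recent CPU load of this JVM process, normalized to [0, 1]. */
    public double sampleProcessCpuLoad() {
        return osBean.getProcessCpuLoad();
    }
}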

[Figure 10: scatter of bandwidth utilization (%) vs. CPU load average for individual machines at t = 1, 10, 25, and 70 minutes.]

Fig. 10. Resource utilization of machines at different points in time after the dormant computations are activated.

TABLE 3
Resource utilization of individual machines over time since the activation of dormant computations.

                 CPU Load Average        Bandwidth Utilization (%)
  Time Elapsed   Mean      Std. Dev.     Mean      Std. Dev.
  1 min          0.1727    0.1023        47.3592   25.5610
  10 mins        0.2012    0.0805        53.3550   16.9064
  25 mins        0.2919    0.0082        77.4808   0.6902
  70 mins        0.2836    0.0098        77.6729   0.0642

5.5 External Interference: Alleviating Hotspots

The objective of this benchmark is to evaluate the effectiveness of our online scheduling algorithm when a subset of nodes is affected by external interference. Each node is allocated the same number of stream processing computations, and 33.3% of the nodes were subjected to external interference. External interference was simulated using a separate process that generated significant CPU and network bandwidth pressure. Similar to the previous benchmarks, cumulative throughput and the 99th percentile and standard deviation of the observed latencies were used as the evaluation metrics. For the near-ideal performance, the same number of computations was executed on a cluster of equal size without any external interference; the online scheduling algorithm was disabled when measuring the near-ideal performance. Thorax and ECG processing computations with fixed rate streams were used, but a fixed variability was introduced to the message rate as it more closely simulates most real-world streams.

Figure 11 plots the variation of the three metrics over time. By migrating computations away from the nodes with external interference, our algorithm is able to recover 77.25% of the throughput lost due to external interference.

[Figure 11: three panels sharing the x-axis (time since start, minutes): cumulative throughput (msgs/s in millions), 99th percentile latency (ms), and standard deviation of latency (ms), comparing prediction rings under external interference, no prediction rings under external interference, and no external interference (best achievable performance).]

Fig. 11. Variation of cumulative throughput, 99th percentile of latency, and standard deviation of latency of the cluster over time when 33.3% of the nodes in the cluster are subjected to external interference.


TABLE 4
Summary of performance improvements provided by our online scheduling under external interference. For throughput, positive is better whereas for latency metrics, negative is desirable.

  Metric                  Deviation from Initial Perf.   Deviation from Near-Ideal Perf.
  Throughput              +1.300%                        -0.376%
  Latency - 99th perc.    -85.0041%                      +8.2079%
  Latency - Std. Dev.     -82.8989%                      +19.2485%

Computations migrated away from nodes with external interference benefit from less contention for network bandwidth, which is the main reason behind the throughput improvement. We also observed significant improvements in the latency related metrics with our scheduling algorithm. Both the 99th percentile and the standard deviation showed an improvement of over 82%. This is mainly due to the reduced waiting times experienced by computations for their CPU and network bandwidth shares at nodes with less interference after the migration. The improved performance is still slightly less than the maximum achievable performance (the setup without any external interference) because, regardless of migrating computations away from the nodes affected by external interference, system resources are still shared between two groups of processes: Neptune nodes and interfering processes. Hence it is not possible to completely recover from the performance degradation. A summary of the performance improvement compared to the initial setting and the near-ideal setting is available in Table 4.

5.6 Evaluating the Stability of the System

Measures taken to maintain system stability, by ensuring that only migrations yielding significant improvements are allowed, are discussed in § 4.2.1. We evaluated the effectiveness of these measures using the variable stream rate benchmark.

Dynamically adjusting the threshold for the expected reduction of interference (∆I) is one measure to ensure system stability. Figure 12 shows how the threshold is dynamically adjusted based on resource utilization imbalances within the cluster, as indirectly captured by the variance in the interference scores for the impacted computation across nodes. Figure 13 shows how the number of migrations completed in successive, non-overlapping two-minute intervals changes over time. As seen in Figure 12, the variance in interference scores recorded against individual nodes decreases gradually over time. This is indicative of the alleviation of the resource utilization imbalances that were present at the beginning. There is also a decrease in the number of migrations over time (as shown in Figure 13), mainly due to the adjustment of the threshold, the deciding factor for initiating a migration, to higher values at later stages. Our benchmarks demonstrate that our algorithm encourages aggressive migrations when there is a significant imbalance in resource utilization among nodes, until the resource utilization in the cluster reaches a reasonably consistent state. Also, in the case of computations with variable rate streams, there will be continuous attempts at migrations.

[Figure 12: interference score std. dev./mean (left axis) and migration threshold (right axis) vs. time since activating dormant computations (minutes).]

Fig. 12. Dynamically adjusted threshold of the expected reduction of interference (∆I) for triggering migrations over time.

This is because of the different degrees of interference expected by computations due to the variable input rates, even though the amount of work performed per message is similar. This can be seen in Figure 13 from the relatively low number of migrations taking place after the initial aggressive scheduling period. In this benchmark, the average completion time for a migration is 54.68 ms (std. dev. = 65.70 ms). The time required to complete a migration is dominated by the state transfer phase and the backlog clearance phase of the current computation tasks.

5.7 Profiling the Runtime Overhead for Online Scheduling

Running our prediction rings based online scheduling algorithm incurs additional processing and memory overheads. Periodic execution of prediction ring updates, interference score calculation, and maintaining the prediction ring data structures in memory are the primary contributors to this additional overhead. We monitored the memory consumption and CPU utilization of nodes when our scheduling algorithm was running and contrasted it with regular Neptune operations with the online scheduling algorithm disabled. An equal number of computations with fixed input rates was placed on each node in both cases, and their CPU and memory consumption was measured periodically. We calculated the mean CPU and memory utilization per node using these metrics. In order to maintain a fixed number of computations at a node while the online scheduling algorithm is running, we disabled migration triggers at the orchestration manager.

[Figure 13: number of migrations per 2-minute period (left axis) and cumulative throughput (messages/s in millions, right axis) vs. time since activating dormant computations (minutes).]

Fig. 13. Number of migrations (over a window of 2 mins) over time.


[Figure 14: per-node CPU usage (%) and memory usage (GB), with and without prediction rings.]

Fig. 14. CPU and memory overhead of running the prediction ring algorithm and maintaining the relevant data structures.

Node managers were still executing their periodic tasks of updating prediction rings, calculating interference scores, and sending periodic status updates to the orchestration manager. Hence this benchmark does not capture the additional resource utilization incurred when a migration is triggered, which mainly consists of processing a few additional control messages and the serialization/deserialization overhead of transferring and restoring the state of the migrated computation. We posit that this is still a valid comparison because it captures the processing and memory overheads caused by all periodic monitoring and reporting operations.

Figure 14 shows the average CPU and memory utilization at each node with and without the online scheduling algorithm. We observed a high standard error in the memory utilization readings due to periodic garbage collection cycles. Single-tailed two-sample t-tests were performed to check whether our online scheduling algorithm caused a significant overhead. CPU utilization increased slightly due to the online scheduling algorithm (p-value = 0.03896, α = 0.05), and there was no significant evidence to suggest that memory utilization increased (p-value = 0.08924, α = 0.05). There was approximately a 0.33% increase in CPU utilization, which we believe is acceptable given that it did not disrupt the regular execution of computations. Further, this benchmark substantiates our claim that prediction rings are lightweight and do not introduce significant memory pressure.

6 RELATED WORK

Current state-of-the-art stream processing systems support different scheduler implementations that are used for the initial placement of tasks in the cluster. Besides allowing users to implement custom schedulers, Apache Storm [15] includes a set of built-in schedulers such as the EvenScheduler, IsolationScheduler, and ResourceAwareScheduler [16]. The EvenScheduler, which is the default scheduler of Apache Storm, distributes stream computation tasks across cluster nodes in a round-robin manner. With the ResourceAwareScheduler, which was initially implemented on top of R-Storm [11], the initial scheduling plan is derived based on the CPU, memory, and network requirements of each Storm spout and bolt and the resource availability of nodes as manually set by the user. In R-Storm, memory is considered a hard constraint that is always fully satisfied, whereas CPU and network are considered soft constraints that may not always be fully satisfied. These requirements are matched with the resource availability of each node using a Euclidean distance function; tasks that communicate with each other are also scheduled to minimize network distance where possible. Apache Flink [33] attempts to collocate the tasks of an execution pipeline (a sequence of tasks through which the data flows) within a single slot in a task manager (equivalent to a Neptune node). Spark Streaming [34], being a micro-batch based stream processing system, follows a different task scheduling scheme from the continuous operator systems mentioned earlier. Instead of allocating a task to a node for the lifetime of the stream processing job (assuming no dynamic scheduling), in Spark Streaming tasks are short-lived and are allocated at the beginning of each interval to calculate the output RDDs for that interval. During this task allocation, it attempts to preserve data locality, assign adjacent operators to a single task, and avoid shuffling data across the network. Xing et al. [14] propose an initial placement algorithm for operators that is resistant to short-term load fluctuations. The expected load at each operator is modeled as a linear function of stream input rates and selectivity, which is then used for operator distribution in the cluster based on two heuristics: equal load distribution and avoiding the creation of bottlenecks when multiple operators are collocated. In fact, this algorithm is complementary to dynamic scheduling algorithms such as ours, as the authors suggest, because together they can withstand short-, medium-, and long-term load fluctuations. In such a setup, the initial placement is derived using a static algorithm, and the stream processing runtime automatically switches to an online scheduling algorithm (such as ours) at runtime.

In continuous query evaluation over streams, round robin, chain [35], adaptive broadcast disk scheduling [36], highest normalized rate [37], and query class scheduling [38] are commonly used techniques for scheduling operators for execution within a single machine. Scheduling of new stream processing jobs in a setup where jobs are continuously arriving and departing is discussed in [10], [13], [39]. SODA [10] is an epoch scheduler designed for System S. It ensures that the processing elements executing at a given time receive an adequate fraction of resources when the number of jobs in the system constantly varies, assuming that the system is overloaded most of the time. This is achieved by accepting only a subset of the jobs that are submitted, selecting a data flow plan, and placing the processing elements of the new jobs based on an objective function that optimizes the utilization of the processing power and network interface cards of nodes and the network links in the system. Unlike our study, this work focuses on the initial scheduling of the processing elements of new queries based on the current state of the system. It also rejects jobs that it cannot accommodate.

Imai et al. [17] use actors as the unit of workload migration to implement application-level workload migration. Virtual machines with low resource utilization continuously trigger actor migrations through work-steal requests. Our system differs from this work mainly in its proactive approach to triggering migrations. Further, they support dynamic sizing of resource pools via opportunistic VM creation and termination. Our approach assumes a fixed-size resource pool but can be extended to support dynamic scaling.


FUGU [40] employs a rebalancing scheme to maximize resource utilization by migrating operators between hosts upon the addition or removal of queries in a complex event processing setting. FUGU migrates operators from underutilized hosts and terminates those hosts in order to improve overall resource utilization, whereas in our system computations are migrated in order to alleviate resource imbalances in the system. In FUGU, rebalancing is triggered when queries are removed or a new host is spawned as a result of adding a new query. In our system, we continually attempt to alleviate resource imbalances caused not only by a change in the number of concurrent stream processing jobs (equivalent to the number of queries in a complex event processing system), but also by fluctuations in both the workload and system conditions. There has been recent work on developing network-traffic aware continuous scheduling schemes [12], [41], [42], [43] for Apache Storm [15] in order to reduce latency by reducing the amount of network communication in a given stream processing application. Computations that communicate the most (hot edges) in a stream processing graph are identified by monitoring the communication between each computation pair, and there are continuous attempts to migrate such pairs into the same process or into two processes running within the same physical node. These schemes also ensure that the worker processes are not overloaded by taking into account the performance requirements of the computations, identified through continuous profiling. T-Storm [41] goes one step further by trying to consolidate worker processes in order to reduce the number of worker processes required to run a given workload. Chatzistergiou et al. [42] improve the above task allocation approach by attempting to collocate the majority of the tasks of two communicating computations (called groups). Fischer et al. [43] use a graph partitioning algorithm, METIS [44], to partition the query processing graph to reduce the number of messages communicated over the network. These approaches are reactive and focus on improving latency, whereas our approach is proactive, relying on time-series analysis, and improves both throughput and latency.

7 CONCLUSIONS & FUTURE WORK

Our methodology for online stream scheduling encompasses algorithms and data structures that have a low memory and processing footprint when they reside in the critical path of processing streams. We treat stream processing computations as black boxes; we do not perform code inspection and rely only on externally observable features relating to stream arrival patterns and resource usage footprints to inform decisions. We now revisit our research questions.

RQ-1: Our online stream scheduling accounts for packet arrival rates at computations, the accompanying resource footprints, and the strain they place on machines. The prediction ring data structure and online scheduling algorithm allow us to account for packet arrival rates on streams at millisecond resolutions. Our use of exponential smoothing to perform short-term, time-series analysis of arrival patterns ensures memory compactness and fast evaluations without compromising the accuracy needed to circumvent interference.

RQ-2: Prediction rings can proactively identify internal interference by tracking packet arrivals at both fine- and coarse-grained scales. Tracking changes in resource utilization at a machine allows us to account for external interference from collocated processes. Alleviating external interference allows us to cope with situations wherein the quality of a machine degrades over time, either due to collocated resource-intensive processes or maintenance. Prediction rings are amenable to aggregation (over all computations on a machine) and to pair-wise comparisons to compute interference scores. Together, prediction rings and interference scores allow identifying the computations that are most impacted by interference and also the machines that are best suited to host them. Our empirical benchmarks demonstrate an 84% improvement in throughput by alleviating internal interference, and an improvement of more than 82% in latency related metrics by alleviating external interference.

RQ-3: The system targets resource utilization imbalances, allowing the processing load to be amortized over a collection of machines. Continuous, incremental, and targeted migration of interference-impacted (and low performing) computations allows processing to be effectively dispersed over a collection of machines, leading to reduced resource imbalance. Since resource imbalances are alleviated, queues and processing hotspots are less likely to emerge, improving throughput and reducing per-packet delays and the corresponding variance in these delays. The methodology also incorporates safeguards to damp oscillatory behavior where computations are continually migrated: only the most impacted computations are migrated, and only to those machines that have the necessary resource slack and where they do not introduce increased internal interference for existing computations.

In our future work, we will explore using reinforcement learning to inform our online stream scheduling. This will involve cost assignments, including rewards for migration decisions that improve throughput and alleviate utilization imbalances, while penalizing those that degrade performance.

Acknowledgements: This research is supported by the NSF Computer Systems Research Program (CNS-1253908).

REFERENCES

[1] M. R. Garey, R. L. Graham et al., “Resource constrained scheduling as generalized bin packing,” Journal of Combinatorial Theory, Series A, vol. 21, no. 3, pp. 257–298, 1976.

[2] D. Fernandez-Baca, “Allocating modules to processors in a distributed system,” IEEE Transactions on Software Engineering, vol. 15, no. 11, pp. 1427–1436, 1989.

[3] M. Wall, “A genetic algorithm for resource-constrained scheduling,” Thesis, 1996.

[4] J.-K. Kim, H. J. Siegel et al., “Dynamic resource management in energy constrained heterogeneous computing systems using voltage scaling,” IEEE Transactions on Parallel and Distributed Systems, vol. 19, no. 11, pp. 1445–1457, 2008.

[5] A. V. Dastjerdi and R. Buyya, “Fog computing: Helping the internet of things realize its potential,” Computer, vol. 49, no. 8, pp. 112–116, 2016.

[6] J. Gubbi, R. Buyya et al., “Internet of things (iot): A vision, architectural elements, and future directions,” Future Generation Computer Systems, vol. 29, no. 7, pp. 1645–1660, 2013.

[7] J. Dean and L. A. Barroso, “The tail at scale,” Communications of the ACM, vol. 56, no. 2, pp. 74–80, 2013.


[8] T. Buddhika and S. Pallickara, “Neptune: Real time stream processing for internet of things and sensing environments,” in IEEE Intl. Parallel & Distributed Processing Symposium (IPDPS), 2016, pp. 1143–1152.

[9] M. Hirzel, R. Soule et al., “A catalog of stream processing optimizations,” ACM Computing Surveys, vol. 46, no. 4, p. 46, 2014.

[10] J. Wolf et al., “Soda: an optimizing scheduler for large-scale stream-based distributed computer systems,” in ACM/IFIP/USENIX International Conference on Distributed Systems Platforms and Open Distributed Processing. Springer, 2008, pp. 306–325.

[11] B. Peng, M. Hosseini et al., “R-storm: Resource-aware scheduling in storm,” in Proc. of the ACM Middleware Conference, 2015, pp. 149–161.

[12] L. Aniello, R. Baldoni et al., “Adaptive online scheduling in storm,” in Proc. of the ACM DEBS, 2013, pp. 207–218.

[13] J. Ghaderi, S. Shakkottai et al., “Scheduling storms and streams in the cloud,” in ACM SIGMETRICS Performance Evaluation Review, vol. 43, no. 1, 2015, pp. 439–440.

[14] Y. Xing et al., “Providing resiliency to load variations in distributed stream processing,” in Proc. of the 32nd international conference on Very large data bases. VLDB Endowment, 2006, pp. 775–786.

[15] “Apache Storm,” https://storm.apache.org, 2015.

[16] “Apache Storm - Scheduler,” http://storm.apache.org/releases/1.0.1/Storm-Scheduler.html, 2015.

[17] S. Imai, T. Chestna et al., “Elastic scalable cloud computing using application-level migration,” in Utility and Cloud Computing (UCC), 2012 IEEE Fifth International Conference on. IEEE, 2012, pp. 91–98.

[18] P. Hunt, M. Konar et al., “Zookeeper: Wait-free coordination for internet-scale systems,” in USENIX Annual Technical Conference, vol. 8, 2010, p. 9.

[19] K. E. Maghraoui, T. J. Desell et al., “The internet operating system: Middleware for adaptive distributed computing,” Journal of High Performance Computing Applications, vol. 20 (4), pp. 467–480, 2006.

[20] R. D. Blumofe and C. E. Leiserson, “Scheduling multithreaded computations by work stealing,” Journal of the ACM, vol. 46 (5), pp. 720–748, 1999.

[21] T. Lorido-Botran, J. Miguel-Alonso et al., “Auto-scaling techniques for elastic applications in cloud environments,” University of Basque Country, Tech. Rep. EHU-KAT-IK-09, vol. 12, p. 2012, 2012.

[22] NIST, “Engineering Statistics Handbook - Triple Exponential Smoothing,” http://www.itl.nist.gov/div898/handbook/pmc/section4/pmc435.htm.

[23] N. Wagner et al., “Time series forecasting for dynamic environments: the dyfor genetic program model,” IEEE Transactions on Evolutionary Computation, vol. 11, no. 4, pp. 433–452, 2007.

[24] G. E. Box, G. M. Jenkins et al., Time series analysis: forecasting and control. John Wiley & Sons, 2015.

[25] G. A. Agha, “Actors: A model of concurrent computation in distributed systems,” DTIC Document, Tech. Rep., 1985.

[26] C. A. Varela and G. Agha, Programming Distributed Computing Systems: A Foundational Approach, 2013.

[27] J. Rittweger, “physiodata,” 2000.

[28] M. Saeed, M. Villarroel et al., “Multiparameter intelligent monitoring in intensive care ii (mimic-ii): a public-access intensive care unit database,” Critical Care Medicine, vol. 39, no. 5, p. 952, 2011.

[29] A. L. Goldberger et al., “Physiobank, physiotoolkit, and physionet: components of a new research resource for complex physiologic signals,” Circulation, vol. 101, no. 23, pp. e215–e220, 2000.

[30] J. Pan and W. J. Tompkins, “A real-time qrs detection algorithm,” Biomedical Engineering, IEEE Transactions on, no. 3, pp. 230–236, 1985.

[31] R. Castro Fernandez, M. Migliavacca et al., “Integrating scale out and fault tolerance in stream processing using operator state management,” in Proc. of the ACM SIGMOD, 2013, pp. 725–736.

[32] F. Al-Haidari, M. Sqalli et al., “Impact of cpu utilization thresholds and scaling size on autoscaling cloud resources,” in Cloud Computing Technology and Science (CloudCom), 2013 IEEE 5th International Conference on, vol. 2. IEEE, 2013, pp. 256–261.

[33] “Apache Flink,” https://flink.apache.org/index.html, 2015.

[34] M. Zaharia, T. Das et al., “Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters,” in Proc. of the 4th USENIX HotCloud, 2012.

[35] D. Carney et al., “Operator scheduling in a data stream manager,” in Proc. of the 29th international conference on Very large data bases - Volume 29. VLDB Endowment, 2003, pp. 838–849.

[36] L. Al Moakar et al., “Adaptive class-based scheduling of continuous queries,” in Data Engineering Workshops (ICDEW), 2012 IEEE 28th International Conference on. IEEE, 2012, pp. 289–294.

[37] M. A. Sharaf, P. K. Chrysanthis et al., “Efficient scheduling of heterogeneous continuous queries,” in Proc. of the 32nd international conference on Very large data bases. VLDB Endowment, 2006, pp. 511–522.

[38] L. A. Moakar et al., “Class-based continuous query scheduling for data streams,” in Proc. of the Sixth International Workshop on Data Management for Sensor Networks. ACM, 2009, p. 9.

[39] Z. Han, R. Chu et al., “Elastic allocator: An adaptive task scheduler for streaming query in the cloud,” in IEEE Service Oriented System Engineering (SOSE), Intl. Symposium on. IEEE, 2014, pp. 284–289.

[40] T. Heinze, Y. Ji et al., “Elastic complex event processing under varying query load,” in BD3@VLDB. Citeseer, 2013, pp. 25–30.

[41] J. Xu, Z. Chen et al., “T-storm: Traffic-aware online scheduling in storm,” in ICDCS. IEEE, 2014, pp. 535–544.

[42] A. Chatzistergiou and S. D. Viglas, “Fast heuristics for near-optimal task allocation in data stream processing over clusters,” in CIKM. ACM, 2014, pp. 1579–1588.

[43] L. Fischer, T. Scharrenbach et al., “Network-aware workload scheduling for scalable linked data stream processing,” in Proc. of the 2013 Intl. Conference on Posters & Demos Track, Vol. 1035. ceur-ws.org, pp. 281–284.

[44] G. Karypis and V. Kumar, “A fast and high quality multilevel scheme for partitioning irregular graphs,” SIAM Journal on Scientific Computing, vol. 20, no. 1, pp. 359–392, 1998.

Thilina Buddhika Thilina Buddhika is a Ph.D. candidate in the Computer Science department at Colorado State University. His research interests are in the area of real time, high throughput stream processing specifically targeted to environments such as Internet of Things (IoT) and health care applications. Email: [email protected]

Ryan Stern Ryan is a Ph.D. candidate in the Computer Science department at Colorado State University. His research interests are in the area of visual analytics with an emphasis on generating real-time views of voluminous datasets. This involves coping with issues such as representativeness, memory residency, and page-fault avoidance. Email: [email protected]

Kira Lindburg Kira Lindburg is a software engineer at Hewlett Packard Enterprise. Her research interests are in the area of enterprise storage systems with an emphasis on fault tolerance. Email: [email protected]

Kathleen Ericson Kathleen Ericson is an Assistant Professor of Computer Science at the University of Tennessee Martin. Her research interests are broadly in the area of Distributed Systems and Machine Learning. Kathleen received her B.S. degree from Drexel University and her M.S. and Ph.D. degrees in Computer Science from Colorado State University. Email: [email protected]

Shrideep Pallickara Shrideep Pallickara is an Associate Professor in the Department of Computer Science and a Monfort Professor at Colorado State University. His research interests are in the area of large-scale distributed systems, specifically cloud computing and streaming. He received his Masters and Ph.D. degrees from Syracuse University. He is a recipient of an NSF CAREER award. Email: [email protected]