
A Framework for the Dynamic Evolution of Highly-Available Dataflow Programs

Sebastian Ertel
Systems Engineering Group
Technische Universität Dresden, Germany
[email protected]

Pascal Felber
Institut d’informatique
Université de Neuchâtel, Switzerland
[email protected]

ABSTRACT

Many distributed applications deployed on the Internet must operate continuously with no noticeable interruption of service. Such 24/7 availability requirements make the maintenance of these applications difficult because fixing bugs or adding new functionality necessitates the online replacement of the software version by the new one, i.e., a “live update”. Support for “live update” is therefore essential to allow software evolution of critical services. While the problem of live update has been widely studied and several techniques have been proposed (e.g., using group communication and replication), we propose in this paper an original approach for the dataflow-based programming model (FBP). An interesting property of FBP is its seamless support for multi- and many-core architectures, which have become the norm in recent generations of servers and Cloud infrastructures. We introduce a framework and new algorithms for implementing coordinated non-blocking updates, which not only support the replacement of individual software components, but also modifications of structural aspects of the application independently of the underlying execution infrastructure. These algorithms allow us to transparently orchestrate live updates without halting the executing program. We illustrate and evaluate our approach on a web server application. We present experimental evidence that our live update algorithms are scalable and have negligible impact on availability and performance.

Categories and Subject Descriptors

D.3.2 [Programming Languages]: Programming—Dataflow languages; D.3.3 [Programming Languages]: Language Constructs and Features—Frameworks

1. INTRODUCTION

Context and Motivations. The recent evolutions in microprocessor design [6] have brought increasing numbers of cores into modern computers. While this provides higher computing power to applications, it also introduces several challenges related to concurrent programming. First, we need the right tools to develop concurrent applications that are both correct and efficient, i.e., can exploit the parallel computing capacities of the cores. Second, the added complexity of concurrent programs makes debugging, testing, and software evolution much harder than for sequential programs, in particular when dealing with dependable systems.


[Figure: request latency (ms) over time (s), annotated with the misconfigured cache phase, the graceful restart period, and the fixed cache configuration.]
Figure 1: Graceful restart of a Jetty web server with the default configuration and 38 concurrent clients requesting one out of 10,000 files of 20 kB size randomly every 20 ms.


In this paper, we tackle the problem of software evolution for applications that have availability requirements and must remain operational on a 24/7 basis. This is an important challenge because, on the one hand, multi-cores provide us with additional resources to scale up, e.g., by dedicating additional cores to concurrently servicing more clients; on the other hand, when an application must be updated to a new version, orchestrating the restart of all the components running on multiple cores and/or nodes is very challenging if service must not be interrupted. Therefore the “live update” problem can be studied from a new perspective in the light of modern multi-/many-core and Cloud architectures.

To illustrate the impact of software reconfiguration on availability, we have experimented with Jetty’s web server,¹ a widely used application designed for servicing thousands of requests per second in highly concurrent settings. For every reconfiguration or activation of a new feature, Jetty needs to be gracefully shut down and restarted (new connection requests are dropped but requests in progress are allowed to finish). Restarts result in loss of computational state and persistent connections, hence penalizing request latency and throughput, and ultimately leading to customer dissatisfaction. Figure 1 shows a scenario in which the default configuration of Jetty’s cache size (2,048 files) leads to increasing delays for clients. This issue could be resolved by increasing the cache size (10,000 files), but doing so resulted in a measured downtime of 8 s, which is substantial considering an average request latency of 1 ms. Hence, an execution environment that allows us to update or reconfigure critical application services with negligible overheads and no interruption of service is highly desirable.

¹ http://www.eclipse.org/jetty/


[Figure: the HTTP server pipeline accept → read → parse → load → compose → reply; the NIO switch changes load to emit a file channel instead of the file content, which the downstream reply expects.]
Figure 2: This simple HTTP server handles requests concurrently in a pipeline parallel fashion. To switch the server from blocking I/O to non-blocking I/O (NIO), we need to change the load to retrieve a file channel rather than the file content and the reply to use NIO’s sendfile primitive.

Live Update. Early approaches to live update rely on restarting a new instance of the program and transferring its state [14]. However, the costs of redundant hardware and the overheads of state transfer and synchronization can be substantial [16]. Focus has therefore turned towards the live update of running programs in-place, coined as dynamic software updates (DSU) [17]. Exchanging and altering an executing program without breaking correctness is particularly challenging [26]. In this paper, we use the notion of program state consistency to reason about update correctness [10]. The live update process identifies three key challenges when dealing with object-based systems as considered here: state transfer, referential transparency, and mutual references [9].

The first step is to find a quiescent state where an update can be applied safely. This is especially challenging in highly concurrent systems where many threads² access shared state at any given time. Hence, before performing a state transfer, we need to identify (i) the threads accessing the shared state to be updated and (ii) the time at which these accesses occur. We refer to these two elements of state quiescence detection as participants and appointment, respectively.

Prominent approaches for live update systems force developers to introduce barriers into the program to achieve quiescence [16, 24]. In addition to the blocking behavior (i.e., forcing threads to idle during the update) and the performance penalty during normal execution, this approach is hard to scale to large multi-threaded programs because of their complexity and the risk of synchronization hazards (e.g., deadlocks) [11]. Further, while the timeliness of updates has been identified as a requirement [23], no live update mechanism we are aware of is based on a solid definition of time.

Another important challenge is to preserve referential transparency in the process of updating all pointers to the new state, i.e., one must still be able to consistently access state while the update is in progress.

Live updates become even more complicated when mutual dependencies between components exist. Consider the example of a simple web server handling requests concurrently in a pipeline parallel fashion, as illustrated in Figure 2. If we want to change the processing of input/output from blocking to non-blocking (NIO), it is not sufficient to halt only the load and reply components because data containing file content might exist in between both. After the update, the reply would not be able to handle these requests.³ Even event-based approaches that support non-blocking live updates of individual components still have to block when the update spans more than one component, and they do not address the problem of undelivered messages [10].

² We use the terms threads and processes interchangeably to refer to concurrent execution units.

Another desirable property for live updates is to provide location transparency, i.e., to seamlessly handle updates of components irrespective of whether they are executing locally or remotely. This is particularly important in cloud computing environments where servers are virtualized and services can be dynamically relocated to other machines, e.g., for load balancing or fault tolerance.

Finally, most live update systems deal with small security patches. Support for complex updates that also enhance the system has gained attention only recently [24, 10]. In order to fully enable dynamic evolution of programs, updates must not be limited to new features inserted at well-defined points but must support structural program changes.

Contributions and Roadmap. It has been shown that structured software design helps provide safe and flexible live update capabilities [11]. In light of these observations, we do not aim at bringing live update capabilities to arbitrary existing programs or languages. We instead take the opportunity to argue for a different type of programming model [6] that can naturally solve the aforementioned challenges of dynamic software evolution: (data)flow-based programming (FBP) [21]. In addition to providing powerful abstractions for multi-core programming and for implicitly parallel and distributed applications, we strongly believe that FBP offers a promising foundation to incorporate live updates.

In this paper we make several contributions. We first evaluate the FBP concepts with regard to the requirements of dynamic software evolution (Section 2). We then introduce in Section 3 a live update algorithm based on Lamport’s logical time to coordinate mutual dependency updates in a non-blocking fashion. We present in Section 4 FBP extensions for supporting scalable live updates that require no programmer intervention. In Section 5, we extend our algorithm by enabling unrestricted live updates even to the structure of the dataflow graph itself. We describe the implementation of our approach in the Ohua dataflow framework in Section 6 and present a case study for the dynamic development of the web server from Figure 2 in Section 7. We also evaluate the efficiency, timeliness and overheads of our algorithm for different types of updates. We finally discuss related work in Section 8 and conclude in Section 9.

2. FBP FOR LIVE UPDATES

FBP can be found at the heart of many advanced data processing systems today where parallel and concurrent processing is key for scalability: IBM’s data integration systems [18], database engines [8, 12], data streaming [4, 27], declarative network design [20] and even web servers [30]. Our Ohua dataflow framework targets a more general approach by introducing FBP as a parallel programming model, similar to Cilk [3], but in a truly implicit way: the dataflow dependency graph is derived from a program written in the functional language Clojure. Dataflow dependency graphs are widely considered to be a sound basis for parallel computations and their reasoning [1]. Dataflow therefore represents a good abstraction for our live update algorithms. This section evaluates flow-based programming as a core foundation to support live updates.

³ As a matter of fact, restarting the whole server would also involve discarding such state.



2.1 Referential Transparency by Design

In FBP, an algorithm is described as a directed acyclic⁴ dataflow graph where edges are referred to as arcs and vertices as operators. Data travels in small packets in FIFO order through the arcs. An operator defines one or more input and output ports. Each arc is connected to exactly one input port and one output port. An operator continuously retrieves data one packet at a time from its input ports and emits (intermediate) results to its output ports.
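To make this model concrete, the following minimal Java sketch models arcs as bounded FIFO queues connecting operators; the names are hypothetical and not Ohua’s actual API.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// An arc is a FIFO channel connecting exactly one output port
// to one input port; packets travel through it in order.
final class Arc<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(1024);
    void enqueue(T packet) throws InterruptedException { queue.put(packet); }
    T dequeue() throws InterruptedException { return queue.take(); }
}

// An operator continuously retrieves one packet at a time from
// its input ports and emits (intermediate) results downstream.
abstract class Operator {
    protected List<Arc<Object>> inputPorts;
    protected List<Arc<Object>> outputPorts;

    // Invoked repeatedly by the runtime's scheduler.
    abstract void process() throws InterruptedException;
}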

In Ohua, the dataflow graph is not explicitly created in the program or via a visual tool, as is the case in other dataflow systems. Instead, it is derived from a functional program implemented in Clojure where functions represent operators implemented in Java. The language separation is meant to help the developer understand the difference between functionality and algorithm. The algorithm of the web server example of Figure 2 is given in Listing 1.

Listing 1: Ohua-style HTTP Server Algorithm in Clojure

; classic Lisp-style
(ohua
  (reply (compose (load (parse (read (accept "80")))))))

; or using Clojure's threading macro to improve readability
; (which transforms at compile-time into the above)
(ohua
  (-> "80" accept read parse load compose reply))

In the classic dataflow approach [2, 7], operators are fine-grained stateless instructions. In contrast, FBP operators are small functional code blocks, which are allowed to keep state. This programming model is similar to message passing with actors, which is currently gaining momentum in languages such as Scala [13]. FBP cleanly differentiates between functionality, such as loading a file from disk, and the web server algorithm: the former is used to implement the latter. FBP algorithms make reasoning about complex programs easier by hiding the implementation details (functionality). An operator neither makes any assumptions nor possesses any knowledge about its upstream (preceding) or downstream (succeeding) neighbours. Therewith, operators are context-free and highly reusable. As an example, Listing 2 shows the implementation of the reply operator in Ohua.

Listing 2: Ohua-style Operator Implementation in Java

class Reply extends Operator {
  // stateless operator
  @Function Object[] reply(Socket s, String resp) throws IOException {
    OutputStreamWriter writer =
        new OutputStreamWriter(s.getOutputStream());
    writer.write(resp); // send response
    writer.close();
    s.close();
    return null;
  }
}

⁴ While FBP does not define restrictions on the graph structure, we restrict ourselves to acyclic graphs in this paper for simplicity reasons.

[Figure: the server pipeline read → parse → load → compose → reply replicated three times behind a balance-data operator that follows accept.]
Figure 3: HTTP server handling requests in 3-way parallel.

Finally, an FBP program defines the communication between operators via arcs at compile-time rather than at runtime; at runtime, these arcs translate into data dependencies. This solves the problem of concurrent state accesses and allows for an implicitly concurrent, race-free program execution. Ohua’s programming model is similar to that of purely functional languages that provide referential transparency by design, with the major difference that operators are allowed to keep state. Operators do not, however, share state; only pointers in the realm of an operator can point to the state that changes during a live update. For example, Ohua operators can choose whether they want their state to be managed by the runtime engine or on their own. In the latter case, Ohua requires getter and setter functionality for state access. This allows us to update all operator-managed state pointers via a single function call and thereby preserves the referential transparency property.
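For operators that manage their own state, the getter/setter contract might look as follows; this is a minimal sketch with hypothetical names, since the paper does not spell out Ohua’s actual interface.

import java.util.HashMap;
import java.util.Map;

// A live update can replace all operator-managed state
// through a single pair of accessor calls.
interface StatefulOperator {
    Object getState();
    void setState(Object newState);
}

// Example: a cache operator exposing its entry map as state.
final class CacheState implements StatefulOperator {
    private Map<String, byte[]> entries = new HashMap<>();

    @Override public Object getState() { return entries; }

    @Override @SuppressWarnings("unchecked")
    public void setState(Object newState) {
        entries = (Map<String, byte[]>) newState;
    }
}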

Note that state handling is not exclusively related to live updates but is also required for other tasks, e.g., checkpointing to implement fault tolerance or operator migration to adapt to varying workloads. The state of a program is not only defined over the state of the operators but also includes the in-flight packets in the arcs. Packets stored inside the arcs correspond to function calls in the synchronous programming model and therefore do not count towards program state, which is sensitive to live updates. Nevertheless, as seen in the NIO switch example of Figure 2, in-flight packets become problematic upon mutual dependencies. We defer this problem to the next section and assume for now that all arcs are drained before such an update can proceed.

2.2 Sequential Operator State Access

Although operators are executed concurrently, which introduces implicit pipeline parallelism, concurrent execution of the same operator is not allowed in FBP. Therewith, the only remaining case where shared state might exist is when replicating an operator to process packets in parallel. Figure 3 shows an example of the web server graph answering requests in a 3-way parallel fashion. FBP does not, however, address data parallelism, as it has no knowledge of whether a part of a flow graph can be replicated by splitting the packet flow. Therefore, this form of parallelism cannot be directly exploited by Ohua’s execution engine. It requires either the help of the developer or a graph inspection algorithm. Note that, in both cases, parallelism is introduced at the level of the flow graph and not its runtime representation. For instance, Listing 3 relies on a user-defined balance macro (code omitted) to insert a balance-data operator, replicate the code provided as the last parameter three times, and create data dependencies to set up the graph in Figure 3.

Listing 3: Explicit Data Parallelism via Clojure Macros

(ohua
  (let [accepted (accept 80)]
    (balance 3 accepted
      (-> read parse load compose reply))))


Although the mapping of operator instances to threads is dynamic, no concurrent execution of the same operator instance is permitted. Each operator in the graph becomes an independent instance at runtime. Operator state access therefore remains strictly sequential, and one can know at any point in time which thread is executing an operator and accessing its operator state, thus precisely identifying the participants in the state quiescence problem.

3. A TIME-BASED ALGORITHM

In order to safely apply a live update, we must first find the right point in time when operator state is quiescent. Certainly the scheduler has all execution knowledge of which operator is currently being executed on which thread and which operator is currently inactive. There are, however, several arguments against involving the scheduler. First, FBP does not declare any scheduler interface or other scheduling details. Hence, a scheduler-based solution cannot directly leverage FBP abstractions and would lack generality. Second, FBP does not state how often and when an operator might return control to the scheduler. As such, either the scheduler must forcefully interrupt operator execution or the timeliness of a live update cannot be guaranteed. Most importantly, the mutual dependency problem cannot be solved using the scheduler, as already explained in the NIO switch example of Figure 2. Even when the scheduler blocks execution of both the load and the reply in order to make sure both operator states are quiescent, packets in between these two operators expect the old blocking I/O API. The simple solution of draining the packets among the arcs between the load and the reply once again forces the load, the compose and the reply to idle while applying the update. Hence, a notion of time with respect to the computation is required to define a solid concept of timeliness for live updates and perform non-blocking mutual dependency updates.

3.1 The Right Point in Computation Time

Another way to reason about a dataflow graph without violating FBP principles is in terms of a distributed system, i.e., a set of independent processes (with operators as functionality) communicating with each other by exchanging messages (information packets) via FIFO channels.

Reasoning about time in a distributed system based on the happened-before relationship of events was described in Lamport’s seminal paper [19]. It provides the foundation for Chandy and Lamport’s algorithm to capture a global snapshot of a distributed system [5], which defines the notion of computation time and explains the associated reasoning about the states of the participating processes. The central idea is to inject a marker into the dataflow to mark a specific time t in the computation in a decentralized fashion. The marker travels along the FIFO channels of the system. On arrival at a process, the local computational state is at time t as defined by the happened-before relationship of message arrival events. The process then captures its state and propagates the marker to all outgoing links. A global snapshot is consistent with the states of all processes gathered at time t. The algorithm is solely based on the marker concept and the happened-before relationship of message arrival events at the processes. Further, it operates in a purely non-blocking fashion as a natural part of data processing and allows us to specify a concrete point in computation time for operators to capture their state, or in our case apply an update.

Algorithm 1: Marker-based Update Algorithm

Data: operator o := (I, O) consisting of input and output ports; marker m arriving on input port i ∈ I at time s ≤ t; j the joined representation of all arrived markers m (with the same id)

 1  if m arrived on all k ∈ I then
 2      if m.target = o.id then                    // operator update (s = t)
 3          oldState ← o.getState();
 4          updatedState ← m.updateState(oldState);
 5          updatedOp ← m.updateOperator(o);
 6          updatedOp.setState(updatedState);
 7          if j.dependents ≠ ∅ then
 8              for dependency marker c ∈ j.dependents do
                    // coordination (enforce update consistency: s = t)
 9                  c.setDependent(true);
10                  o.broadcastInOrder(c);
11              end
12          else                                   // initialize termination
13              j.setCleanup(true);
14              o.broadcastOutOfOrder(j);
15          end
16      else                                       // marker propagation
17          if j.isDependent() then                // consistency (s = t)
18              o.broadcastInOrder(j);
19          else                                   // timeliness (s < t)
20              o.broadcastOutOfOrder(j);
21          end
22      end
23      foreach k ∈ I do k.unblock();
24  else
25      if m.isCleanup() then                      // termination
26          o.broadcastOutOfOrder(m);
27          foreach k ∈ I do k.unblock();
28          j ← ∅;         // drop all subsequently arriving markers m
29      else                                       // marker join
30          j.dependents ← j.dependents ∪ m.dependents;
31          if m.isDependent() then
32              i.block();
33              j.setDependent(true);
34          else
35              // initial marker detected, no mutual dependency upstream
36          end
37      end
38  end

3.2 Marker-based Update Coordination

Our approach to decentralized update coordination is presented in Algorithm 1. The marker propagation for mutual dependencies adheres to the principles of Lamport’s snapshot algorithm to preserve consistency. We extend the algorithm to not only capture the state of an operator on marker arrival, but also update it. An update is injected as an update marker into the dataflow graph (see Section 4.2). The marker contains the unique identifier of the operator to be updated as well as the functionality to update its state and the operator implementation. Operators propagate the update marker from their input to their output ports through the flow graph (Lines 16–22). Whenever the marker encounters the target operator, the update is performed inline with the computation (Lines 2–6) at a time t in the computation.

In order to coordinate a mutual dependency update, we piggyback in an update marker m as many markers as there are mutually dependent downstream operators to be updated at the same point in time. Once m has been applied, these markers are propagated downstream at time t in the computation (Lines 8–11).


[Figure: flow graph combining GET and UPDATE processing with operators accept, read, cond, parse-get, parse-post (2-way parallel via balance-data), store, load, compose-get, compose-post, merge, and reply; the legend distinguishes (initial) independent, dependent, and cleanup markers.]
Figure 4: An extended version of our HTTP server that additionally supports UPDATE requests to the files it provides. An UPDATE of a file shares functionality with a GET whenever possible and returns the previous version of the updated file.

Hence, in order to switch our HTTP server flow to NIO (see Figure 2), we submit one marker for the load, which piggybacks another one for the reply operator. At time t the marker arrives at the load operator. The update is performed and the marker targeted for the reply is propagated downstream. On its way to its destination, the remaining update marker defines a clear point in time at which the update of the reply is safe to be performed. All packets sent by the load operator at an earlier time s < t that require the old blocking API are located downstream of the marker, while all packets sent at a later time u > t that require the new NIO API are upstream. Safety of the update process is guaranteed by the FIFO ordering of packets inside the arcs and the immediate propagation of arriving markers inside the operators (see Lines 17–19). Finally, the last update marker reaches the reply operator at time t and performs the update. Both updates were performed without blocking any of the operators.
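A piggybacked update marker along the lines of Algorithm 1 might be modeled as follows (a sketch with hypothetical names; Ohua’s actual marker type is not shown in the paper). The commented usage shows the NIO switch: the marker for load piggybacks the one for reply.

import java.util.ArrayDeque;
import java.util.Deque;

// An update marker targets one operator and piggybacks the
// markers for all mutually dependent downstream updates that
// must happen at the same point t in computation time.
final class UpdateMarker {
    final String targetOperatorId;
    final Deque<UpdateMarker> dependents = new ArrayDeque<>();
    boolean dependent; // set once an upstream update was applied
    boolean cleanup;   // set once termination is initiated

    UpdateMarker(String targetOperatorId) {
        this.targetOperatorId = targetOperatorId;
    }
}

// Usage for the NIO switch:
//   UpdateMarker nio = new UpdateMarker("load");
//   nio.dependents.add(new UpdateMarker("reply"));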

3.3 Marker Joins

Up to now, we assumed that operators only have one input port. Typically, operators with more than one input port merge or join the data arriving from the different input ports. So does our algorithm with respect to the piggybacked markers and their types (Lines 29, 32). According to Lamport’s algorithm, operators with more than one input port have to wait for the update marker to arrive at all input ports before it can be propagated (Line 1). Until this point in time, packets from input ports that have already received the marker must not be dequeued (Lines 30–35). Note that the algorithm only blocks input ports that received a marker with a dependency to an upstream update (Lines 9, 30–33).⁵ Although all input ports have to see a marker before the propagation is safe, input ports without an upstream update are not blocked, as packets arriving on these ports are not influenced by the downstream update (Lines 34–36). This seems counter-intuitive because arcs represent data dependencies and hence a downstream operator should always depend on all operators upstream. We describe such a scenario in the context of the version of our HTTP server in Figure 4, which was enhanced to handle updates to the provided resources. Similar to the semantics of data structures in common languages, our server does not only update a file but also returns the content of the previous version. As such, GET and UPDATE functionality share operators of the flow graph, e.g., accept, read, load and reply. Both cond operators dispatch data based on the type of the request. Since parsing the new content for a resource update might require more time, we execute this step in a two-way parallel fashion. A mutual dependency update for the functionality of the parse-post and store operators is injected as an independent marker. After updating the parse-post operators, the adjacent downstream merge performs a marker join of two dependent markers. In the second merge, which funnels UPDATE and GET requests to the load, we encounter the situation where the initial marker from the GET branch does not define an upstream dependency (Lines 33–35). In this case it is safe to allow further propagation of GET requests because their further processing remains unaffected by the second part of the mutual dependency update. Finally, after the dependent marker has arrived at the store and finished the update, we send a marker to clean up existing markers in the graph (Lines 12–15). This is required because our propagation algorithm has no knowledge of the structure of the dataflow graph. Therefore, propagation happens via broadcasting the markers. The cleanup marker resolves the case in our HTTP flow graph where the dependent marker arrives from the GET branch at the last merge. Here the merge must block this port, as it has no knowledge of the location of the searched target and might be required to coordinate the arriving markers towards a defined time t. If no marker were to arrive on the other input ports, the algorithm would deadlock. To avoid this, we propagate a cleanup marker out of band to penalize processing as little as possible. Once the merge receives this marker, it unblocks all ports and signals that no coordination towards an update of marker m is required any more (Lines 25–28). It cleans all received update information and drops all future markers of type m.

⁵ In our algorithm, blocking does not lead to idling threads but rather restricts the functionality available for execution.

3.4 Deadlocks

The arrival of a packet on an input port is controlled by the operator algorithm, which requests packets via its input ports. Therefore, a marker can only arrive at an input port if there are no more packets in front of it, i.e., it is located at the head of the queue associated with the incoming arc. The introduction of blocking into marker propagation is an invasive step that directly influences the operator algorithm. The marker join algorithm is non-deterministic by nature: it allows arbitrary dequeuing behaviour from all input ports that have not seen the marker yet and blocks all dequeuing from the rest. As a result, the operator algorithm must adapt to this aspect, yet it does not have the notion of a blocked port. To an operator, a blocked port looks like one with no data currently available. It may therefore decide to back off and delay processing until data becomes available instead of querying another input port. This deterministic behaviour can lead to deadlocks when the operator algorithm decides to wait for data to arrive on a blocked port.

The classic example for this type of problematic join is a deterministic merge with packets forwarded in a predefined order.


[Figure: an operator composed of a packet dispatch mechanism, port handlers, and the operator functionality, connected via synchronous calls between input ports and output ports, which attach to arcs.]
Figure 5: Port handler extensions for FBP operators.

In practice, however, non-deterministic merges are far more common when combining packets from a parallel pipeline. For the rest of this paper we assume that the execution semantics of the language avoid this type of deadlock.

3.5 State

Referential transparency and state transfer during an update are solved by FBP and our marker-based approach, respectively. The challenge of mutual dependency updates is to find all state in the system that was derived from the program version before the update and either flush it before applying the update, or update it. The marker makes sure that old state is flushed from the (dependent) incoming arcs before the dependent operator is updated. Still, some old state may hide in any of the operators in between the mutually dependent operators. The update manager therefore has to decide whether the old state is considered harmful to the system. This might be the case if the update fixes a bug where state of other operators was corrupted and needs to be corrected. In such a case all these (downstream) operators need to be included in the update. In future work, we intend to provide more advanced solutions to remove this burden from the user applying the update.

3.6 Timeliness

Finally, the moment in computation time t when an update is to be applied need not match the time v when the initial update marker m is inserted into the flow graph. If m travels through the graph in FIFO order with the data, it arrives at its target operator at time v. In Algorithm 1 the operator is updated on arrival such that t = v. It is straightforward to extend the algorithm to wait for a certain amount of time and apply the update at a later point t > v. Note, however, that this is possible only if the marker is independent, i.e., the current update is not dependent on an upstream update. The important insight is that FIFO marker propagation is only required to coordinate mutual dependencies (Lines 17–21). When finding the update target, we do not need to adhere to this rule and we can utilize out-of-order propagation possibilities among the arcs (Line 14), such as out-of-band processing among TCP connections if available. As a result, it is possible to deliver timely updates even at t < v.

4. FBP EXTENSIONS

This section describes the extensions that incorporate our marker-based update algorithm into the FBP model in a scalable and location-transparent manner.

4.1 Structured Operators and Packet Dispatch

The FBP model consists of three major abstractions: arcs, operators, and a dataflow graph. A scalable design must preserve low complexity in terms of operator implementation and algorithm construction. Existing FBP systems focus primarily on the actual functionality. We reach beyond this by structuring an operator into operator functionality (the operator code), port handlers, and a packet dispatch mechanism, as depicted in Figure 5. While the operator functionality is provided in the classical way, the port handlers and the packet dispatcher are features of the runtime system that each operator instance inherits. Port handlers are meant to provide common functionality to all operators and therefore treat them as black boxes with respect to the implemented functionality. Still, the structure of an operator, i.e., its input and output ports, is known to a port handler.

We also refine the notion of packets by distinguishing between meta-data and data packets. Markers are examples of the former class, while the latter corresponds to information packets as known from FBP. Each port handler registers for one or more types of meta-data packets. The packet dispatcher extends packet retrieval and makes sure handlers take responsibility for the meta-data packets they have registered for. In contrast, data packets are always dispatched to the operator functionality. Since port handlers are meant as an extension of the operator functionality, they also consist of sequential context-free code blocks. The programming interface for port handlers is illustrated in Listing 4.

Listing 4: Update port handler stub in Ohua

class UpdateHandler extends PortHandler {
  List<InputPort> blocked;
  void arrived(InputPort i, UpdateMarker m) {
    // implementation of the update algorithm
  }
}

One handler is allowed to register at multiple input ports. We further require that all calls in the realm of an operator are synchronous and that the packet dispatcher is stateless, such that all dequeue operations among input ports preserve the FIFO processing order of the packets in the dataflow graph.
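The dispatch discipline can be sketched as follows, with minimal stand-in types rather than Ohua’s actual classes: meta-data packets are routed to the handlers registered for their type, while data packets always reach the operator functionality.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-ins for the runtime types used below.
final class InputPort { /* an operator's input port */ }
interface PortHandler { void arrived(InputPort i, Object metaPacket); }
interface OperatorFunctionality { void process(Object dataPacket); }

// The packet dispatcher routes meta-data packets to the port
// handlers registered for their type; data packets are always
// handed to the operator functionality. It keeps no state of
// its own, so dequeue operations preserve FIFO order.
final class PacketDispatcher {
    private final Map<Class<?>, List<PortHandler>> handlers = new HashMap<>();

    void register(Class<?> metaDataType, PortHandler h) {
        handlers.computeIfAbsent(metaDataType, t -> new ArrayList<>()).add(h);
    }

    void dispatch(InputPort port, Object packet, OperatorFunctionality op) {
        List<PortHandler> hs = handlers.get(packet.getClass());
        if (hs != null) {
            for (PortHandler h : hs) h.arrived(port, packet); // meta-data packet
        } else {
            op.process(packet); // data packets always reach the functionality
        }
    }
}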

This enhanced operator structure, together with the packet classification and automatic dispatch, provides a powerful extension framework to implement not only our coordinated update algorithm but also runtime features such as checkpointing or logging in a scalable and distributed manner.

4.2 Dataflow Graph Transformations

The major strength of FBP resides in the abstraction of the flow graph itself. In essence, the arcs of the graph define the data dependencies between the operators at compile-time rather than at runtime. FBP does not define the implementation (array, shared memory queue, TCP connection, etc.) of the arcs but only their (FIFO) semantics. This provides independence from the actual execution infrastructure. The clear structural decomposition of the algorithm into small context-free operators allows the runtime system to exploit pipeline parallelism on any distributed architecture, e.g., multi-core, cluster, WAN, etc. Since our algorithm and its extensions strictly adhere to these principles, they inherit this location transparency property. Nevertheless, the injection of the update marker into the flow graph still requires knowledge of the current location of all source operators, i.e., operators without incoming arcs.

4.2.1 Flow Graph Entrance and Exit

We address this problem via rewriting of the dataflow graph. Just like the above extensions to the operator, this rewriting neither influences the construction of the dataflow graph nor the functionality of operators or even the design of port handlers. It can be applied before runtime to the entire dataflow graph and, as such, it is independent of the actual algorithm implemented by the structure and operators of the graph and leaves both unchanged.


[Figure: the HTTP pipeline accept → read → parse → load → compose → reply wrapped by graph rewrite extensions: an entrance operator, a control operator exposing the update API, an up-init operator for advanced initialization, and an exit operator with a feedback arc; server and client sockets mark the graph boundaries.]
Figure 6: Pre-runtime rewriting of the HTTP flow graph.


Our rewriting, as depicted in Figure 6, wraps the flow graph with an entrance operator and an exit operator. The former has outgoing arcs to all source operators in the flow graph, while the latter is connected via incoming arcs to all operators that have no outgoing arcs, also referred to as target operators. All arcs and operators added during rewriting are valid FBP components and can be executed by the very same runtime system. Computation, however, only takes place in the original graph, and all extension operators remain silent at runtime. There is consequently no reason for the runtime system to move these operators, e.g., in order to balance the computation across multiple processes or nodes in a cluster. Therefore, there is no need anymore for our update marker injection to identify the source operators in the flow graph, because we created a single entry point: the entrance operator. Scalability to a large number of source or target operators depends on the arc implementation. For example, in Ohua, a network arc maps to a ZeroMQ⁶ connection and a fast message broker network that seamlessly scales to thousands of concurrent connections.

Finally, an additional control operator provides an API to submit update requests from the external world. The operator translates these requests into markers and sends them downstream into the dataflow graph.
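A control operator along these lines might look as follows, reusing the Arc and UpdateMarker types sketched earlier (again hypothetical names, not Ohua’s API):

// Sketch: the control operator listens for external update
// requests, translates them into update markers, and emits
// them downstream through the entrance of the flow graph.
final class ControlOperator {
    private final Arc<Object> downstream;

    ControlOperator(Arc<Object> downstream) { this.downstream = downstream; }

    // Invoked by the external update API.
    void submitUpdate(String targetOperatorId) throws InterruptedException {
        downstream.enqueue(new UpdateMarker(targetOperatorId));
    }
}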

4.2.2 Advanced Transformations

Our rewriting is not limited to the three aforementioned operators. In Figure 6, we also added an up-init operator that performs offline preparation of the updates. Furthermore, when the original flow graph is split and deployed on multiple nodes, each of the subflows represents a valid dataflow graph by itself and is eligible for rewriting. While the control operator remains on the initial node, the up-init operator can be transposed within the subflows to achieve local initialization of the updates. Since no additional arcs from inside the original flow graph to the local up-init exist, the update algorithm needs to be extended. When an update marker arrives at the target operator, it does not apply the update but sets a flag in the marker and sends it downstream. The marker propagates through the rest of the flow graph until it reaches the exit operator. Upon identifying the flag, the exit operator notifies the local up-init via a feedback arc. From that point on, the algorithm works as before: the up-init performs the initialization and forwards the marker into the subflow where it hits the target operator (again) and applies the update.

⁶ http://zeromq.org/

[Figure: the original pipeline accept → read → parse → load → compose → reply, and the cache extension where parse feeds a cache operator whose hit arc bypasses load into a merge before compose, while the miss arc goes through load.]
Figure 7: Extension of the HTTP flow graph with a Cache.


5. STRUCTURAL UPDATES

Up to this point we have addressed updates spanning one or multiple operators, which are akin to updates of small functions in a program. The step from live updates to dynamic development does, however, require the capability to update the program, i.e., the dataflow graph, itself. In this section our focus extends to structural updates of the flow graph. It is important to stress that no new FBP features are added, as restructuring is solely based on the FBP extensions and our marker-based update algorithm. It follows that, when a port handler is executing, all input and output ports of the associated operator are inactive and can thus be rewired. We further assume that all structural changes are validated before submission by an algorithm that verifies that the update does not lead to violations at any of the downstream operator interfaces.

For example, in Figure 7 we extend our simple HTTP server flow with a cache operator to remove disk I/O and improve request latency. The packets sent down the hit arc must adhere to the same structure as packets coming from the load operator. As validation of a flow graph extension is a compile-time concern, it can be performed as a check before the actual structural rewriting is applied. Hence, rewriting is valid if the resulting dataflow graph contains neither unconnected arcs nor orphaned ports.

5.1 Coordinated Reconnections

Update markers for operator changes carry either a completely new functionality or a property of the existing operator functionality to be changed. Structural updates are also marker-based and, as such, they adhere to our notion of computation time and must not break the FBP abstractions. Changes can only be applied to the local operator, i.e., a port handler may only disconnect incoming arcs of the local input ports.

We classify three structural change operations that alter the flow graph structure: delete, insert, and replace. Each of these operations is essentially composed of a series of disconnection and reconnection steps that must be coordinated across the flow graph. For example, the deletion of the subgraph oi → … → on, as depicted in Figure 8, involves a disconnection step for detaching the incoming arc x from the input port of oi and a reconnection step to reconnect it to the input port of on+1. To coordinate these steps, we define a reconnection marker that contains a set of reconnections. A reconnection is a projection from one input port to another, and a set of reconnections is always bijective.
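A reconnection marker could be modeled as below, mirroring the fields used in Algorithm 2 (r.in, r.arc, r.new, r.dependent); the types are illustrative stand-ins, not Ohua’s actual classes.

import java.util.ArrayList;
import java.util.List;

final class InputPort { /* stand-in: an operator input port */ }
final class ArcRef { /* stand-in: a reference to a detached arc */ }

// A reconnection re-targets the incoming arc of one input port
// to another input port; the set carried by a marker is
// bijective, i.e., no two reconnections target the same port.
final class Reconnection {
    InputPort in;                 // port whose incoming arc is detached
    ArcRef arc;                   // arc to re-attach, once known
    InputPort newTarget;          // input port in the new subflow (insert)
    ReconnectionMarker dependent; // pending downstream reconnection
}

final class ReconnectionMarker {
    final String targetOperatorId;
    // order-sensitive: a disconnect of a port must precede a
    // reconnection of another arc to that same port
    final List<Reconnection> reconnections = new ArrayList<>();

    ReconnectionMarker(String targetOperatorId) {
        this.targetOperatorId = targetOperatorId;
    }
}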

The pseudo-code for the reconnection operations is shown in Algorithm 2. It focuses solely on the update part, as we already covered marker propagation in Algorithm 1 (including offline initialization of the flow graph extension). Each reconnection starts by disconnecting the target input port from its incoming arc (Line 3). Disconnected arcs that are not part of the flow graph to be replaced or deleted need to be reconnected either to the new flow graph (Line 6) or to an input port in the existing flow graph (Line 13). In the latter case the arc is attached to a dependent marker and propagated downstream the port (Lines 8–10). Dependent reconnections of arcs from the new flow graph are also coordinated through the existing graph (Lines 16–19). Note finally that reconnections inside the marker are order-sensitive, e.g., an operation that disconnects the incoming arc from an input port and reconnects it to the new flow graph must be performed before another operation that reconnects an arc to that same input port.


[Figure: deletion of the subgraph oi → … → on. On arrival of marker m at operator oi: 1) disconnect incoming arc x of oi; 2) propagate m downstream to on+1. On arrival of m at operator on+1: 3) disconnect incoming arc y of on+1; 4) reconnect x to on+1.]
Figure 8: Algorithm steps for deleting the subgraph from operator oi to operator on.


5.2 Distributed Rewriting

Figure 9 illustrates another coordinated insertion, but in a distributed context. It performs a two-phase process for enhancing our web server with proxy functionality located at a different cluster node to load balance disk I/O. The first phase deploys the proxy flow. The actual structural rewriting happens in the second phase, where we extend the existing flow graph to not only dispatch existing requests to the proxy flow but also join proxied requests back into the pipeline. The structural rewriting performs the first insertion at the load operator with two reconnections. Afterwards, a marker delivers the pending two reconnections to the reply operator. All the original operators in the existing flow are preserved and both parts of the rewriting happen at the same time in computation.

The proxy rewrite also serves as a more concrete example of an extension that is deployed in a distributed fashion. Our rewriting algorithms do not make any assumptions on the location and deployment of the flow graph to be inserted. The deployment algorithm running underneath the FBP abstraction must, however, be able to cope with the resulting deployment changes. For example, in Figure 8, if operators oi−1 and oi were deployed together in the same process, then arc x would map to a simple queue. If operator on+1 were located in a different process or even on a different node, then x would have to be converted to an IPC or TCP connection. This is a problem of the underlying implementation of the deployment algorithm, which is outside the scope of this paper.
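To illustrate this separation of concerns, an arc abstraction might hide whether its endpoints share a process; the sketch below shows the in-process case, and a distributed variant could, e.g., wrap a ZeroMQ socket as Ohua does.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// FBP only fixes the FIFO semantics of an arc, not its
// implementation, so the deployment layer can swap a local
// queue for an IPC or TCP channel without touching operators.
interface FifoArc<T> {
    void enqueue(T packet) throws InterruptedException;
    T dequeue() throws InterruptedException;
}

// Both endpoints in the same process: a plain bounded queue.
final class LocalArc<T> implements FifoArc<T> {
    private final BlockingQueue<T> q = new ArrayBlockingQueue<>(1024);
    public void enqueue(T packet) throws InterruptedException { q.put(packet); }
    public T dequeue() throws InterruptedException { return q.take(); }
}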

We furthermore require that reconnections at a single operator are performed at the same time in order to keep the computation consistent throughout the rewriting process. Therefore, operators with more than one incoming arc have to perform a marker join before reconnections on the local input ports are performed. This ensures that there exists a time t at which rewriting logically took place for this operator. Note that a new portion of the dataflow graph starts to actively participate in the computation as soon as the first arc has been reconnected, i.e., even before rewriting is fully complete. This allows us to perform structural changes to the flow graph in a purely non-blocking manner.

Algorithm 2: Structural Update Algorithm

Data: operator o := (I, O) and a marker m containing reconnections R such that m.target = o.id

 1  for reconnection r ∈ R do                      // operator update
 2      if r.in ∈ I then
 3          arc ← r.in.disconnect();
 4          if r.arc = ∅ then
 5              if r.new ≠ ∅ then                  // insert (arc to new subflow)
 6                  r.new.connect(arc);
 7              else           // deletion (pending reconnection in old flow)
 8                  r.dependent.reconnection.arc ← arc;
 9                  r.dependent.setDependent(true);
10                  o.broadcastInOrder(r.dependent);
11              end
12          else                                   // finalize reconnection
13              r.in.connect(r.arc);
14          end
15      else                         // propagate dependent reconnections
16          for dependency marker c ∈ m.dependents do
17              c.setDependent(true);
18              o.broadcastInOrder(c);
19          end
20      end
21  end


6. PROGRAMMING MODEL

The key requirement in the definition of a programming model for live updates is certainly simplicity. It must allow developers to submit their updates in a context-free fashion without having to reason about concurrency or locality. Ohua achieves this goal by defining a single update function. We first describe the update model related to operator and mutual dependency updates and later focus on algorithm functions.

6.1 Operators and Mutual Dependencies

Listing 5 shows the code to submit the NIO switch to the executing web server.

Listing 5: The NIO-switch realized with Ohua.

(update
  [com.server/reply com.server.update/reply
   (new ReplyStateTransfer)]
  [com.server/load com.server.update/load])

Note that the new code is defined in a different namespace than the old one. This is beneficial because dynamically evolving a program does not implicitly mean that the replaced code is buggy and needs to be discarded; e.g., some functions might be swapped with others for performance reasons on specific architectures. Ohua detects whether the update references an operator or a function.


[Figure: Step 1 starts the proxy flow receive → load → compose → send on Node 2. Step 2 reconfigures the server flow on Node 1: a cond after parse dispatches requests either to the local load or over network arcs to the proxy, and a join/merge feeds proxied responses back before reply, preserving the client connection. Shared-memory arcs connect local operators; network arcs connect the nodes.]
Figure 9: Proxy extension of the HTTP flow graph.

In the former case, Ohua takes care of finding all operators of the specified type in the currently executing flow graph and creates the necessary update requests. Mutual dependency updates are composed by submitting update pairs, as shown for the I/O switch in Listing 5. Dependencies are detected automatically and therefore the order of operator updates can be arbitrary. Furthermore, if state needs to be adapted to the new version of an operator (as exemplified for the reply operator in Listing 5), it is possible to specify a state transfer object that implements a predefined interface to return an updated state on the basis of the old version.
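Such a state transfer interface could look as follows; this is a sketch with hypothetical signatures, since the paper does not specify the actual interface, and the pass-through migration is purely illustrative.

// A state transfer object derives the state for the new
// operator version from the state of the old version.
interface StateTransfer<OldState, NewState> {
    NewState migrate(OldState oldState);
}

// Illustrative transfer for the reply operator's NIO switch.
final class ReplyStateTransfer implements StateTransfer<Object, Object> {
    @Override
    public Object migrate(Object oldState) {
        // here: the old state is assumed compatible and passed through
        return oldState;
    }
}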

6.2 Algorithm Functions

Operators in Ohua represent functionality while the algorithm is written in Clojure. A program can be composed of many smaller algorithms encapsulated within algorithm functions. In all aspects related to the algorithm description, Ohua strictly follows the Lisp programming model and the syntax of Clojure. Listing 6 shows the definition of the web server algorithm as a function.

Listing 6: The web server defined as a function in Ohua.

; clojure namespace definition
(ns com.server)

; web server defined in a function
(ohua
  (defn web-server [port]
    (-> port accept read parse load compose reply)))

Finally, the invocation in Listing 7 assigns a port for the server to listen on.

Listing 7: Web-server invocation in Ohua.

; Ohua import to use the defined web server function
(ohua :import [com.server])

; configure and launch the web server
(ohua (com.server/web-server "80"))

To illustrate the update process, consider the scenario from Figure 7 of incorporating a cache into the web server. This process entails two steps. First, the developer writes an enhanced version of the web server algorithm utilizing a cache (operator), as shown in Listing 8.

Listing 8: Cache-based Web Server.

(ns com.server.update)

; the web server algorithm utilizing the cache
(ohua
  (defn web-server-with-cache [port]
    (let [res-ref (-> port accept read parse)]
      (let [res (cache res-ref)]
        (-> (if (= res nil) (load res-ref) res)
            compose reply)))))

Thereafter, this updated function is substituted for all invocations of the web server in the running program. Listing 9 illustrates the submission of the update.

Listing 9: Cache Insertion Update.

(update
  [com.server/web-server
   com.server.update/web-server-with-cache])

This simple model relieves the developer from the burden of specifying what exactly has changed in the algorithm and which insertions or deletions need to be performed in order to incorporate the update into the executing flow graph. Instead, Ohua replaces all subflows corresponding to invocations of the old function with the new version, along the guidelines of Algorithm 2. This function-level approach to updating the structure of a flow graph avoids the deadlock problem identified above for functions without additional I/O channels. The marker join is coordinated at the non-deterministic function entry that gathers the arguments. Additional marker joins inside the function are then coordinated towards that entry join. Due to space limitations, we defer a detailed analysis and a thorough discussion of more advanced updates of algorithm functions in Ohua to future work.

6.3 Integration with Clojure Programs

The actual insertion of new code into a running program is already supported by existing tools such as alembic (https://github.com/pallet/alembic). Clojure programs typically execute in a "read-eval-print loop" (REPL), which is similar to a command-line interface that can launch programs in another thread or even another JVM process. Attaching to an executing REPL is also functionality that is readily available, e.g., via liverepl (https://github.com/djpowell/liverepl); therefore, loading new operators and algorithm functions packaged in jars is straightforward.
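As a sketch of this workflow, assuming the update namespace from Listing 8 has been packaged in a jar that is already on the classpath (e.g., added via alembic), submitting the update from an attached REPL could look as follows; the update function is the one shown in Listing 9.

; load (or reload) the namespace containing the new algorithm version
(require 'com.server.update :reload)

; submit the update to the running flow graph (cf. Listing 9)
(update
  [com.server/web-server
   com.server.update/web-server-with-cache])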

7. EVALUATION

In order to evaluate our evolution framework, we implemented the algorithms presented in this paper in Ohua and dynamically developed our flow graph with the NIO, cache, and proxy extensions described above (see Figures 2, 7, and 9). We followed the same methodology as other similar frameworks [22] to analyse web server performance. We deployed our Ohua web server on one cluster node, and clients were evenly spread across 19 other machines.



Figure 10: Runtime overhead using locks and barriers to simulate update points (mean request latency in ms and throughput in requests/s for 1 to 8 parallel pipelines; baseline, barrier, and locks variants).

The nodes in the cluster were equipped with 2 Intel Xeon E5405 CPUs with 4 cores each, 8 GB of RAM, Gigabit Ethernet, and SATA-2 hard disks. Note that the following experiments are not meant to evaluate the scalability of the web server deployment but focus on the performance of the live updates.

7.1 Runtime Overhead Evaluation

We first investigate the naive approach of inserting update points within the operators in order to analyse the runtime overhead. We used two flavours of update points: barriers and locks. One should point out that neither approach can handle on its own the type of updates that we addressed in this paper. Even for the simple mutual dependency update of switching the web server to use NIO, barriers would deadlock and locks would fail due to the packet inconsistency illustrated in Figure 2.
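For illustration, the two flavours of simulated update points can be sketched as follows; the names and the placement inside the operator's processing step are our own simplification of the experiment setup, not Ohua internals.

(import '[java.util.concurrent CyclicBarrier])
(import '[java.util.concurrent.locks ReentrantLock])

; lock-based update point: normal processing only pays for an
; uncontended acquire, while an update holds the lock exclusively
(def update-lock (ReentrantLock.))

(defn process-with-lock [process packet]
  (.lock update-lock)
  (try
    (process packet)
    (finally (.unlock update-lock))))

; barrier-based update point: every parallel pipeline must reach the
; barrier before an update can apply; pipelines blocked on I/O may
; never arrive, which is the source of the observed deadlocks
(def update-barrier (CyclicBarrier. 8)) ; one party per pipeline

(defn process-with-barrier [process packet]
  (.await update-barrier)
  (process packet))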

We do not explicitly consider the runtime overhead of Ohua itself, as the introduced port handler switch amounts to little more than a simple conditional expression at each input port, which is a vital part of the runtime engine. In order to introduce more parallelism into the experiment, we parallelized the whole web server pipeline and assigned each accept a different server socket. 1,000 clients evenly distribute their requests among the sockets and ask for one out of 10,000 files of 512 bytes each. Update points are shared across operators of the same type to simulate updates on the level of functions.

Mean latency and request throughput, reported in Figure 10, indicate that the overhead of introducing locks is not visible even with 8 threads competing for a single lock. In contrast, barrier coordination exhibits overhead for one and two parallel pipelines. Once the performance is capped by the I/O operations in the system, barriers seem to indeed have a beneficial effect on I/O coordination, which results in improved latency and throughput. The deadlock problem mentioned earlier does, however, render the barrier approach infeasible. Even in our simple experiment, which was designed to favour barriers, we were unable to execute a successful run above 5 parallel pipelines, i.e., 5 different server sockets, without encountering deadlocks.

7.2 Coordinated NIO Switch

For the coordinated switch of our web server to support the NIO API, we concurrently executed 30 clients issuing requests with a delay of 1 ms.

Figure 11: Request latency impact during a coordinated switch of the Load and Reply operators to use NIO's sendfile primitive (request latency in ms over time in s).

As previously, requests were distributed uniformly at random across a set of 10,000 files of 50 kB stored on disk. Figure 11 shows request latency over the runtime of the experiment, averaged every 100 ms. At 40 s into the experiment, we performed the coordinated update of the Load and Reply operators. The graph shows that request latency drops immediately from an unstable average of 33 ms to a stable average of 21 ms with very few outliers. Note that there is no spike when the switch is performed. The update does not block ongoing messages, and successive requests instantaneously benefit from the switch.

7.3 Dynamic HTTP Server Development

To support our claim that our update algorithms can sustain the evolution from simple live updates to the dynamic development of highly concurrent systems, we perform the cache insertion and the proxy extension in a single experiment, one after the other, on the initial version of the web server without NIO. We consider 30 clients requesting files of size 100 kB and another 30 clients issuing requests for files of size 2 kB. All clients pause 20 ms between consecutive requests.

In this dynamic evolution scenario, updates aim at improving request latency as much as possible. Therefore, we first insert a cache over all 20,000 files 65 s into the computation, which removes disk I/O overheads and reduces the latency of both request types by 16 ms. Yet, due to the pipelined execution model, requests for small files are penalized by requests for larger files. The second rewriting operation, after 145 s, removes this penalty by inserting a proxy that dispatches requests for small files to a proxy server located on a different cluster node. Thereafter, small requests are processed in about 19 ms, while requests for larger files have an average latency of 42 ms.

Note the small increase in request latency for large files by about 4 ms after the proxy insertion. This is due to the fact that we continue to use the same Reply operator for both request types, and small files now compete with larger files, which results in this penalty on both sides. Note that we could easily remove this bottleneck by dynamically introducing separate Reply and Send operators for small files.

7.4 Cache Loading Strategies

Although the proxy insertion again happens without any noticeable impact on request latency, this is not the case for the cache rewriting. The increase in latency for about 15 s corresponds to the time during which the new cache loads the files into memory. Although this happens in the up-init operator, outside of the graph part performing the computation, it nevertheless competes for disk I/O with the concurrent handling of requests. The developer needs to be aware of potential I/O contention and plan the update accordingly.


Figure 12: Reconfiguration experiment for cache and proxy insertion into the HTTP server flow graph (request latency in ms over time in s for request sizes of 100 kB and 2 kB; I/O interference is visible during cache loading).

Figure 13: Various strategies to fill the cache during the update (request latency on a logarithmic scale over time; strategies: inline naive, offline naive, offline preloaded, offline Redis, iterative).

In Figure 13 we investigate different strategies to fill the cache. For simplicity, we used a single set of 30 clients requesting a 50 kB file randomly out of 10,000 files every 20 ms. The upper plot shows that the naive approach, which does not use the up-init operator but inlines cache loading with request handling, blocks the requests and results in a downtime of almost 9 s. In the lower graph, we plot different strategies for loading the cache offline from request handling in the up-init operator. We compare the naive strategy of loading directly from disk (offline naive) to the two alternatives of (1) loading the files from the Redis (http://redis.io) in-memory key-value store, and (2) having all files already preloaded into memory. While the former approach already greatly improves cache loading time, the latter demonstrates that our rewriting algorithm does not degrade performance when no contention exists. The slight latency overhead we can observe results from the heavily loaded JVM process in the preloaded case, but we can avoid it by loading files on demand (the iterative approach shown in the graph).
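A minimal sketch of the iterative (on-demand) strategy follows, using a plain Clojure atom as the cache; the real cache operator's state handling is more involved, so the names and structure here are illustrative only.

; on-demand cache loading: the cache starts empty and each file is
; read from disk at most once, on its first miss, so filling the
; cache never competes with request handling in a single burst
(def file-cache (atom {}))

(defn lookup [path]
  (or (get @file-cache path)
      (let [content (slurp path)]
        (swap! file-cache assoc path content)
        content)))

This spreads the I/O cost of filling the cache over the first request for each file instead of paying it all at update time.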

8. RELATED WORK

Research on DSU systems is often classified with respect to their target domain: programming languages, distributed systems, and operating systems. The more important classification, however, is based on the programming model, especially with respect to concurrency and parallelism.

Disruptive Updates. Procedural and object-oriented programming models such as C and Java support parallel execution explicitly. Updates have to follow the very same programming model in order to reason about the state of


the program and enforce state quiescence. Therefore, recent approaches introduce synchronization in the form of update points [16, 24]. They either use barriers or define a more relaxed form of synchronization to remove runtime overhead during normal execution and avoid deadlocks. Update points require the update author to reason about the different states of the program. The state space increases with the degree of concurrency in the program and becomes even more complicated when additional update points are introduced to increase the timeliness of updates.

The alternative strategy of automatically inserting update points involves heavy static analysis and code restructuring, which again makes this approach scale poorly [15]. Early evaluation of Rubah, a DSU system for Java, reports 8 % overhead during normal processing and update times in the range of seconds. This is because Rubah does not support fine-grained online updates and must perform a full heap traversal to find the objects to be updated. Hence, the larger the heap, the longer the traversal. This is a major concern because programs that rely on DSU instead of classical replication approaches typically have a large amount of state. Recent work [11] showed that update points are often limited to only the original version of the program and that the update process therefore does not scale with the program's evolution.

Our FBP-based algorithms do not suffer from any of these problems, and every newly inserted operator or subflow can again be updated on-the-fly.

Cooperative Updates. The event-driven programming model [10] is better suited to reason about state and provides cooperative rather than disruptive updates [11]. This model essentially facilitates the development of a distributed system with independent processes communicating via a common message-passing substrate. Updates of one or multiple processes are coordinated by an update manager. The benefit is that single-process updates are performed without blocking the other processes. In contrast, updates spanning multiple processes reach state quiescence by blocking all affected processes. Event-driven programming uses a processing loop similar to the operators in FBP. The model does not address, however, the nature of the IPC and the consistency problem of packets in transit during a coordinated update. The only work we are aware of that addresses this issue delays processing to drain packets between the two components until the whole update can be applied atomically [29].

Although less relevant to our work, one can finally mention, in the area of data streaming, the so-called teleport messages used to coordinate the reconfiguration of parallel operators [28]. In software-defined networks, consistent updates of switch configurations are coordinated based on tagged packets to achieve consistency [25].


We enhance the work on live updates by defining the concept of marker joins, which is vital for the definition of a solid notion of time in the program execution, and by introducing a scalable language-based approach that applies mutual dependency updates in a non-blocking and infrastructure-independent algorithm.

9. CONCLUSION AND FUTURE WORK

In this paper, we extended the dataflow-based programming model [21] to support live updates as well as the dynamic evolution of highly concurrent programs at runtime. We rely on a well-defined notion of time to reason about update consistency in the live update process and to perform structural modifications to the flow graph in a non-blocking fashion, even with mutual dependencies across many program parts. Since we based our extensions solely on the abstractions of the flow-based programming model, our algorithms work independently of the underlying architecture, e.g., multi-core systems or clusters. We conclude that FBP is appealing as a programming abstraction not only because it helps develop scalable programs for modern multi-core systems, but also because it is instrumental in solving many of the problems encountered with explicit parallelism at the language level.

10. REFERENCES

[1] U. A. Acar, A. Chargueraud, and M. Rainey. Oracle scheduling: Controlling granularity in implicitly parallel languages. OOPSLA '11, New York, NY, USA, 2011. ACM.

[2] Arvind and D. E. Culler. Dataflow architectures. In Annual Review of Computer Science, Vol. 1. Annual Reviews Inc., Palo Alto, CA, USA, 1986.

[3] R. D. Blumofe, C. F. Joerg, B. C. Kuszmaul, C. E. Leiserson, K. H. Randall, and Y. Zhou. Cilk: An efficient multithreaded runtime system. PPOPP '95, New York, NY, USA, 1995. ACM.

[4] S. Chandrasekaran, O. Cooper, A. Deshpande, M. J. Franklin, J. M. Hellerstein, W. Hong, S. Krishnamurthy, S. R. Madden, F. Reiss, and M. A. Shah. TelegraphCQ: Continuous dataflow processing. SIGMOD. ACM, 2003.

[5] K. M. Chandy and L. Lamport. Distributed snapshots: Determining global states of distributed systems. ACM Trans. Comput. Syst., 1985.

[6] A. A. Chien. Pervasive parallel computing: An historic opportunity for innovation in programming and architecture. PPoPP '07, New York, NY, USA, 2007. ACM.

[7] J. B. Dennis. Data flow supercomputers. Computer, 13(11):48–56, Nov. 1980.

[8] D. J. DeWitt, R. H. Gerber, G. Graefe, M. L. Heytens, K. B. Kumar, and M. Muralikrishna. GAMMA - a high performance dataflow database machine. VLDB '86.

[9] N. Feng, G. Ao, T. White, and B. Pagurek. Dynamic evolution of network management software by software hot-swapping. In Integrated Network Management Proceedings, 2001 IEEE/IFIP International Symposium on, 2001.

[10] C. Giuffrida, A. Kuijsten, and A. S. Tanenbaum. Safe and automatic live update for operating systems. ASPLOS '13, New York, NY, USA, 2013. ACM.

[11] C. Giuffrida and A. S. Tanenbaum. Cooperative update: A new model for dependable live update. HotSWUp '09, New York, NY, USA, 2009. ACM.

[12] G. Graefe. Encapsulation of parallelism in the Volcano query processing system. SIGMOD. ACM, 1990.

[13] P. Haller and M. Odersky. Scala actors: Unifying thread-based and event-based programming. Theor. Comput. Sci., 2009.

[14] C. Hayden, E. K. Smith, M. Hicks, and J. Foster. State transfer for clear and efficient runtime upgrades. HotSWUp '11.

[15] C. M. Hayden, K. Saur, M. Hicks, and J. S. Foster. A study of dynamic software update quiescence for multithreaded programs. HotSWUp '12, Piscataway, NJ, USA, 2012. IEEE Press.

[16] C. M. Hayden, E. K. Smith, M. Denchev, M. Hicks, and J. S. Foster. Kitsune: Efficient, general-purpose dynamic software updating for C. OOPSLA '12, New York, NY, USA, 2012. ACM.

[17] M. Hicks and S. Nettles. Dynamic software updating. ACM Trans. Program. Lang. Syst., 27(6):1049–1096, Nov. 2005.

[18] IBM. InfoSphere DataStage data flow and job design. http://www.redbooks.ibm.com/, July 2008.

[19] L. Lamport. Time, clocks, and the ordering of events in a distributed system. Commun. ACM, 1978.

[20] B. T. Loo, T. Condie, J. M. Hellerstein, P. Maniatis, T. Roscoe, and I. Stoica. Implementing declarative overlays. SOSP '05, New York, NY, USA, 2005. ACM.

[21] J. P. Morrison. Flow-Based Programming. Van Nostrand Reinhold, 1994.

[22] D. Mosberger and T. Jin. Httperf: A tool for measuring web server performance. SIGMETRICS Perform. Eval. Rev., 26(3), Dec. 1998.

[23] I. Neamtiu and M. Hicks. Safe and timely updates to multi-threaded programs. PLDI '09, New York, NY, USA, 2009. ACM.

[24] L. Pina and M. Hicks. Rubah: Efficient, general-purpose dynamic software updating for Java. HotSWUp '13.

[25] M. Reitblatt, N. Foster, J. Rexford, C. Schlesinger, and D. Walker. Abstractions for network update. SIGCOMM '12, New York, NY, USA, 2012. ACM.

[26] M. E. Segal and O. Frieder. On-the-fly program modification: Systems for dynamic updating. IEEE Softw., 1993.

[27] W. Thies, M. Karczmarek, and S. P. Amarasinghe. StreamIt: A language for streaming applications. CC '02, London, UK, 2002. Springer-Verlag.

[28] W. Thies, M. Karczmarek, J. Sermulins, R. Rabbah, and S. Amarasinghe. Teleport messaging for distributed stream programs. PPoPP '05, New York, NY, USA, 2005. ACM.

[29] Y. Vandewoude, P. Ebraert, Y. Berbers, and T. D'Hondt. Tranquility: A low disruptive alternative to quiescence for ensuring safe dynamic updates. IEEE Trans. Softw. Eng., 33(12), Dec. 2007.

[30] M. Welsh, D. Culler, and E. Brewer. SEDA: An architecture for well-conditioned, scalable internet services. SOSP '01, New York, NY, USA, 2001. ACM.