Cloud-Scale Event Processing Using Rx
Bart J.F. De Smet [email protected]
Principal Software Engineer, Microsoft Corporation



Who, Where, When?

• Born in Belgium
  • Computer Science Engineering at University of Ghent
    • In the “civil” engineering department
  • Most Valuable Professional (MVP) for C#
• Joined Microsoft in 2007
  • .NET Framework / WPF team member (2007-2010)
    • Working on Application Model
  • SQL Division (2010-2012)
    • Working on Reactive Extensions with Erik Meijer et al
  • Online Services Division (2012-…)
    • Cloud-scale event processing using Rx, powering Cortana etc.

Image: Belgian beers (www.neo4j.com, really)

Exiled to Building 10

Who, Where, When?

• Author
  • C# Unleashed series at SAMS
  • Pluralsight courses (pluralsight.com)
• Speaker
  • TechEd, TechDays, //build/, PDC, etc.
  • Channel 9 videos (channel9.msdn.com)
  • Goto, JAOO, etc.
• Physics lover
  • Beauty of dualities, symmetries, etc.
  • Power of diagrams, formalism, etc.

Reactive Extensions
What, why, how?

An Accidental Discovery

• Cloud Programmability Team
  • “Oasis” within Live Labs and later the SQL Server organization
  • Founded by Erik Meijer and Brian Beckman
  • Code-named “Tesla” after Nikola Tesla (the electric car didn’t exist yet)
• Founded in the mid 2000s
  • Ray Ozzie ages
  • Making sense of this new thing called “cloud”
• Various projects
  • IL2JS – a compiler from IL to JavaScript
  • Extension to JavaScript with classes, modules, types
  • Embarrassingly distributed build system for the cloud
  • Reactive Extensions aka “LINQ to Events”

Image: Nikola Tesla (www.knowledgeoftoday.org)

An Accidental Discovery

• Project “Volta”
  • Tier-splitting of applications
  • Write as single-tier .NET application using metadata annotations
    • Attributes like [RunOnClient]
  • Cross-compilation of code to match client capabilities
    • Desktop CLR or Silverlight when available
    • IL to JavaScript when necessary
      • Even compiling Windows Forms controls to HTML
• No promises or futures
  • The world of Begin/End where Task<T> had yet to be invented
  • async/await was unheard of (C# 5.0)
  • But the web is asynchronous…


An Accidental Discovery

• Project “Volta”
  • Dealing with asynchrony across tiers
    • Ultimately needs to cross-compile to AJAX
  • Events are not first-class objects
    • Can’t transport them across tiers

    [RunOnClient]
    public event EventHandler<MouseEventArgs> MouseMoved;

    // Runs in cloud
    public void CloudCanvas()
    {
        MouseMoved += (o, e) => { /* do stuff */ };
    }

An electric eel…

Making Events First-Class

• First-class concepts have object representations
  • Methods can be transported using delegates
  • But properties, indexers, and events are metadata citizens

    Action a = new Action(Foo); // explicit creation of delegate instance
    Action b = Foo;             // method group conversion
    Action c = () => { … };     // creates anonymous method

    void Foo() { … }

    event Action Bar // metadata that refers to …
    {
        add { … }    // add accessor
        remove { … } // remove accessor
    }

http://en.wikipedia.org/wiki/First-class_citizen

Fundamental Abstractions

• Adapting the observer pattern
  • Ensuring duality with the enumerator pattern
  • More compositional approach

    interface IObservable<out T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }

    interface IObserver<in T>
    {
        void OnNext(T value);
        void OnError(Exception error);
        void OnCompleted();
    }

Notification grammar

OnNext* (OnError | OnCompleted)?
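The grammar above says: any number of OnNext calls, then at most one terminal OnError or OnCompleted, and nothing after that. As a minimal sketch of what that means in practice, the wrapper below drops any notification arriving after a terminal one. `GrammarObserver<T>`, `RecordingObserver<T>`, and `GrammarDemo` are hypothetical names for this illustration, not Rx types, and the sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper (not part of Rx): drops any notification that would
// violate the grammar OnNext* (OnError | OnCompleted)?.
public sealed class GrammarObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private bool _terminated;

    public GrammarObserver(IObserver<T> inner) { _inner = inner; }

    public void OnNext(T value)
    {
        if (!_terminated) _inner.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_terminated) return;
        _terminated = true;
        _inner.OnError(error);
    }

    public void OnCompleted()
    {
        if (_terminated) return;
        _terminated = true;
        _inner.OnCompleted();
    }
}

// Records notifications as strings so the effect is easy to inspect.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

public static class GrammarDemo
{
    public static void Main()
    {
        var recorder = new RecordingObserver<int>();
        var safe = new GrammarObserver<int>(recorder);

        safe.OnNext(1);
        safe.OnCompleted();
        safe.OnNext(2); // ignored: arrives after the terminal notification

        Console.WriteLine(string.Join(", ", recorder.Log));
    }
}
```

Only `OnNext(1)` and `OnCompleted` reach the recorder; the late `OnNext(2)` is dropped.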

Image: “Gang of four” book (Addison-Wesley)

Highly Compositional

• LINQ-style query operators over IObservable<T>
  • Composition of 0-N input sequences
  • Composition of disposable subscriptions and scheduler resources

    interface IScheduler
    {
        IDisposable Schedule(Action work);
        …
    }

    static class Observable
    {
        static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> f);
        static IObservable<R> Select<T, R>(this IObservable<T> source, Func<T, R> p);
        …
    }

Image: Function composition (www.Wikipedia.org)

Highly Compositional

• Building a binary Merge operator

    public static class Observable
    {
        public static IObservable<T> Merge<T>(this IObservable<T> xs, IObservable<T> ys)
        {
            return Create<T>(observer =>
            {
                var gate = new object();

                return new CompositeDisposable
                {
                    xs.Subscribe(x => { lock (gate) { observer.OnNext(x); } }, …),
                    ys.Subscribe(y => { lock (gate) { observer.OnNext(y); } }, …),
                };
            });
        }
    }

First-class: can build extension methods

Composition of resource management

Image: Function composition (www.Wikipedia.org)

The Role of Schedulers

• Pure architectural layering of the system
  • Logical query operators (~ relational engine)
  • Physical schedulers (~ operating system)
• Abstract over sources of asynchrony and time
  • Threads, thread pools, tasks, message loops
  • DateTime.Now, timers
• Enable virtual time testing

    public static IObservable<T> Return<T>(T value, IScheduler scheduler)
    {
        return Create<T>(obs => scheduler.Schedule(() =>
        {
            obs.OnNext(value);
            obs.OnCompleted();
        }));
    }
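To make "virtual time testing" concrete, here is a minimal sketch of a scheduler whose clock only moves when a test tells it to. It assumes a simplified scheduler shape: `ISimpleScheduler`, `VirtualTimeScheduler`, and `AdvanceBy` are names invented for this sketch, and the real Rx scheduler interface is richer than this.

```csharp
using System;
using System.Collections.Generic;

// Simplified, hypothetical scheduler shape used only for this sketch.
public interface ISimpleScheduler
{
    DateTimeOffset Now { get; }
    void Schedule(TimeSpan dueTime, Action work);
}

public sealed class VirtualTimeScheduler : ISimpleScheduler
{
    private readonly List<(DateTimeOffset Due, long Seq, Action Work)> _queue =
        new List<(DateTimeOffset Due, long Seq, Action Work)>();
    private long _seq;

    // The clock starts at an arbitrary fixed instant and never reads DateTime.Now.
    public DateTimeOffset Now { get; private set; } =
        new DateTimeOffset(2015, 1, 1, 0, 0, 0, TimeSpan.Zero);

    public void Schedule(TimeSpan dueTime, Action work)
    {
        _queue.Add((Now + dueTime, _seq++, work));
    }

    // Advances the virtual clock, running due work in time order
    // (ties are broken by scheduling order).
    public void AdvanceBy(TimeSpan span)
    {
        var target = Now + span;
        while (true)
        {
            int next = -1;
            for (int i = 0; i < _queue.Count; i++)
            {
                if (_queue[i].Due > target) continue;
                if (next < 0
                    || _queue[i].Due < _queue[next].Due
                    || (_queue[i].Due == _queue[next].Due && _queue[i].Seq < _queue[next].Seq))
                {
                    next = i;
                }
            }
            if (next < 0) break;

            var item = _queue[next];
            _queue.RemoveAt(next);
            Now = item.Due;
            item.Work();
        }
        Now = target;
    }
}

public static class VirtualTimeDemo
{
    public static void Main()
    {
        var scheduler = new VirtualTimeScheduler();
        var log = new List<string>();

        scheduler.Schedule(TimeSpan.FromHours(24), () => log.Add("flight check"));
        scheduler.Schedule(TimeSpan.FromMinutes(1), () => log.Add("sample location"));

        scheduler.AdvanceBy(TimeSpan.FromMinutes(5)); // only the one-minute item is due
        Console.WriteLine(string.Join(", ", log));

        scheduler.AdvanceBy(TimeSpan.FromDays(1));    // a full day passes instantly
        Console.WriteLine(string.Join(", ", log));
    }
}
```

Because operators take time from the scheduler rather than from the system clock, a test covering a 24-hour window finishes without any real waiting.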

Image: Space-time (www.Wikipedia.org)

The Beauty of Duality

• Category theory to the rescue (Bierman, Meijer)
  • Observable/observer (push) is dual to enumerable/enumerator (pull)
  • Cross-influence of both domains

    interface IObservable<out T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }

    interface IObserver<in T>
    {
        void OnNext(T value);
        void OnError(Exception error);
        void OnCompleted();
    }

    interface IEnumerable<out T>
    {
        IEnumerator<T> GetEnumerator();
    }

    interface IEnumerator<out T> : IDisposable
    {
        bool MoveNext() throws Exception;
        T Current { get; }
        void Reset();
    }
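One way to see the duality is to drive a pull sequence into a push observer: each of the three outcomes of MoveNext maps onto one observer method. The sketch below is a simplified, synchronous conversion written for this illustration. `PullToPush`, `ToObservable`, and `ListObserver<T>` are names chosen here, it ignores schedulers and cancellation, and for brevity an exception thrown by the observer itself is also routed to OnError.

```csharp
using System;
using System.Collections.Generic;

public static class PullToPush
{
    // Hypothetical helper: pushes every element of a pull-based sequence.
    public static IObservable<T> ToObservable<T>(this IEnumerable<T> source)
    {
        return new EnumerableObservable<T>(source);
    }

    private sealed class EnumerableObservable<T> : IObservable<T>
    {
        private readonly IEnumerable<T> _source;
        public EnumerableObservable(IEnumerable<T> source) { _source = source; }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            try
            {
                using (var e = _source.GetEnumerator())
                {
                    while (e.MoveNext())            // MoveNext returns true  -> OnNext(Current)
                        observer.OnNext(e.Current);
                }
            }
            catch (Exception ex)                    // MoveNext throws        -> OnError
            {
                observer.OnError(ex);
                return NoOp.Instance;
            }
            observer.OnCompleted();                 // MoveNext returns false -> OnCompleted
            return NoOp.Instance;
        }
    }

    private sealed class NoOp : IDisposable
    {
        public static readonly NoOp Instance = new NoOp();
        public void Dispose() { }
    }
}

public sealed class ListObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

public static class DualityDemo
{
    public static void Main()
    {
        var observer = new ListObserver<int>();
        new[] { 1, 2, 3 }.ToObservable().Subscribe(observer);
        Console.WriteLine(string.Join(", ", observer.Log));
    }
}
```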

Image: Category theory (www.Wikipedia.org)

One versus Many

• When Task and Task<T> were born…
  • Single-value specializations
  • Await-ability of sequences (for aggregates)

            Synchronous                        Asynchronous
    One     Func<T> f                          Task<T> t
            var x = wait f();                  var x = await t;
    Many    IEnumerable<T> xs                  IObservable<T> xs
            foreach (var x in xs) { f(x); }    xs.Subscribe(x => { f(x); });

Image: Stephen Kleene, Kleene star (closure) (www.Wikipedia.org)

Customizable Execution Plans

• Capturing user intent using expression trees
  • Code-as-data
  • Homoiconicity = same syntax
  • E.g. LINQ to Twitter

            Pull                         Push
    Code    IEnumerable<T> xs            IObservable<T> xs
            from x in xs where f(x) …    from x in xs where f(x) …
    Data    IQueryable<T> xs             IQbservable<T> xs
            from x in xs where f(x) …    from x in xs where f(x) …
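The code-versus-data rows differ only in how the compiler treats the same lambda syntax. As a small sketch using the standard System.Linq.Expressions API, the same `x => x > 0` can be assigned to a delegate (compiled, opaque code) or to an expression tree (data that can be inspected, rewritten, or shipped elsewhere). The class name `CodeAsDataDemo` is chosen for this illustration.

```csharp
using System;
using System.Linq.Expressions;

public static class CodeAsDataDemo
{
    public static void Main()
    {
        // Code: a compiled delegate. It can be invoked but not inspected.
        Func<int, bool> code = x => x > 0;

        // Data: the same syntax captured as an expression tree.
        Expression<Func<int, bool>> data = x => x > 0;

        // The tree exposes the structure of the predicate.
        var body = (BinaryExpression)data.Body;
        Console.WriteLine(body.NodeType);               // GreaterThan
        Console.WriteLine(data.Parameters[0].Name);     // x

        // Data can be turned back into code when local execution is wanted.
        Func<int, bool> compiled = data.Compile();
        Console.WriteLine(compiled(5) == code(5));      // True
    }
}
```

A query provider in the "Data" row receives trees like `data` and decides where and how to run them, which is what makes the execution plan customizable.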

Image: Alan Kay, Homoiconicity, 1969 (www.Wikipedia.org)

Taking Rx to the Cloud
Large-scale distributed event processing in Bing

“Cloud First, Mobile First”

• Events are all around us
  • Classic frameworks for UI programming
  • Sensors in phones, IoT scenarios, etc.
  • Monitoring of systems and cloud infrastructures
  • Changes to the world’s information
• Building a scalable abstraction that allows for:
  • Hiding of concrete implementations
  • Distributed and intelligent execution
  • Compute close to data

Image: Satya Nadella, “Cloud-first, mobile-first” (www.businessinsider.com)

Going Where the Data is

• New reactive event processing effort
  • Founded in Bing around 2012
    • Standing on the shoulders of (relevant) giants
  • Massive amounts of data available
• Powering scenarios for “Cortana”
  • Track updates to flights, weather, sport scores, news etc.
  • Remind me when to leave based on traffic conditions
• Cloud and device
  • Capturing the world’s information as real-time streams
  • Abstracting device sensors (GPS etc.) as streams

<3

Observations on Cloud and Devices

• Optimization for resources
  • CPU + memory = power
    • Density of computation in the cloud (# of standing queries / machine)
    • Affordability as background service on devices with < 1GB of RAM
• Reliability of computations
  • Device-side services
    • Subject to tombstoning (cf. application lifecycle)
    • Can run out of battery
  • Cloud-side services
    • Outages of compute nodes are the way of life
    • Deployment causes intentional failover of services

Image: Richard Feynman (Westview Press)

Some (Simplified) Cortana Queries

    from w in weather
    where w.City == "Seattle"
    select w

    flights
        .Where(f => f.Code == "BA49")
        .DelaySubscription(departure - 24 * H)
        .TakeUntil(arrival + 24 * H)
        .Select(f => f.Status)

    userLocation
        .Sample(1 * M)
        .DistinctUntilChanged()
        .Select(here =>
            traffic(here, meetingLocation)
                .TakeUntil(meeting - minimumTime)
                .Select(t => Timer(meeting - t.EstimatedTime))
                .StartWith(Timer(meeting - estimatedTime))
                .Switch())
        .Switch()
        .Select(_ => "Leave now for " + subject)
        .DelaySubscription(meeting - estimatedTime)

Temporal query operators

Device-side event stream

    // Insert cloud-side observable sequence
    // of time-to-leave timer here

Cloud-side event stream

Higher-order query operators

Remember tier-splitting?

Diagram label: Bing cloud (IRP)

High-Level Overview

• IReactiveProcessing (IRP) abstraction
  • Representation of an event processing service
  • Highly interoperable between devices and services
  • Russian doll / turtles all the way down model

Diagram labels: Partner1 cloud, Partner2 cloud, Tablet, Phone, and three Nodes, each exposing IRP

Scaling the Abstractions

• Lessons learned from Rx
  • Enhanced phasing of subscription lifecycle
  • Turning subjects into streams
  • Building the “Standard Model” of event processing
• Addressing distributed system concerns
  • Identification of artifacts
  • Latencies and asynchrony
  • Reliability of computation and messages
  • Deployment friendliness
  • Elasticity and dynamic topologies
  • Management of system resources

Adapting Rx
Building an execution engine

Event Processing Execution Engine

• How to leverage Rx?
  • Rich library of query operators, including temporal ones
  • Additional requirements
    • Persist and recover state (e.g. aggregates)
    • High density (> millions of subscriptions per machine)
    • Don’t miss input events, even when node is down
    • Scalable egress (e.g. notification platforms)
• Reliability approach
  • Periodic checkpointing of state to persistent storage
  • Acknowledge / replay of ingress messages
    • At-least-once processing guarantees
    • De-duplication can be layered on top
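The last two bullets can be illustrated with a small sketch: if ingress messages may be replayed after a failure, a stage that remembers the highest sequence number it has processed can drop the duplicates. This assumes in-order delivery and monotonically increasing sequence numbers per source. `Envelope`, `DedupStage`, and `LastProcessed` are hypothetical names for this illustration, not the engine's actual types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical ingress message: payload plus a per-source sequence number.
public sealed class Envelope
{
    public long Sequence { get; }
    public string Payload { get; }

    public Envelope(long sequence, string payload)
    {
        Sequence = sequence;
        Payload = payload;
    }
}

// De-duplication layered on top of at-least-once delivery.
public sealed class DedupStage
{
    private readonly Action<string> _downstream;

    // In a real engine this value would have to be part of the checkpointed state.
    public long LastProcessed { get; private set; }

    public DedupStage(Action<string> downstream, long lastProcessed = -1)
    {
        _downstream = downstream;
        LastProcessed = lastProcessed;
    }

    public void OnNext(Envelope message)
    {
        if (message.Sequence <= LastProcessed) return; // replayed duplicate
        _downstream(message.Payload);
        LastProcessed = message.Sequence;
    }
}

public static class DedupDemo
{
    public static void Main()
    {
        var output = new List<string>();
        var stage = new DedupStage(output.Add);

        // Original delivery.
        stage.OnNext(new Envelope(0, "a"));
        stage.OnNext(new Envelope(1, "b"));
        stage.OnNext(new Envelope(2, "c"));

        // Replay after a missed acknowledgement overlaps with what was already seen.
        stage.OnNext(new Envelope(1, "b"));
        stage.OnNext(new Envelope(2, "c"));
        stage.OnNext(new Envelope(3, "d"));

        Console.WriteLine(string.Join(", ", output));
    }
}
```

Each payload reaches the output once even though two messages were delivered twice.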

Distributed Event Processing

• Coordinator distributes query fragments to engines over IRP
• Protocol between engines

Diagram labels: two Event Processing Execution Engine nodes (Node1, Node2), each with Rx and IRP; checkpoint storage for each (Save, Load); reliable messaging (OnNext, Replay, Ack)

Revisiting IObservable<T>

• Composition in Rx can be hairy due to race conditions
  • Subscribe makes the sequence “hot”
  • Callbacks can happen at any time, even before IDisposable is returned

    public static IObservable<T> Take<T>(this IObservable<T> xs, int count)
    {
        return Create<T>(observer =>
        {
            var remaining = count;

            return xs.Subscribe(x =>
            {
                observer.OnNext(x);
                if (--remaining == 0)
                {
                    observer.OnCompleted();
                    // how to dispose?
                }
            }, …);
        });
    }

Revisiting IObservable<T>

• Inventions a la SingleAssignmentDisposable (SAD)

    public static IObservable<T> Take<T>(this IObservable<T> xs, int count)
    {
        return Create<T>(observer =>
        {
            var remaining = count;

            // The variable name was lost in the transcript; "sad" is used here.
            var sad = new SingleAssignmentDisposable();
            sad.Disposable = xs.Subscribe(x =>
            {
                observer.OnNext(x);
                if (--remaining == 0)
                {
                    observer.OnCompleted();
                    sad.Dispose();
                }
            }, …);

            return sad;
        });
    }
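The trick that makes this work is that the holder can be disposed before the inner subscription has been assigned, in which case the subscription is disposed as soon as it arrives. The sketch below shows one way such a type could be written. It is a simplified illustration under that assumption, not the Rx source, and `SingleAssignmentDisposableSketch` and `Flag` are names chosen here.

```csharp
using System;
using System.Threading;

// Simplified sketch of a single-assignment disposable holder.
public sealed class SingleAssignmentDisposableSketch : IDisposable
{
    // Sentinel stored once Dispose() has been called.
    private static readonly IDisposable DisposedMarker = new Marker();
    private IDisposable _current;

    public bool IsDisposed
    {
        get { return Volatile.Read(ref _current) == DisposedMarker; }
    }

    public IDisposable Disposable
    {
        set
        {
            var old = Interlocked.CompareExchange(ref _current, value, null);
            if (old == null) return; // first assignment wins
            if (old != DisposedMarker)
                throw new InvalidOperationException("Disposable has already been assigned.");
            if (value != null) value.Dispose(); // Dispose() came first: clean up immediately
        }
    }

    public void Dispose()
    {
        var old = Interlocked.Exchange(ref _current, DisposedMarker);
        if (old != null && old != DisposedMarker) old.Dispose();
    }

    private sealed class Marker : IDisposable
    {
        public void Dispose() { }
    }
}

// Test double that records whether it was disposed.
public sealed class Flag : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() { IsDisposed = true; }
}

public static class SadDemo
{
    public static void Main()
    {
        var holder = new SingleAssignmentDisposableSketch();

        // Simulates the source calling back during Subscribe, so the operator
        // asks for disposal before the subscription handle exists.
        holder.Dispose();

        var subscription = new Flag();
        holder.Disposable = subscription; // arrives late, is disposed on assignment

        Console.WriteLine(subscription.IsDisposed); // True
    }
}
```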

Revisiting IObservable<T>

• Need to be able to traverse the operator tree for various purposes
  • Provide context to operators, e.g. loggers, resource managers, etc.
  • Hide schedulers from users by “flowing” them
  • Visit stateful nodes to persist / load state for checkpointing
• Limitations of current Rx
  • Subscribe combines “Attach” and “Start” lifecycle states
  • Dispose combines “Stop” and “Detach” lifecycle states
• Generalization using ISubscribable<T>
  • Enable traversal of operator trees

    interface ISubscribable<out T>
    {
        ISubscription Subscribe(IObserver<T> observer);
    }

    interface ISubscription : IDisposable
    {
        void Accept(ISubscriptionVisitor v);
    }
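As a rough sketch of how a visitor could walk such a subscription tree to collect checkpoint state: the slides name `ISubscription`, `ISubscriptionVisitor`, and `IStatefulOperator` but do not show their members beyond `Accept`, so the `Visit`, `Name`, and `SaveState` members below, along with `SourceSubscription`, `TakeSubscription`, and `CheckpointVisitor`, are assumptions made for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Member shapes below are assumed; only Accept appears on the slide.
public interface ISubscriptionVisitor
{
    void Visit(ISubscription node);
}

public interface ISubscription : IDisposable
{
    void Accept(ISubscriptionVisitor v);
}

// Hypothetical contract for operators whose state must survive a failover.
public interface IStatefulOperator
{
    string Name { get; }
    void SaveState(IDictionary<string, object> store);
}

// Leaf of the tree: a stateless source subscription.
public sealed class SourceSubscription : ISubscription
{
    public void Accept(ISubscriptionVisitor v) { v.Visit(this); }
    public void Dispose() { }
}

// Take keeps a counter, so it participates in checkpointing.
public sealed class TakeSubscription : ISubscription, IStatefulOperator
{
    private readonly ISubscription _upstream;
    private int _remaining;

    public TakeSubscription(ISubscription upstream, int count)
    {
        _upstream = upstream;
        _remaining = count;
    }

    public string Name { get { return "Take"; } }

    // Stands in for receiving one element from upstream.
    public void Consume() { if (_remaining > 0) _remaining--; }

    public void SaveState(IDictionary<string, object> store)
    {
        store[Name + ".remaining"] = _remaining;
    }

    // Visit this node, then continue down the tree.
    public void Accept(ISubscriptionVisitor v)
    {
        v.Visit(this);
        _upstream.Accept(v);
    }

    public void Dispose() { _upstream.Dispose(); }
}

// Collects state from every stateful node it encounters.
public sealed class CheckpointVisitor : ISubscriptionVisitor
{
    public Dictionary<string, object> Store { get; } = new Dictionary<string, object>();

    public void Visit(ISubscription node)
    {
        var stateful = node as IStatefulOperator;
        if (stateful != null) stateful.SaveState(Store);
    }
}

public static class VisitorDemo
{
    public static void Main()
    {
        var take = new TakeSubscription(new SourceSubscription(), 3);
        take.Consume();

        var checkpoint = new CheckpointVisitor();
        take.Accept(checkpoint);

        Console.WriteLine(checkpoint.Store["Take.remaining"]); // 2
    }
}
```

The same traversal could hand loggers or schedulers to each node, which is why a tree that can be walked is more useful than an opaque IDisposable.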

Revisiting IObservable<T>

• Core operator library is built using ISubscribable<T>

    public static ISubscribable<T> Take<T>(this ISubscribable<T> xs, int count)
    {
        return new Take<T>(xs, count);
    }

    class Take<T> : SubscribableBase<T>
    {
        …

        public ISubscription Subscribe(IObserver<T> observer)
        {
            return new Impl(this, observer);
        }

        class Impl : Operator, IStatefulOperator // interfaces for visitors
        {
            protected override void OnStart() { … }
            …
        }
    }

Revisiting IScheduler

• Rx scheduler benefits
  • Abstract over time & allow for virtual time
  • Hide different sources of concurrency
• Evolving schedulers
  • Physical schedulers
    • Own operating system resources
    • Try to achieve ideal degree of concurrency
  • Compositional approach to logical schedulers
    • Logical child schedulers can be created
    • Unit of pausing used for checkpointing
      • Pause / resume a la GC
      • Ensures stable operator state

Revisiting ISubject<T>

• What’s the dual of an ISubject<T>?
  • Notice the variance of the base interfaces…

    interface ISubject<T> : IObservable<out T>, IObserver<in T> { }

  • …but if we flip things around, data cannot “flow”…

    interface ITcejbus<T> : IEnumerable<out T>, IEnumerator<out T> { }

• Problems
  • Using an “is-a” relationship versus a “has-a” relationship
  • Cannot have multiple producers (input observers) to a subject

Revisiting ISubject<T>

• Introduction of the IMultiSubject<T>
  • Provides a way to get many producers
    • Subject implementation decides on policy to “merge”
  • Resolves the input/output conundrum
• Dual abstraction in the enumerable world makes sense
  • See Interactive Extensions (Ix)’s IBuffer<T>
  • Left as an exercise

    interface IMultiSubject<T> : IObservable<out T>
    {
        IObserver<T> GetObserver();
    }
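A minimal sketch of one possible implementation follows, to show the "has-a" shape: each producer asks for its own observer, and the subject decides how to merge them. The policy chosen here (forward every value, fail on the first error, complete once every producer handed out so far has completed) is an assumption for this illustration, as are the names `MergingMultiSubject<T>` and `RecordingObserver<T>`.

```csharp
using System;
using System.Collections.Generic;

public interface IMultiSubject<T> : IObservable<T>
{
    IObserver<T> GetObserver();
}

public sealed class MergingMultiSubject<T> : IMultiSubject<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private int _activeProducers;
    private bool _terminated;

    // Each producer gets its own input observer ("has-a", not "is-a").
    public IObserver<T> GetObserver()
    {
        lock (_gate) { _activeProducers++; }
        return new Producer(this);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate) { _observers.Add(observer); }
        return new Unsubscriber(this, observer);
    }

    private void Publish(T value)
    {
        lock (_gate)
        {
            if (_terminated) return;
            foreach (var o in _observers.ToArray()) o.OnNext(value);
        }
    }

    private void Fail(Exception error)
    {
        lock (_gate)
        {
            if (_terminated) return;
            _terminated = true;
            foreach (var o in _observers.ToArray()) o.OnError(error);
        }
    }

    private void ProducerCompleted()
    {
        lock (_gate)
        {
            // Merge policy: complete only when the last producer is done.
            if (_terminated || --_activeProducers > 0) return;
            _terminated = true;
            foreach (var o in _observers.ToArray()) o.OnCompleted();
        }
    }

    private sealed class Producer : IObserver<T>
    {
        private readonly MergingMultiSubject<T> _parent;
        private bool _done;
        public Producer(MergingMultiSubject<T> parent) { _parent = parent; }
        public void OnNext(T value) { if (!_done) _parent.Publish(value); }
        public void OnError(Exception error) { if (!_done) { _done = true; _parent.Fail(error); } }
        public void OnCompleted() { if (!_done) { _done = true; _parent.ProducerCompleted(); } }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly MergingMultiSubject<T> _parent;
        private readonly IObserver<T> _observer;
        public Unsubscriber(MergingMultiSubject<T> parent, IObserver<T> observer)
        {
            _parent = parent;
            _observer = observer;
        }
        public void Dispose() { lock (_parent._gate) { _parent._observers.Remove(_observer); } }
    }
}

public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

public static class MultiSubjectDemo
{
    public static void Main()
    {
        var subject = new MergingMultiSubject<string>();
        var recorder = new RecordingObserver<string>();
        subject.Subscribe(recorder);

        var phone = subject.GetObserver();
        var tablet = subject.GetObserver();
        phone.OnNext("phone");
        tablet.OnNext("tablet");
        phone.OnCompleted();  // one producer left, stream stays open
        tablet.OnCompleted(); // last producer done, subscribers complete

        Console.WriteLine(string.Join(", ", recorder.Log));
    }
}
```

Other policies, such as completing on the first producer's completion, would fit behind the same interface. That is the sense in which the implementation decides how to merge.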

The IRP Programming Model
Super-symmetric designs

A Hyper-Triad of Interfaces

• IReactiveProcessing provides
  • Proxies – source of composition of artifacts (DML)
  • Definitions – enables definition of artifacts (DDL)
  • Metadata – catalog of artifacts for discoverability
• Super-symmetric design
  • Same interface exists in many “spaces”
  • Hyper-cube of parallel worlds
    • Synchronous versus asynchronous
    • Extrinsic versus intrinsic identities
    • Code-as-data versus code
    • Reliable versus non-reliable
    • Etc.

Diagram: cube with axes Sync / Async, Intrinsic / Extrinsic, Code / Data

Image: Higher dimensions, Calabi-Yau manifold (www.Wikipedia.org)

A Hyper-Triad of Interfaces

• IReactiveProcessing provides
  • Proxies – source of composition of artifacts (DML)
  • Definitions – enables definition of artifacts (DDL)
  • Metadata – catalog of artifacts for discoverability
• Super-symmetric design
  • Same interface exists in many “spaces”
  • Hyper-cube of parallel worlds
    • Synchronous versus asynchronous
    • Extrinsic versus intrinsic identities
    • Code-as-data versus code
    • Reliable versus non-reliable
    • Etc.

Diagram: the Async / Extrinsic / Data corner of the cube; e.g. this is the client-side API

Need string theory

Image: Higher dimensions, Calabi-Yau manifold (www.Wikipedia.org)

Enabling Composition through Proxies

• It all starts with a logical “context”
  • Similar to LINQ to SQL’s DataContext
  • Parameterized by a physical connection
    • To a cloud service, or to a device, etc.
    • Deals with authentication, various knobs, etc.
• Family of Get* methods to obtain proxies to artifacts
  • Observables, observers, etc.

    var conn = new ReactiveServiceConnection(endpoint);
    var ctx = new ClientContext(conn);

    var traffic = ctx.GetObservable<TrafficInfo>(trafficUri);
    var http = ctx.GetObserver<HttpData>(httpUri);

Explicit identifiers provided for artifacts

Enabling Composition through Proxies

• Get* methods do not go to the IRP system
  • Just obtain local proxies to remote artifacts
  • Artifacts have extrinsic identifiers, specified as URIs
• Async methods go to the IRP system
  • E.g. SubscribeAsync, OnNextAsync, etc.
• Example

    var traffic = ctx.GetObservable<TrafficInfo>(trafficUri);
    var http = ctx.GetObserver<HttpData>(httpUri);

    var subscription = await traffic.Where(t => t.Road == "I-90")
                                    .Select(t => new HttpData { Uri = myService, Body = t.ToString() })
                                    .SubscribeAsync(http, mySubUri);

Explicit identifiers provided for artifacts

Enabling Composition through Proxies

• What happens when SubscribeAsync is called?
  • User intent is obtained from expression tree
    • IAsyncReactiveQbservable<T>
    • IAsyncReactiveQbserver<T>
    • Etc.
  • Expression tree gets serialized over the wire
    • Normalization of the tree into invocation expressions
    • Language agnostic format (interoperable with weakly typed languages like JavaScript)
    • Dependencies on concrete runtimes a la CLR get stripped out (no type deployment)
  • Payload is sent to target IRP system
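As a rough illustration of the first step (recovering user intent from an expression tree), the sketch below walks a LINQ query and maps each operator call to an `rx://` identifier. It uses the standard `IQueryable<T>` and `ExpressionVisitor` types as stand-ins for the IRP proxy types. `OperatorUriCollector` is a hypothetical name, and deriving the URI by lower-casing the method name is an assumption made for this sketch, not the actual normalization.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical visitor: records an rx:// identifier per query operator call.
public sealed class OperatorUriCollector : ExpressionVisitor
{
    public List<string> Uris { get; } = new List<string>();

    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        // Visit the arguments first so the innermost operator is recorded first.
        var result = base.VisitMethodCall(node);

        if (node.Method.DeclaringType == typeof(Queryable))
            Uris.Add("rx://" + node.Method.Name.ToLowerInvariant());

        return result;
    }
}

public static class IntentDemo
{
    public static void Main()
    {
        // IQueryable stands in for the expression-tree-based proxy types.
        IQueryable<int> xs = new[] { -1, 2, 3 }.AsQueryable();
        var query = xs.Where(x => x > 0).Select(x => x * x);

        // Nothing has executed yet; the query is only a tree describing intent.
        var collector = new OperatorUriCollector();
        collector.Visit(query.Expression);

        Console.WriteLine(string.Join(" -> ", collector.Uris));
    }
}
```

The collector sees the Where call before the Select call because Where is the innermost node of the tree. A real serializer would additionally replace CLR-specific types and members with portable descriptions before sending the payload.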

Enabling Composition through Proxies

    var traffic = ctx.GetObservable<TrafficInfo>(trafficUri);
    var http = ctx.GetObserver<HttpData>(httpUri);

    var subscription = await traffic.Where(t => t.Road == "I-90")
                                    .Select(t => new HttpData { Uri = …, Body = … })
                                    .SubscribeAsync(http, mySubUri);

    Invoke
      rx://subscribe
      Invoke
        rx://select
        Invoke
          rx://where
          trafficUri
          λ t => t.Road == "I-90"
        λ t => new { bing://http/uri = …, bing://http/body = … }
      httpUri

Unbound parameters processed by binder

Structural typing to reduce coupling

Enabling Abstraction through Definitions

• Define* methods allow for definition of artifacts
  • Observables, observers, etc.
  • Much like stored procedures, user-defined functions, etc.
  • Also identified by URIs

    var traffic = ctx.GetObservable<TrafficInfo>(trafficUri);

    // Define
    await ctx.DefineObservableAsync<string, TrafficInfo>(
        trafficByRoadUri,
        road => traffic.Where(t => t.Road == road));

    // Proxies
    var trafficByRoad = ctx.GetObservable<string, TrafficInfo>(trafficByRoadUri);
    trafficByRoad("I-90").…

Rich Mapping Capabilities

• No built-in artifacts
  • Query operators are parameterized observables
  • Extension methods provide an illusion
• This fluent pattern…

    var xs = ctx.GetObservable<int>(xsUri);
    var iv = ctx.GetObserver<int>(ivUri);

    await xs.Where(x => x > 0).Select(x => x * x).SubscribeAsync(iv, …);

• … is the same as

    var xs = ctx.GetObservable<int>(xsUri);
    var iv = ctx.GetObserver<int>(ivUri);
    var where = ctx.GetObservable<IARQ<int>, Expr<Func<int, bool>>, int>(whereUri);
    var select = ctx.GetObservable<IARQ<int>, Expr<Func<int, int>>, int>(selectUri);

    await select(where(xs, x => x > 0), x => x * x).SubscribeAsync(iv, …);

Rich Mapping Capabilities

• [KnownResource] attributes can be used everywhere
  • Provide shorthand syntax for Get* operations
  • Enable code generation using the Metadata facilities
• What’s IAsyncReactiveQbservable<T>?
  • Async because it’s client-side (cf. SubscribeAsync)
  • Reactive because we had to disambiguate with Rx concepts
  • Qbservable because it’s expression tree based

    static class AsyncReactiveQbservable
    {
        [KnownResource(whereUri)]
        static IAsyncReactiveQbservable<T> Where<T>(this IAsyncReactiveQbservable<T> xs,
                                                    Expression<Func<T, bool>> filter) {…}
    }

Enabling Discovery through Metadata

• Series of properties for different artifact types
  • Observables, observers, subscriptions, etc.
  • IQueryableDictionary<Uri, TEntity> allows for structured (LINQ) queries
• Enables tooling
  • “RxMetal” a la “SqlMetal” to generate a derived ClientContext
  • Artifact explorers a la Server Explorer in Visual Studio
• Enables delegation
  • IRP systems can discover capabilities across boundaries

    var traffic = ctx.Observables[trafficUri];
    var operators = ctx.Observables.Where(kv => kv.Key.StartsWith("rx://"));

Standard Model of Event Processing

• What are the reactive artifact types?
  • Division based on temperature
    • Cold = potential energy; to be instantiated
    • Hot = kinetic energy; active artifacts
  • Kinds of artifacts familiar from Rx
  • Cold artifacts can be parameterized

    Cold                 Operation         Hot
    Observable           SubscribeAsync    Subscription
    (Our Higgs boson)    CreateAsync       Observer
    Subject factory      CreateAsync       Subject

Image: Standard Model (www.Wikipedia.org)

Demo
Quick example of using IRP

What’s Next?
Starting the conversation on Rx v3.0

Rx v3.0

• Peeling the onion of our system
  • Many IRP implementations in Bing
  • But “IRP Core” is…
    • Built in a reusable fashion
    • Strict separation of logical and physical
• What, when, where is TBD
  • Operator library with checkpointing etc.
  • Service abstractions
  • Data model and data / expression serialization stacks
  • Query engine components

• Give us feedback! ([email protected])

Diagram labels: Service, Host, Data model, Query engine, Operators

Q&A
Thanks!