Rx: your prescription to cure asynchronous programming blues
Slides license: Creative Commons Attribution Non-Commercial Share Alike
See http://creativecommons.org/licenses/by-nc-sa/3.0/legalcode
Mission statement
Rx is a library for composing asynchronous and event-based
programs using observable collections.
$(f \circ g)(x) = f(g(x))$
Queries? LINQ?
Too hard today…
Download at MSDN DevLabs
• .NET 2.0, .NET 4.0, Silverlight
• JavaScript, more…
Essential Interfaces
Enumerables: a pull-based world

interface IEnumerable<out T>
{
    IEnumerator<T> GetEnumerator();
}

interface IEnumerator<out T> : IDisposable
{
    bool MoveNext();
    T Current { get; }
    void Reset();
}

You could get stuck
C# 4.0 covariance
(Waiting to move next)
moving on
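To make the "pull" concrete, here is a minimal sketch of what a `foreach` loop does by hand. The names `PullDemo` and `Drain` are hypothetical helpers for illustration, not part of Rx or the framework.

```csharp
using System;
using System.Collections.Generic;

static class PullDemo
{
    // Hypothetical helper: pulls every element out of a sequence by hand,
    // the way a foreach loop does behind the scenes.
    public static List<T> Drain<T>(IEnumerable<T> source)
    {
        var results = new List<T>();
        using (IEnumerator<T> e = source.GetEnumerator())
        {
            // The consumer asks for the next value. This call does not
            // return until the producer has one or reports the end.
            while (e.MoveNext())
            {
                results.Add(e.Current);
            }
        }
        return results;
    }

    static void Main()
    {
        // C# 4.0 covariance: a sequence of strings is a sequence of objects.
        IEnumerable<string> words = new[] { "pull", "based" };
        IEnumerable<object> objects = words;

        foreach (object o in Drain(objects))
        {
            Console.WriteLine(o);
        }
    }
}
```

If the producer behind `MoveNext` is slow, the consumer simply waits, which is the "stuck" case called out on the slide.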
Mathematical duality
Because the Dutch are (said to be) cheap
Electricity: inductor and capacitor
Logic: De Morgan's Law
$\neg(A \wedge B) \equiv \neg A \vee \neg B$
$\neg(A \vee B) \equiv \neg A \wedge \neg B$
Programming?
What's the dual of IEnumerable?
The recipe to dualization
http://en.wikipedia.org/wiki/Dual_(category_theory)
Reversing arrows…
Input becomes output and vice versa
Making a U-turn in synchrony
What's the dual of IEnumerable?
Step 1: Simpler and more explicit

interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
}

interface IEnumerator<T> : IDisposable
{
    bool MoveNext();
    T Current { get; }
    void Reset();
}

Make the hidden parts explicit: GetEnumerator(void); MoveNext(void) throws Exception; GetCurrent(void);
C# didn't borrow Java checked exceptions
What's the dual of IEnumerable?
Step 1: Simpler and more explicit

interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator(void);
}

interface IEnumerator<T> : IDisposable
{
    bool MoveNext(void) throws Exception;
    T GetCurrent(void);
}

We really got an enumerator and a disposable:
(IDisposable & IEnumerator<T>) GetEnumerator(void);
What's the dual of IEnumerable?
Step 2: Swap input and output

interface IEnumerable<T>
{
    (IDisposable & IEnumerator<T>) GetEnumerator(void);
}

interface IEnumerator<T>
{
    bool MoveNext(void) throws Exception;
    T GetCurrent(void);
}

IEnumerable<T> becomes IEnumerableDual<T>; GetEnumerator becomes SetEnumerator; the IEnumerator<T> output and the void input trade places
Will only dualize the synchrony aspect
What's the dual of IEnumerable?
Step 2: Swap input and output

interface IEnumerableDual<T>
{
    IDisposable SetEnumerator(IEnumerator<T> x);
}

interface IEnumerator<T>
{
    bool MoveNext(void) throws Exception;
    T GetCurrent(void);
}

The thrown exception is an output too
What's the dual of IEnumerable?
Step 2: Swap input and output

interface IEnumerableDual<T>
{
    IDisposable SetEnumerator(IEnumerator<T> x);
}

interface IEnumerator<T>
{
    (bool | Exception) MoveNext(void);
    T GetCurrent(void);
}

Discrete domain with true and false
What's the dual of IEnumerable?
Step 2: Swap input and output

interface IEnumerableDual<T>
{
    IDisposable SetEnumerator(IEnumerator<T> x);
}

interface IEnumerator<T>
{
    (true | false | Exception) MoveNext(void);
    T GetCurrent(void);
}

If you got true, you really got a T
What's the dual of IEnumerable?
Step 2: Swap input and output

interface IEnumerableDual<T>
{
    IDisposable SetEnumerator(IEnumerator<T> x);
}

interface IEnumerator<T>
{
    (T | false | Exception) MoveNext(void);
}

If you got false, you really got void
What's the dual of IEnumerable?
Step 2: Swap input and output

interface IEnumerableDual<T>
{
    IDisposable SetEnumerator(IEnumerator<T> x);
}

interface IEnumerator<T>
{
    (T | void | Exception) MoveNext(void);
}

Swap once more: IEnumerator<T> becomes IEnumeratorDual<T>, and the output of MoveNext becomes the input of Got(T | void | Exception)
But C# doesn't have discriminated unions…
Let's splat this into three methods instead!
What's the dual of IEnumerable?
Step 2: Swap input and output

interface IEnumerableDual<T>
{
    IDisposable SetEnumerator(IEnumeratorDual<T>);
}

interface IEnumeratorDual<T>
{
    void GotT(T value);
    void GotException(Exception ex);
    void GotNothing(void);
}
What's the dual of IEnumerable?
Step 3: Consult the Gang of Four…

interface IObservable<T>
{
    IDisposable SetObserver(IObserver<T> observer);
}

interface IObserver<T>
{
    void GotT(T value);
    void GotException(Exception ex);
    void GotNothing();
}

Source: http://amazon.com
What's the dual of IEnumerable?
Step 4: Variance annotations

interface IObservable<out T>
{
    IDisposable SetObserver(IObserver<T> observer);
}

interface IObserver<in T>
{
    void GotT(T value);
    void GotException(Exception ex);
    void GotNothing();
}

Used to detach the observer…
Do you really know C# 4.0?
What's the dual of IEnumerable?
Step 5: Color the bikeshed (*)

interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception ex);
    void OnCompleted();
}

(*) Visit http://en.wikipedia.org/wiki/Color_of_the_bikeshed
Essential Interfaces
Observables: a push-based world

interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception ex);
    void OnCompleted();
}

You could get flooded
C# 4.0 contravariance
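The two interfaces ship in the `System` namespace since .NET 4.0, so they can be implemented by hand without any Rx library. The sketch below is one possible minimal implementation; `Letters`, `Recorder` and `NothingToDetach` are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-based source: on subscription it sends "a", "b"
// and then signals completion.
sealed class Letters : IObservable<string>
{
    public IDisposable Subscribe(IObserver<string> observer)
    {
        observer.OnNext("a");    // dual of MoveNext() == true plus Current
        observer.OnNext("b");
        observer.OnCompleted();  // dual of MoveNext() == false
        return new NothingToDetach();
    }

    private sealed class NothingToDetach : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer of objects that records every message it receives.
sealed class Recorder : IObserver<object>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(object value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

static class Program
{
    static void Main()
    {
        var recorder = new Recorder();

        // Contravariance (IObserver<in T>): an IObserver<object> is accepted
        // where an IObserver<string> is expected.
        IDisposable subscription = new Letters().Subscribe(recorder);
        subscription.Dispose();

        foreach (string line in recorder.Log)
        {
            Console.WriteLine(line);
        }
    }
}
```

Here the source decides when values arrive and the observer merely reacts, so a fast source can flood a slow observer.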
Essential Interfaces
Summary: push versus pull

Interactive: IEnumerable<T> / IEnumerator<T>
    The application calls MoveNext on the environment: "Got next?"
Reactive: IObservable<T> / IObserver<T>
    The environment calls OnNext on the application: "Have next!"
Essential Interfaces
demo
Getting Your Observables
Primitive constructors

Observable.Empty<int>()      OnCompleted           like new int[0]
Observable.Return(42)        OnNext, OnCompleted   like new[] { 42 }
Observable.Throw<int>(ex)    OnError               like a throwing iterator
Observable.Never<int>()      (nothing)             like an iterator that got stuck

Notion of time
Getting Your Observables
Observer (and enumerator) grammar

OnNext* [ OnError | OnCompleted ]

Observable.Range(0, 3)    OnNext(0)  OnNext(1)  OnNext(2)
Enumerable.Range(0, 3)    yield 0    yield 1    yield 2
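One way to read the grammar is as a contract that a wrapper can enforce. The sketch below drops any message that arrives after a terminal one; `GrammarGuard` and `LoggingObserver` are hypothetical names, and the guard is deliberately not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper: forwards messages to an inner observer but drops
// anything that arrives after OnError or OnCompleted, so the inner observer
// only ever sees OnNext* [ OnError | OnCompleted ]. Not thread-safe.
sealed class GrammarGuard<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private bool _stopped;

    public GrammarGuard(IObserver<T> inner) { _inner = inner; }

    public void OnNext(T value)
    {
        if (!_stopped) _inner.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_stopped) return;
        _stopped = true;
        _inner.OnError(error);
    }

    public void OnCompleted()
    {
        if (_stopped) return;
        _stopped = true;
        _inner.OnCompleted();
    }
}

sealed class LoggingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(int value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

static class Program
{
    static void Main()
    {
        var log = new LoggingObserver();
        var guard = new GrammarGuard<int>(log);

        guard.OnNext(0);
        guard.OnNext(1);
        guard.OnCompleted();
        guard.OnNext(2);                       // dropped: the sequence already ended
        guard.OnError(new Exception("late"));  // dropped as well

        Console.WriteLine(string.Join(", ", log.Log));
        // Prints: OnNext(0), OnNext(1), OnCompleted
    }
}
```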
Getting Your Observables
Generator functions

Asynchronous:

var o = Observable.Generate(
    0,
    i => i < 10,
    i => i + 1,
    i => i * i);

o.Subscribe(x => {
    Console.WriteLine(x);
});

Synchronous (hypothetical anonymous iterator syntax in C#):

var e = new IEnumerable<int> {
    for (int i = 0; i < 10; i++)
        yield return i * i;
};

foreach (var x in e) {
    Console.WriteLine(x);
}

A variant with time notion exists (GenerateWithTime)
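The four arguments of Generate map onto the four parts of a `for` loop. The sketch below shows that mapping on the pull side using a real C# iterator; this `Generate` is a hypothetical enumerable analogue written for illustration, not the Rx operator itself.

```csharp
using System;
using System.Collections.Generic;

static class GeneratorDemo
{
    // Hypothetical pull-based analogue of Generate: the arguments are the
    // start value, the loop test, the step, and the projection.
    public static IEnumerable<TResult> Generate<TState, TResult>(
        TState initialState,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector)
    {
        for (TState state = initialState; condition(state); state = iterate(state))
        {
            yield return resultSelector(state);
        }
    }

    // The slide's hypothetical anonymous iterator, written as a named
    // iterator method, which is valid C# today.
    public static IEnumerable<int> Squares()
    {
        for (int i = 0; i < 10; i++)
        {
            yield return i * i;
        }
    }

    static void Main()
    {
        IEnumerable<int> e = Generate(0, i => i < 10, i => i + 1, i => i * i);
        Console.WriteLine(string.Join(" ", e));
        Console.WriteLine(string.Join(" ", Squares()));
        // Both lines print: 0 1 4 9 16 25 36 49 64 81
    }
}
```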
Getting Your Observables
Create: our most generic creation operator

IObservable<int> o = Observable.Create<int>(observer => {
    // Assume we introduce concurrency (see later)…
    observer.OnNext(42);
    observer.OnCompleted();
    return () => { };  // nothing to clean up on unsubscribe
});

IDisposable subscription = o.Subscribe(
    onNext: x => { Console.WriteLine("Next: " + x); },
    onError: ex => { Console.WriteLine("Oops: " + ex); },
    onCompleted: () => { Console.WriteLine("Done"); });

C# doesn't have anonymous interface implementation, so we provide various extension methods that take lambdas.
C# 4.0 named parameter syntax
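The lambda-based overloads can be pictured as thin adapters around delegate-backed classes. The sketch below is one way such adapters might look; `MiniRx`, `AnonymousObserver`, `AnonymousObservable` and `NothingToDispose` are hypothetical names, and the real Rx implementation is more careful about errors and concurrency.

```csharp
using System;

// Hypothetical observer built from three delegates.
sealed class AnonymousObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}

// Hypothetical observable built from a subscribe delegate.
sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }

    public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); }
}

sealed class NothingToDispose : IDisposable
{
    public void Dispose() { }
}

static class MiniRx
{
    public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe)
    {
        return new AnonymousObservable<T>(subscribe);
    }

    // Extension method that stands in for anonymous interface implementation.
    public static IDisposable Subscribe<T>(
        this IObservable<T> source,
        Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        return source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));
    }
}

static class Program
{
    static void Main()
    {
        IObservable<int> o = MiniRx.Create<int>(observer =>
        {
            observer.OnNext(42);
            observer.OnCompleted();
            return new NothingToDispose();
        });

        IDisposable subscription = o.Subscribe(
            onNext: x => Console.WriteLine("Next: " + x),
            onError: ex => Console.WriteLine("Oops: " + ex),
            onCompleted: () => Console.WriteLine("Done"));
        subscription.Dispose();
    }
}
```

Running it prints "Next: 42" followed by "Done", because this sketch calls the observer synchronously inside Subscribe.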
Getting Your Observables
Create: our most generic creation operator

Same code as on the previous slide, followed by:

Thread.Sleep(30000); // Main thread is blocked…

Debugger walkthrough: F10, F10, F5, breakpoint got hit
Getting Your Observables
demo
Bridging Rx with the World
Why .NET events aren't first-class…

form1.MouseMove += (sender, args) => {
    if (args.Location.X == args.Location.Y)
    {
        // I'd like to raise another event
    }
};

form1.MouseMove -= /* what goes here? */

Hidden data source
How to pass around?
Lack of composition
Resource maintenance?
Bridging Rx with the World
…but observable sequences are!

IObservable<Point> mouseMoves = Observable.FromEvent(frm, "MouseMove");
var filtered = mouseMoves
    .Where(pos => pos.X == pos.Y);

var subscription = filtered.Subscribe(…);
subscription.Dispose();

Source of Point values
Objects can be passed
Can define operators
Resource maintenance!
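The four callouts can be traced in a hand-rolled bridge that uses only the framework interfaces. This is a sketch under stated assumptions: `FakeForm`, `EventBridge`, `DelegateObservable`, `DelegateObserver` and `ActionDisposable` are hypothetical stand-ins, and the real FromEvent in Rx has a different signature.

```csharp
using System;
using System.Collections.Generic;

struct Point
{
    public readonly int X, Y;
    public Point(int x, int y) { X = x; Y = y; }
}

// Hypothetical stand-in for a Form: exposes an ordinary .NET event.
sealed class FakeForm
{
    public event Action<Point> MouseMove;
    public void RaiseMouseMove(int x, int y)
    {
        Action<Point> handler = MouseMove;
        if (handler != null) handler(new Point(x, y));
    }
}

sealed class ActionDisposable : IDisposable
{
    private Action _onDispose;
    public ActionDisposable(Action onDispose) { _onDispose = onDispose; }
    public void Dispose()
    {
        Action action = _onDispose;
        _onDispose = null;               // run the cleanup at most once
        if (action != null) action();
    }
}

sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); }
}

sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}

static class EventBridge
{
    // The event becomes an object: subscribing adds a handler, and
    // disposing the subscription removes that same handler.
    public static IObservable<T> FromEvent<T>(Action<Action<T>> addHandler, Action<Action<T>> removeHandler)
    {
        return new DelegateObservable<T>(observer =>
        {
            Action<T> handler = observer.OnNext;
            addHandler(handler);
            return new ActionDisposable(() => removeHandler(handler));
        });
    }

    // A generic operator, defined once and usable on any observable.
    public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return new DelegateObservable<T>(observer => source.Subscribe(
            new DelegateObserver<T>(
                value => { if (predicate(value)) observer.OnNext(value); },
                observer.OnError,
                observer.OnCompleted)));
    }
}

static class Program
{
    public static int CountDiagonalMoves()
    {
        var form = new FakeForm();
        IObservable<Point> mouseMoves = EventBridge.FromEvent<Point>(
            h => form.MouseMove += h,
            h => form.MouseMove -= h);

        var seen = new List<Point>();
        IDisposable subscription = mouseMoves
            .Where(pos => pos.X == pos.Y)
            .Subscribe(new DelegateObserver<Point>(p => seen.Add(p), ex => { }, () => { }));

        form.RaiseMouseMove(1, 2);   // filtered out
        form.RaiseMouseMove(3, 3);   // passes the filter
        subscription.Dispose();      // removes the handler
        form.RaiseMouseMove(4, 4);   // no longer observed
        return seen.Count;
    }

    static void Main()
    {
        Console.WriteLine(CountDiagonalMoves());   // Prints: 1
    }
}
```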
Bridging Rx with the World
Asynchronous methods are a pain…

FileStream fs = File.OpenRead("data.txt");
byte[] bs = new byte[1024];
fs.BeginRead(bs, 0, bs.Length,
    new AsyncCallback(iar => {
        int bytesRead = fs.EndRead(iar);
        // Do something with bs[0..bytesRead-1]
    }),
    null);

Hidden data source
Really a method pair
Lack of composition
Exceptions?
Synchronous completion?
Cancel?
State?
Bridging Rx with the World
…but observables are cuter!

FileStream fs = File.OpenRead("data.txt");
Func<byte[], int, int, IObservable<int>> read =
    Observable.FromAsyncPattern<byte[], int, int, int>(
        fs.BeginRead, fs.EndRead);

byte[] bs = new byte[1024];
read(bs, 0, bs.Length).Subscribe(bytesRead => {
    // Do something with bs[0..bytesRead-1]
});

Tip: a nicer wrapper can easily be made using various operators
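To show roughly what such a bridge has to do, here is a simplified sketch built on the framework interfaces only. `AsyncBridge.FromBeginEnd` is a hypothetical helper, not the Rx operator: it starts the operation on every Subscribe and offers no cancellation, and a `MemoryStream` stands in for the file so the example needs no data on disk.

```csharp
using System;
using System.IO;
using System.Threading;

sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); }
}

sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}

sealed class NoOp : IDisposable
{
    public void Dispose() { }
}

static class AsyncBridge
{
    // Hypothetical helper: turns a Begin/End method pair into an observable
    // that yields the single result, or the exception thrown by End.
    public static IObservable<TResult> FromBeginEnd<TResult>(
        Func<AsyncCallback, object, IAsyncResult> begin,
        Func<IAsyncResult, TResult> end)
    {
        return new DelegateObservable<TResult>(observer =>
        {
            begin(iar =>
            {
                TResult result;
                try { result = end(iar); }
                catch (Exception ex) { observer.OnError(ex); return; }
                observer.OnNext(result);
                observer.OnCompleted();
            }, null);
            return new NoOp();   // this sketch cannot cancel a pending call
        });
    }
}

static class Program
{
    public static int ReadOnce()
    {
        var stream = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
        var buffer = new byte[1024];
        IObservable<int> read = AsyncBridge.FromBeginEnd<int>(
            (callback, state) => stream.BeginRead(buffer, 0, buffer.Length, callback, state),
            stream.EndRead);

        int bytesRead = -1;
        var done = new ManualResetEventSlim();
        read.Subscribe(new DelegateObserver<int>(
            n => bytesRead = n,
            ex => done.Set(),
            () => done.Set()));
        done.Wait();             // block until the callback has fired
        return bytesRead;
    }

    static void Main()
    {
        Console.WriteLine("Read " + ReadOnce() + " bytes");
    }
}
```

Exceptions from the End method now travel through OnError, which answers the "Exceptions?" question from the previous slide for this simplified case.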
Bridging Rx with the World
The grand message

We don't replace existing asynchrony:
• .NET events have their use
• the async method pattern is fine too
• other sources like SSIS, PowerShell, WMI, etc.

but we…
• unify those worlds
• introduce compositionality
• provide generic operators

hence we…
• build bridges!
Bridging Rx with the World
Terminology: hot versus cold observables

Cold observables

var xs = Observable.Return(42);

xs.Subscribe(Console.WriteLine); // Prints 42
xs.Subscribe(Console.WriteLine); // Prints 42 again

Triggered by subscription

Hot observables

var mme = Observable.FromEvent<MouseEventArgs>(form, "MouseMove");

mme.Subscribe(Console.WriteLine);

Mouse events going before subscription
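The difference can be observed without a UI. The sketch below assumes a current System.Reactive NuGet package is referenced; a `Subject<int>` stands in for the mouse event source, which is an assumption made so the example runs in a console.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class HotColdDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Cold: every subscription triggers its own run of the sequence.
        IObservable<int> cold = Observable.Return(42);
        cold.Subscribe(x => log.Add("cold A: " + x));
        cold.Subscribe(x => log.Add("cold B: " + x));

        // Hot: the subject fires whether or not anyone is listening.
        var hot = new Subject<int>();
        hot.OnNext(1);                              // nobody subscribed yet: lost
        hot.Subscribe(x => log.Add("hot: " + x));
        hot.OnNext(2);                              // delivered

        return log;
    }

    static void Main()
    {
        foreach (string line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The log holds both cold values but only the second hot value, since the first one was sent before the subscription existed.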
Bridging Rx with the World
demo
Composition and Querying
Concurrency and synchronization

What does asynchronous mean?
Greek:
• a-syn = not with (independent from each other)
• chronos = time

Two or more parties work at their own pace
Need to introduce concurrency

Notion of IScheduler

var xs = Observable.Return(42, Scheduler.ThreadPool);
xs.Subscribe(Console.WriteLine);

Parameterization of operators
Will run on the source's scheduler
Composition and Querying
Concurrency and synchronization

Does duality apply?
Convert between both worlds

// Introduces concurrency to enumerate and signal…
var xs = Enumerable.Range(0, 10).ToObservable();

// Removes concurrency by observing and yielding…
var ys = Observable.Range(0, 10).ToEnumerable();

"Time-centric" reactive operators:

source1
source2
source1.Amb(source2)
Race!
Composition and Querying
Concurrency and synchronization

How to synchronize?

var xs = Observable.Return(42, Scheduler.ThreadPool);
xs.Subscribe(x => lbl.Text = "Answer = " + x);

Compositionality to the rescue!

xs.ObserveOn(frm).Subscribe(x => lbl.Text = "Answer = " + x);

• IScheduler
• WPF dispatcher
• WinForms control
• SynchronizationContext
Composition and Querying
Standard Query Operators

Observables are sources of data
• Data is sent to you (push based)
• Extra (optional) notion of time

Hence we can query over them

// Producing an IObservable<Point> using Select
var mme = from mm in Observable.FromEvent<MouseEventArgs>(form, "MouseMove")
          select mm.EventArgs.Location;

// Filtering for the first bisector using Where
var res = from mm in mme
          where mm.X == mm.Y
          select mm;
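The same query shape can be tried in a console program. This sketch assumes a current System.Reactive NuGet package; a fixed array of coordinate tuples stands in for the mouse events, and `QueryDemo` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class QueryDemo
{
    public static IList<(int X, int Y)> FirstBisector()
    {
        // Stand-in for a stream of mouse positions.
        IObservable<(int X, int Y)> moves = new[]
        {
            (X: 1, Y: 2), (X: 3, Y: 3), (X: 4, Y: 7), (X: 5, Y: 5)
        }.ToObservable();

        // Query syntax binds to the Where operator defined for observables.
        var onDiagonal = from p in moves
                         where p.X == p.Y
                         select p;

        // ToList gathers the results; Wait blocks until the sequence completes.
        return onDiagonal.ToList().Wait();
    }

    static void Main()
    {
        foreach (var p in FirstBisector())
        {
            Console.WriteLine(p.X + ", " + p.Y);
        }
    }
}
```

Only the positions (3, 3) and (5, 5) pass the filter.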
Composition and Querying
Putting the pieces together

Text box input: React
TextChanged -> IObservable<string>
Asynchronous request -> Dictionary web service ($$$)
IObservable<DictionaryWord[]> -> Reaction, Reactive, Reactor
Data binding on UI thread
Composition and Querying
SelectMany: composition at its best

// IObservable<string> from TextChanged events
var changed = Observable.FromEvent<EventArgs>(txt, "TextChanged");
var input = (from text in changed
             select ((TextBox)text.Sender).Text)
            .DistinctUntilChanged()
            .Throttle(TimeSpan.FromSeconds(1));

// Bridge with the dictionary web service
var svc = new DictServiceSoapClient();
var lookup = Observable.FromAsyncPattern<string, DictionaryWord[]>(
    svc.BeginLookup, svc.EndLookup);

// Compose both sources using SelectMany
var res = from term in input
          from words in lookup(term)
          select words;

input.SelectMany(term => lookup(term))
Composition and Querying
demo
Composition and Querying
Asynchronous programming is hard…

input: |R|Re|Rea|Reac|React|React|Reacti|Reactiv|Reactive|
Throttled input: React, then Reactive
Service call 1 (React) returns: Reaction, Reactive, Reactor
Service call 2 (Reactive) returns: Reactive
UI data binding shows: Reactive, then Reaction, Reactive, Reactor (call 1 arrives after call 2)

Source: http://scrapetv.com
Composition and Querying
Fixing out of order arrival issues

input: |R|Re|Rea|Reac|React|React|Reacti|Reactiv|Reactive|
Throttled input: React, then Reactive
Service call 1 (React): Reaction, Reactive, Reactor
Service call 2 (Reactive): Reactive
TakeUntil: cancel call 1
UI data binding shows: Reactive
Composition and Querying
Applying the TakeUntil fix

// input and lookup are defined as on the SelectMany slide

// Compose both sources using SelectMany
var res = from term in input
          from words in lookup(term).TakeUntil(input)   // very local fix
          select words;

// Alternative approach for composition using:
// IObservable<T> Switch<T>(IObservable<IObservable<T>> sources)
// Hops from source to source
var res = (from term in input
           select lookup(term))
          .Switch();
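The out-of-order problem and the Switch fix can be reproduced deterministically, without timers or a web service. The sketch below assumes a current System.Reactive NuGet package; each pending "service call" is a `Subject<string>` that the test completes by hand, and `OutOfOrderDemo` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class OutOfOrderDemo
{
    public static List<string> Run(bool useSwitch)
    {
        var input = new Subject<string>();

        // One pending "service call" per search term. Answering the calls by
        // hand lets us control the order in which results arrive.
        var calls = new Dictionary<string, Subject<string>>();
        Func<string, IObservable<string>> lookup = term =>
        {
            var call = new Subject<string>();
            calls[term] = call;
            return call;
        };

        IObservable<string> results = useSwitch
            ? input.Select(lookup).Switch()     // only the latest call is observed
            : input.SelectMany(lookup);         // every call stays observed

        var seen = new List<string>();
        using (results.Subscribe(x => seen.Add(x)))
        {
            input.OnNext("react");                            // call 1 starts
            input.OnNext("reactive");                         // call 2 starts
            calls["reactive"].OnNext("words for reactive");   // call 2 returns first
            calls["react"].OnNext("words for react");         // call 1 returns late
        }
        return seen;
    }

    static void Main()
    {
        Console.WriteLine("SelectMany: " + string.Join(" | ", Run(false)));
        Console.WriteLine("Switch:     " + string.Join(" | ", Run(true)));
    }
}
```

With SelectMany the stale result for "react" arrives last and would overwrite the UI; with Switch it is never delivered, because the subscription to call 1 was dropped when "reactive" came in.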
Fixing asynchronous issues
demo
Rx for JavaScript (RxJS)
Parity with Rx for .NET
• Set of operators
• Taming asynchronous JS

JavaScript-specific bindings
• jQuery
• ExtJS
• Dojo
• Prototype
• YUI3
• MooTools
• Bing APIs
Rx for JavaScript (RxJS)
$("#input").ToObservable("keyup")
    .Throttle(250)
    .Select(function(ev) { return query($(ev.target).val()); })
    .Switch()
    .Select(function(result) { return formatAsHtml(result); })
    .Subscribe(
        function(results) { $("#results").html(results); },
        function(error) { $("#results").html("Error: " + error); });
Rx for JavaScript (RxJS)
demo
LINQ to *.*

What? (duality)
• Pull (interactive): IEnumerable<T>, IQueryable<T>
• Push (reactive): IObservable<T>, IQbservable<T>

How? (homoiconic)
• Fixed (MSIL): IEnumerable<T>, e.g. LINQ to Objects; IObservable<T>, e.g. LINQ to Events
• Translatable (expression trees): IQueryable<T>, e.g. LINQ to SQL; IQbservable<T>, e.g. LINQ to PowerShell

Where? (concurrency, IScheduler)
• Worker pools, message loops, threads, distributed

Conversions: ToQueryable, ToObservable, ToEnumerable, ToQbservable, AsQueryable, AsEnumerable, AsQbservable, AsObservable
Mission accomplished
Rx is a library for composing asynchronous and event-based
programs using observable collections.
$(f \circ g)(x) = f(g(x))$
Queries! LINQ!
Way simpler with Rx
Download at MSDN DevLabs
• .NET 2.0, .NET 4.0, Silverlight
• JavaScript, more…
Related Content
Hands-on Labs

Two flavors:
• Curing the asynchronous blues with the Reactive Extensions (Rx) for .NET
• Curing the asynchronous blues with the Reactive Extensions (Rx) for JavaScript
Both can be found via the Rx forums

Rx team web presence
• Rx team blog: http://blogs.msdn.com/rxteam
• DevLabs: http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx
• MSDN forums: http://social.msdn.microsoft.com/Forums/en-US/rx
• Channel9: http://channel9.msdn.com/Tags/Rx