Download - Rx workshop
![Page 1: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/1.jpg)
Reactive ExtensionsWorkshop
Ryan Riley & Wes Dyer
![Page 2: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/2.jpg)
Logistics
• 30 minute sections– 10 minutes presentation– 10 minutes coding– 10 minutes discussion
• New partner every section• Prizes for best code during each section
![Page 3: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/3.jpg)
Where can I get them?
• Install with NuGet• Download at MSDN Data Developer Center
![Page 4: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/4.jpg)
Outline
1. Introduction to Rx2. A Unified Programming Model3. The Power of Rx4. RxJS5. Schedulers6. Event Processing7. Reactive Coincidence8. Continuations Everywhere9. Programming the Cloud
![Page 5: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/5.jpg)
INTRODUCTION TO RXlike events but much better
![Page 6: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/6.jpg)
The Reactive Extensions
Rx is …1. a set of types representing asynchronous data streams2. a set of operators to query asynchronous data streams3. a set of types to parameterize concurrency
Rx = Observables + LINQ + Schedulers
![Page 7: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/7.jpg)
Reactive Programming
In computing, reactive programming is a programming paradigm oriented around data flows and the propagation of change.
http://en.wikipedia.org/wiki/Reactive_programming
![Page 8: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/8.jpg)
Why should I care?
Socialmedia
Stock tickers
RSS feeds
GPS
Server managementUI e
vents
![Page 9: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/9.jpg)
Reactive Programming using Events
• Declareevent Action<int> E;
• PublishE(42);
• SubscribeE += x => Console.WriteLine(x);
![Page 10: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/10.jpg)
Reactive Programming using Rx
• DeclareISubject<int> S = new Subject<int>();
• PublishS.OnNext(42);
• SubscribeS.Subscribe(x => Console.WriteLine(x));
![Page 11: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/11.jpg)
The Reactive Extensions
Rx is …1. a set of types representing asynchronous data streams2. a set of operators to query asynchronous data streams3. a set of types to parameterize concurrency
Rx = Observables + LINQ + Schedulers
![Page 12: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/12.jpg)
class Program {
event Action<int> E;
static void Main() { var p = new Program();
p.E += x => Console.WriteLine(x);
p.E(1); p.E(2); p.E(3); }}
A Little Exampleclass Program {
ISubject<int> S = new Subject<int>();
static void Main() { var p = new Program();
p.S.Subscribe(x => Console.WriteLine(x));
p.S.OnNext(1); p.S.OnNext(2); p.S.OnNext(3); }}
![Page 13: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/13.jpg)
Separate Publish from Subscribe
Publish SubscribeBoth
![Page 14: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/14.jpg)
First-Class “Events”An object is first-class when it:• can be stored in variables and data structures• can be passed as a parameter to a subroutine• can be returned as the result of a subroutine• can be constructed at runtime• has intrinsic identity (independent of any given name)
http://en.wikipedia.org/wiki/First-class_object
![Page 15: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/15.jpg)
First-Class “Events”// storedIObservable<string> textChanged = …;
// passedvoid ProcessRequests(IObservable<string> input) {…}
// returnedIObservable<int> QueryServer() {…}
![Page 16: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/16.jpg)
Punctuation
class IObserver<in T>{ void OnNext(T value); void OnError(Exception error); void OnCompleted();}
![Page 17: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/17.jpg)
Contract
• Grammar: OnNext* [OnCompleted | OnError]• Serialized execution of observer’s methods
0 1 2
0 1
0 1 2
0 1 2
![Page 18: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/18.jpg)
Challenge: Simple Transformation
• Implement Events.LengthChanged• Implement Observables.LengthChanged• Output should be:
![Page 19: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/19.jpg)
Answer
Events Observables
![Page 20: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/20.jpg)
BRIDGING FROM THE EXISTING WORLD
query asynchronous data streams
![Page 21: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/21.jpg)
The Reactive Extensions
Rx is …1. a set of types representing asynchronous data streams2. a set of operators to query asynchronous data streams3. a set of types to parameterize concurrency
Rx = Observables + LINQ + Schedulers
![Page 22: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/22.jpg)
Empty
// complete immediatelyObservable.Empty<int>();
![Page 23: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/23.jpg)
Return
// return 1 valueObservable.Return(1);
1
![Page 24: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/24.jpg)
Throw
// throw an exceptionObservable.Throw(new Exception());
![Page 25: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/25.jpg)
Never
// never completeObservable.Never<int>();
![Page 26: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/26.jpg)
Range
// return three values starting with 0Observable.Range(0, 3);
0 1 2
![Page 27: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/27.jpg)
ToEnumerable / ToObservable
// enumerable to observableEnumerable.Range(0, 3).ToObservable();
0 1 2
// observable to enumerableObservable.Range(0, 3).ToEnumerable();
![Page 28: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/28.jpg)
Generate
// observable for loopObservable.Generate( 0, i => i < 3, i => i + 1, i => i * i);
0 1 4
![Page 29: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/29.jpg)
Create// anything you pleaseObservable.Create<int>(observer =>{ IDisposable b = new BooleanDisposable(); new Thread(() => { for (int i = 0; i < 3 && !b.IsDisposed; ++i) observer.OnNext(i); observer.OnCompleted(); return () => {}; }).Start(); return b;}); 0 1 4
![Page 30: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/30.jpg)
class Program { static void Main() { Label lbl = new Label(); Form frm = new Form { Controls = { lbl } }; var mouseMoves = Observable .FromEventPattern<MouseEventHandler, MouseEventArgs>( x => frm.MouseMove += x, x => frm.MouseMove -= x); mouseMoves.Subscribe(evt => {}); }}
class Program { static void Main() { Label lbl = new Label(); Form frm = new Form { Controls = { lbl } }; frm.MouseMove += (s, args) => {}; }}
Using Events
![Page 31: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/31.jpg)
class Program { static void Main() { Label lbl = new Label(); Form frm = new Form { Controls = { lbl } }; var mouseUps = …; var mouseMoves = …; var specificMoves = from up in mouseUps from move in mouseMoves let location = move.EventArgs.Location where location.X == location.Y select new { location.X, location.Y }; using (specificMoves .Subscribe(evt => lbl.Text = evt.ToString())) { Application.Run(frm); } }}
class Program { static void Main() { Label lbl = new Label(); Form frm = new Form { Controls = { lbl } }; frm.MouseMove += (sender, args) => { if (args.Location.X == args.Location.Y) { lbl.Text = args.Location.ToString(); } }; Application.Run(frm); }}
LINQ to Events
![Page 32: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/32.jpg)
Challenge
Complete DictionarySuggest• Create an Observable from the TextChanged
event• Create an Observable from an asynchronous
web request• Combine these two observables to react to
text input changes to return web service results
![Page 33: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/33.jpg)
Answer
![Page 34: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/34.jpg)
THE POWER OF RXtaking control
![Page 35: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/35.jpg)
Monitoring// anything you pleasevar input = Observable .FromEventPattern(txt, "TextChanged") .Select(evt => ((TextBox)evt.Sender).Text) .Timestamp() .Do((Timestamped<string> evt) => Console.WriteLine(evt)) .Select(evt => evt.Value) .Where(evt => evt.Length > 4) .Do(evt => Console.WriteLine(evt));
![Page 36: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/36.jpg)
Too Many Events
![Page 37: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/37.jpg)
Duplication
![Page 38: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/38.jpg)
Race Condition
![Page 39: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/39.jpg)
Challenge
Find and apply the operators to fix these issues in DictionarySuggest
![Page 40: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/40.jpg)
Answer
![Page 41: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/41.jpg)
JavaScript
![Page 42: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/42.jpg)
Challenge
Port DictionarySuggest to RxJS
![Page 43: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/43.jpg)
Answer
![Page 44: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/44.jpg)
SCHEDULERSparameterizing concurrency
![Page 45: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/45.jpg)
The Reactive Extensions
Rx is …1. a set of types representing asynchronous data streams2. a set of operators to query asynchronous data streams3. a set of types to parameterize concurrency
Rx = Observables + LINQ + Schedulers
![Page 46: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/46.jpg)
Parameterizing Concurrency
Observable.Timer(TimeSpan.FromSeconds(5))
Which timer?• System.Threading.Timer?• System.Timers.Timer?• System.Windows.Forms.Timer?• System.Windows.Threading.Timer?• Sleep on the current thread?• … ?
![Page 47: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/47.jpg)
Scheduler Abstraction
clock
execution context
execution policy
![Page 48: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/48.jpg)
Scheduler Interfaceinterface IScheduler{ DateTimeOffset Now { get; } IDisposable Schedule(Action work); IDisposable Schedule(TimeSpan dueTime, Action work); IDisposable Schedule(DateTimeOffset dueTime, Action work);}
![Page 49: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/49.jpg)
Operational Layering
Operatorsvar xs = Observable.Range(1, 10,
Scheduler.ThreadPool);
var q = from x in xs where x % 2 == 0 select -x;
q.Subscribe(Console.WriteLine);
![Page 50: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/50.jpg)
Operational Layering
Operators
Observables
var xs = new RangeObservable<int>(1, 10, Scheduler.ThreadPool);
var q = new SelectObservable<int>( new WhereObservable<int>( xs, x => x % 2 == 0), x => -x);
q.Subscribe( new LambdaObserver<int>(Console.WriteLine));
![Page 51: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/51.jpg)
Operational Layering
Operators
Observables
Schedulers
var n = 0;Scheduler.ThreadPool.Schedule(self => { if (n < 10) { if ((n + 1) % 2 == 0) Console.WriteLine(-(n + 1)); n++; self(); }});
![Page 52: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/52.jpg)
Operational Layering
Operators
Observables
Schedulers
Native Concurrency
var n = 0;Action<object> work = null;work = _ =>{ if (n < 10) { if ((n + 1) % 2 == 0) Console.WriteLine(-(n + 1)); n++; ThreadPool.QueueUserWorkItem(null, work); }};ThreadPool.QueueUserWorkItem(null, work);
![Page 53: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/53.jpg)
One Interface to Rule Them All
Name Execution Context Execution Policy Clock
ThreadPool Thread Pool ASAP Machine Time
Dispatcher UI Thread Priority FIFO Machine Time
EventLoop Dedicated Thread FIFO Machine Time
Immediate Current Thread Immediate Machine Time
Remote Another Process FIFO Remote Machine Time
![Page 54: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/54.jpg)
The Problem of Time
• Operations can take a long timeObservable.Timer(TimeSpan.FromYears(1000))
![Page 55: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/55.jpg)
• Some operators have time-based semantics
Observable.Timer(TimeSpan.FromSeconds(1)) .Buffer(TimeSpan.FromSeconds(1))
The Problem of Time
0 1 2 3 4 5 6
1s 1s 1s 1s 1s 1s 1s
0 1 2 3 4 5 6
1s 1s 1s 1s 1s 1s 1s
![Page 56: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/56.jpg)
• When testing reactive programs, time is the problem.…virtual time is the solution.
The Problem of Time
![Page 57: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/57.jpg)
Schedulers Revisited
Name Execution Context Execution Policy Clock
ThreadPool Thread Pool ASAP Machine Time
Dispatcher UI Thread Priority FIFO Machine Time
EventLoop Dedicated Thread FIFO Machine Time
Immediate Current Thread Immediate Machine Time
Remote Another Process FIFO Remote Machine Time
Test Current Thread FIFO Virtual Time
![Page 58: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/58.jpg)
TestScheduler scheduler = new TestScheduler();
IObservable<string> input = scheduler.CreateObservable(OnNext(300, “wes”),OnNext(400, “ryan”),OnCompleted(500));
var results = scheduler.Run(() =>input.Select(x => x.Length));
results.AssertEqual(OnNext(300, 3),OnNext(400, 4),OnCompleted(500));
Unit Testing with Schedulers
![Page 59: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/59.jpg)
• Implement MainForm.GetQuotes• Implement MainForm.GetQuery
Challenge: Historical Data
![Page 60: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/60.jpg)
Answer
![Page 61: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/61.jpg)
EVENT PROCESSINGthe power of LINQ
![Page 62: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/62.jpg)
The Reactive Extensions
Rx is …1. a set of types representing asynchronous data streams2. a set of operators to query asynchronous data streams3. a set of types to parameterize concurrency
Rx = Observables + LINQ + Schedulers
![Page 63: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/63.jpg)
Given: Stream of stock ticksFind: 10% daily price increase
Event Processing
![Page 64: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/64.jpg)
Windowing
MSFT27.01
INTC21.75
MSFT27.96
MSFT31.21
INTC22.54
INTC20.98
MSFT30.73
Group by symbol: GroupBy(t => t.Symbol)
![Page 65: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/65.jpg)
Aggregation
MSFT INTC
Aggregate each day with previous day: Buffer(2, 1)
21.75 22.54 20.98
27.01 27.96 31.21 30.73
![Page 66: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/66.jpg)
Filtering
MSFT INTC
Filter by price increase > 10%: Where(w => PriceIncrease(w) > .1)
21.7522.54
22.5420.98
27.0127.96
27.9631.21
31.2130.73
![Page 67: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/67.jpg)
Reduce
MSFT INTC
Reduce to a single stream: Merge()
27.9631.21
![Page 68: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/68.jpg)
Done!
27.9631.21
![Page 69: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/69.jpg)
from tick in stockTicksgroup tick by tick.Symbol into symbolStreamfrom window in symbolStream.Buffer(2, 1)let increase = PriceIncrease(window)where increase > .1select new { symbol = symbolStream.Key, increase };
LINQ: Event Processingsource group
aggregateapplyfilter
reduce
![Page 70: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/70.jpg)
• Change MainForm.Query to compute the Average High and Average Low over the past 5 trading days as well as the current Close and Date
Challenge: Event Processing
![Page 71: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/71.jpg)
Answer
![Page 72: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/72.jpg)
REACTIVE COINCIDENCEstreaming windows
![Page 73: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/73.jpg)
The Reactive Extensions
Rx is …1. a set of types representing asynchronous data streams2. a set of operators to query asynchronous data streams3. a set of types to parameterize concurrency
Rx = Observables + LINQ + Schedulers
![Page 74: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/74.jpg)
Event Duration
0 1 2
0 1 2
vs
![Page 75: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/75.jpg)
An Example
left right left
![Page 76: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/76.jpg)
Representing Duration
begin begin
end
end
Window
![Page 77: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/77.jpg)
Store
![Page 78: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/78.jpg)
Reactive Coincidence
Which people are at the store while it is opened?
5/5 5/6
moe larry curley
![Page 79: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/79.jpg)
moe
Reactive Coincidence
Which people are at the store while it is opened?
5/5 5/6
moe larry curley
![Page 80: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/80.jpg)
LINQ Join
from opening in storeOpeningsjoin person in personArrives on opening.Close equals person.Leaves into gselect new { opening.Day, People = g };
![Page 81: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/81.jpg)
• Change query in MainForm.MainForm to compute a stream of deltas when the mouse is down
Challenge: Drag and Drop
![Page 82: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/82.jpg)
Answer
![Page 83: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/83.jpg)
Continuation Passing Style
![Page 84: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/84.jpg)
Ruby
1..10.each do |x| puts x * xend
![Page 85: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/85.jpg)
AJAX$.ajax({
url: 'http://en.wikipedia.org/w/api.php',dataType: 'jsonp',data: {action: 'opensearch',search: term,format: 'json'},success: function(msg) {alert('Data saved:' + msg);}
});
![Page 86: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/86.jpg)
node.js
var net = require('net');
var server = net.createServer(function (socket) {socket.write("Echo server\r\n");
socket.pipe(socket);});
server.listen(1337, "127.0.0.1");
![Page 87: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/87.jpg)
Challenge
Using Rx, build a TCP server that works in a similar manner to node.js.
![Page 88: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/88.jpg)
PROGRAMMING THE CLOUDa glimpse into the future
![Page 89: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/89.jpg)
The Reactive Extensions
Rx is …1. a set of types representing asynchronous data streams2. a set of operators to query asynchronous data streams3. a set of types to parameterize concurrency
Rx = Observables + LINQ + Schedulers
![Page 90: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/90.jpg)
Distributed Queries
Cloud
query results
![Page 91: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/91.jpg)
Observable.Timer( TimeSpan.FromSeconds(1), Scheduler.Azure) .ObserveLocally() .Subscribe(Console.WriteLine);
Distributed Queries
Send the query to the cloud
Send the results back
![Page 92: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/92.jpg)
Pass-by-Value
[Serializable] class MyType { … }
Pass-by-Reference
class MyType : MarshalByRefObject { … }
Distributed Parameters
![Page 93: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/93.jpg)
var x = 42;scheduler.Schedule(() => Console.WriteLine(x));
class Closure { public int x; public void M() { Console.WriteLine(x); }}
var closure = new Closure();closure.x = 42;scheduler.Schedule(closure.M);
Distributed SchedulingPass by value or
reference?
![Page 94: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/94.jpg)
interface IScheduler { … IDisposable Schedule<T>(T state, Action<T> work); …}
Scheduler Interface Revisited
![Page 95: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/95.jpg)
var x = 42;scheduler.Schedule( x, state => Console.WriteLine(state));
static void M(int state) { Console.WriteLine(state);}
var x = 42;scheduler.Schedule( x, M);
Distributed Scheduling
No closures!!!
![Page 96: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/96.jpg)
scheduler.Schedule(42, x => scheduler.Schedule(x + 1, y => Console.WriteLine(y)));
Nested Scheduling
![Page 97: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/97.jpg)
Distributed Scheduling
Scheduler
cloud
![Page 98: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/98.jpg)
interface IScheduler { … IDisposable Schedule<T>(T state, Action<IScheduler, T> work); …}
Scheduler Interface Rerevisited
![Page 99: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/99.jpg)
scheduler.Schedule(42, (s, x) => s.Schedule(x + 1, y => Console.WriteLine(y)));
Nested Scheduling
![Page 100: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/100.jpg)
Fan-out Scheduling
Scheduler
Scheduler
Scheduler
Scheduler
Scheduler
Scheduler
Scheduler
![Page 101: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/101.jpg)
var d = scheduler.Schedule(42, (s, x) => { var d1 = s.Schedule(x + 1, Console.WriteLine); var d2 = s.Schedule(x + 2, Console.WriteLine);});
Fan-out Scheduling
How do we make d depend on d1 & d2?
![Page 102: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/102.jpg)
interface IScheduler { … IDisposable Schedule<T>(T state, Func<IScheduler, T, IDisposable> work); …}
Scheduler Interface Rererevisited
![Page 103: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/103.jpg)
var d = scheduler.Schedule(42, (s, x) => { var d1 = s.Schedule(x + 1, Console.WriteLine); var d2 = s.Schedule(x + 2, Console.WriteLine); return new CompositeDisposable(d1, d2);});
Fan-out Scheduling
![Page 104: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/104.jpg)
scheduler.Schedule(42, (state, self) => { Console.WriteLine(state); self(state + 1); });
Easy Recursive Scheduling
![Page 105: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/105.jpg)
• Change Program.Main to use the AppDomainScheduler
• Reimplement GenerateObservable.Subscribe
Challenge: AppDomains
![Page 106: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/106.jpg)
Answer
![Page 107: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/107.jpg)
The Reactive Extensions
Rx is …1. a set of types representing asynchronous data streams2. a set of operators to query asynchronous data streams3. a set of types to parameterize concurrency
Rx = Observables + LINQ + Schedulers
![Page 108: Rx workshop](https://reader035.vdocuments.us/reader035/viewer/2022081422/554ebdc9b4c9053c4b8b4791/html5/thumbnails/108.jpg)
Learn MoreResources• Rx Developer Center• Rx on Channel 9
Projects• ReactiveUI• Fluent State Observer• Reactive ETL• ReactiveOAuth• Reactive Remoting
Extensions to the Extensions• Reactive Extensions – Extensions• Rx Contrib• RxUtilities• Rx Power Toys