Building Responsive Applications with Rx - ConFoo - Tamir Dresher
TRANSCRIPT
1
Tamir Dresher (@tamir_dresher), Senior Software Architect
Building Responsive Applications with Rx
1
2
• Author of Rx.NET in Action (Manning Publications)
• Software architect, consultant and instructor
• Software Engineering Lecturer @ Ruppin Academic Center
• OzCode (www.oz-code.com) Evangelist
@tamir_dresher, [email protected], www.TamirDresher.com
About Me
Making our users happy
3
Making our users NOT happy
4
5
Happiness
Responsiveness
Complexity
Asynchronicity
Headache
Reactive Extensions (Rx)
Your headache relief pill for asynchronous and event-based applications
Async, Push, Triggers, Events
6
Reacting to changes
7
IEnumerable<Message> LoadMessages(string hashtag)
{
    var statuses = facebook.Search(hashtag);
    var tweets = twitter.Search(hashtag);
    var updates = linkedin.Search(hashtag);
    return statuses.Concat(tweets).Concat(updates);
}
Twitter App
Pull Model
8
Push Model
???? LoadMessages(string hashtag)
{
    facebook.Search(hashtag);
    twitter.Search(hashtag);
    linkedin.Search(hashtag);
}
DoSomething(msg)
9
Push model with Events
10
var mgr = new SocialNetworksManager();
mgr.MessageAvailable += (sender, msg) => { /* do something */ };
mgr.LoadMessages("Rx");

class SocialNetworksManager
{
    //members
    public event EventHandler<Message> MessageAvailable;

    public void LoadMessages(string hashtag)
    {
        var statuses = _facebook.Search(hashtag);
        NotifyMessages(statuses);
        :
    }
}

• hard to unsubscribe
• non-composable
• can't send as argument
• no isolation
Observables and Observers
observer to observable: Subscribe(observer)
observable to observer: subscription (IDisposable)
observable to observer: OnNext(X1), ..., OnNext(Xn)
12
Observables and Observers
observable to observer: X1, X2, ..., Xn, ending with either OnCompleted() or OnError(Exception)
13
Interfaces
namespace System
{
    public interface IObservable<out T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }

    public interface IObserver<in T>
    {
        void OnNext(T value);
        void OnError(Exception error);
        void OnCompleted();
    }
}
14
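A minimal sketch of the two interfaces in action, assuming the System.Reactive NuGet package is referenced. It uses the library's Observer.Create and Observable.Range helpers instead of hand-written classes; the log list is only there for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

var log = new List<string>();

// Observer.Create builds an IObserver<int> from three callbacks.
IObserver<int> observer = Observer.Create<int>(
    x => log.Add($"OnNext({x})"),
    ex => log.Add($"OnError({ex.Message})"),
    () => log.Add("OnCompleted()"));

// Range is a cold observable: it starts emitting when an observer subscribes.
IObservable<int> observable = Observable.Range(1, 3);

// Subscribe returns the subscription as an IDisposable.
IDisposable subscription = observable.Subscribe(observer);
subscription.Dispose();

foreach (var line in log) Console.WriteLine(line);
```

The observer receives three OnNext calls followed by a single OnCompleted; after that, the sequence sends nothing more.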
Rx packages
15
Push Model with Rx Observables
class ReactiveSocialNetworksManager
{
    //members
    public IObservable<Message> ObserveMessages(string hashtag)
    {
        :
    }
}

var mgr = new ReactiveSocialNetworksManager();

mgr.ObserveMessages("Rx")
    .Subscribe(
        msg => Console.WriteLine($"Observed:{msg} \t"),
        ex => { /*OnError*/ },
        () => { /*OnCompleted*/ });
16
Creating Observables
17
Pull To Push
FacebookClient:
IObservable<Message> ObserveMessages(string hashtag)
{
    // The lambda passed to Create is the subscribe function
    return Observable.Create<Message>(async (o) =>
    {
        var messages = await GetMessagesAsync(hashtag);
        foreach (var m in messages)
        {
            o.OnNext(m);
        }
        o.OnCompleted();
    });
}
18
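The pull-to-push conversion can be tried end to end with a stand-in data source. This is a sketch only: GetMessagesAsync here is a hypothetical stub that returns strings, not the FacebookClient from the talk, and the System.Reactive package is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a social network call (pull model).
async Task<IEnumerable<string>> GetMessagesAsync(string hashtag)
{
    await Task.Delay(10);
    return new[] { $"#{hashtag} first", $"#{hashtag} second" };
}

// Push model: the async lambda runs once per subscriber.
IObservable<string> ObserveMessages(string hashtag) =>
    Observable.Create<string>(async o =>
    {
        var messages = await GetMessagesAsync(hashtag);
        foreach (var m in messages) o.OnNext(m);
        o.OnCompleted();
    });

// Awaiting an observable yields its last value; ToList gathers everything first.
IList<string> received = await ObserveMessages("Rx").ToList();

foreach (var m in received) Console.WriteLine(m);
```

If the awaited call throws, Rx forwards the exception to the observer's OnError, so no explicit try/catch is needed inside the lambda.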
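Built-in Converters
IObservable<Message> ObserveMessages(string hashtag)
{
    // GetMessagesAsync is called once, right here; the Task's result is
    // converted to an observable and flattened into individual messages
    return GetMessagesAsync(hashtag)
        .ToObservable()
        .SelectMany(messages => messages);
}
Note: All observers will get the same emitted messages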
19
Deferring the observable creation
IObservable<Message> ObserveMessages(string hashtag)
{
    return Observable.Defer(async () =>
    {
        var messages = await GetMessagesAsync(hashtag);
        return messages.ToObservable();
    });
}
Note: Every observer will get "fresh" emitted messages
20
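The difference between the two notes (same messages versus fresh messages) can be seen with a call counter. A sketch, assuming System.Reactive; FetchAsync is a hypothetical stand-in that returns how many times it has been called.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

var calls = 0;

// Hypothetical data source: each call returns the running call count.
Task<int> FetchAsync()
{
    calls++;
    return Task.FromResult(calls);
}

// Converted eagerly: FetchAsync runs once, now, and every observer shares the result.
IObservable<int> eager = FetchAsync().ToObservable();

// Deferred: the factory runs again for every subscription.
IObservable<int> deferred = Observable.Defer(() => FetchAsync().ToObservable());

var eager1 = await eager;        // 1
var eager2 = await eager;        // 1 (same task result)
var deferred1 = await deferred;  // 2 (second call)
var deferred2 = await deferred;  // 3 (third call)

Console.WriteLine($"{eager1} {eager2} {deferred1} {deferred2}");
```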
Built-in Combinators (combining operators)
Observable.Merge(
    facebook.ObserveMessages(hashtag),
    twitter.ObserveMessages(hashtag),
    linkedin.ObserveMessages(hashtag));
21
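A small sketch of what Merge does with several live sources, assuming System.Reactive. The three Subject instances are hypothetical stand-ins for the facebook, twitter and linkedin clients.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Subjects let the sketch push values by hand.
var facebook = new Subject<string>();
var twitter = new Subject<string>();
var linkedin = new Subject<string>();

var merged = new List<string>();

// Merge forwards each value as soon as any source emits it.
using var subscription = Observable
    .Merge(facebook, twitter, linkedin)
    .Subscribe(msg => merged.Add(msg));

twitter.OnNext("t1");
facebook.OnNext("f1");
linkedin.OnNext("l1");
twitter.OnNext("t2");

Console.WriteLine(string.Join(", ", merged));
```

Unlike Concat in the pull-model version, Merge does not wait for one source to finish before listening to the next, so the values arrive in emission order: t1, f1, l1, t2.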
Observables Factories
Observable.Range(1, 10)
    .Subscribe(x => Console.WriteLine(x));
// emits 1, 2, ..., 10

Observable.Interval(TimeSpan.FromSeconds(1))
    .Subscribe(x => Console.WriteLine(x));
// emits a value every second

Observable.FromEventPattern(SearchBox, "TextChanged")
22
Observable Queries (Rx Operators)
23
Rx operators
Filtering: Where, OfType
Projection: Select, SelectMany, Materialize
Partitioning: Skip, Take, TakeUntil
Joins: CombineLatest, Concat, join
Grouping: GroupBy, GroupByUntil, Buffer
Set: Distinct, DistinctUntilChanged
Time and Concurrency: Timeout, TimeInterval
Element: ElementAt, First, Single
Generation: Range, Repeat, Defer
Quantifiers: All, Any, Contains
Aggregation: Sum, Average, Scan
Error Handling: Catch, OnErrorResumeNext, Using
24
Composability
IObservable<T2> Operator<T1>(IObservable<T1> source, parameters);
Original source (IObservable<T>), then Operator 1 (creates an IObservable<T2>), then Operator 2 (creates an IObservable<T3>), ..., then Operator N, then Subscribe(observer)
Each operator subscribes to the observable before it, so the observer's subscription travels up the chain to the original source.
25
g(x)
f(x)
Observable Query Pipeline
26
Reactive Search
27
Reactive Search - Rules
1. At least 3 characters
2. Don't overflow server (0.5 sec delay)
3. Don't send the same string again
4. Discard results if another search was requested
28
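The four rules map onto four operators. The sketch below is one way to wire them, assuming System.Reactive; the textChanged subject and the Search function are hypothetical stand-ins for the search box and the server, and a HistoricalScheduler (virtual time) replaces real timers so the example runs instantly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var scheduler = new HistoricalScheduler();   // virtual clock, advanced by hand
var textChanged = new Subject<string>();     // stand-in for the TextChanged event
var results = new List<string>();

// Hypothetical search call that answers immediately.
IObservable<string> Search(string term) => Observable.Return($"results for {term}");

textChanged
    .Where(text => text.Length > 2)                    // rule 1
    .Throttle(TimeSpan.FromSeconds(0.5), scheduler)    // rule 2
    .DistinctUntilChanged()                            // rule 3
    .Select(text => Search(text))
    .Switch()                                          // rule 4
    .Subscribe(r => results.Add(r));

// The user types quickly, then pauses.
textChanged.OnNext("R");
textChanged.OnNext("RE");
textChanged.OnNext("REA");
textChanged.OnNext("REAC");
scheduler.AdvanceBy(TimeSpan.FromSeconds(1));

// The same text arrives again: no new search is sent.
textChanged.OnNext("REAC");
scheduler.AdvanceBy(TimeSpan.FromSeconds(1));

textChanged.OnNext("REACT");
scheduler.AdvanceBy(TimeSpan.FromSeconds(1));

foreach (var r in results) Console.WriteLine(r);
```

Only two searches reach the stub, for REAC and REACT; the short strings, the superseded REA and the repeated REAC never do.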
Demo
29
30
[Marble diagram: the text typed into the search box over time: R, RE, REA, REAC]
31
Where(s => s.Length > 2)
[Marble diagram: R and RE are dropped; REA and REAC pass through]
32
Where(s => s.Length > 2)
Throttle(TimeSpan.FromSeconds(0.5))
[Marble diagram: a value is emitted only after 0.5 seconds pass with no newer value]
33
Where(s => s.Length > 2)
Throttle(TimeSpan.FromSeconds(0.5))
DistinctUntilChanged()
[Marble diagram: a value equal to the previous one is dropped; REA and REAC remain]
35
Select(text => SearchAsync(text))
Switch()
[Marble diagram: searches for "REA" and "REAC" are both in flight]
"REA" results are ignored since we switched to the "REAC" results
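Switch on its own can be observed with two hand-driven inner sequences. A sketch, assuming System.Reactive; the subjects are hypothetical stand-ins for two searches that are still in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// A stream of result streams, one per search request.
var searches = new Subject<IObservable<string>>();
var reaSearch = new Subject<string>();
var reacSearch = new Subject<string>();
var shown = new List<string>();

// Switch listens only to the most recent inner observable.
using var subscription = searches.Switch().Subscribe(r => shown.Add(r));

searches.OnNext(reaSearch);    // search for "REA" starts
searches.OnNext(reacSearch);   // search for "REAC" starts; Switch drops "REA"

reaSearch.OnNext("REA results");    // arrives late, ignored
reacSearch.OnNext("REAC results");  // delivered

Console.WriteLine(string.Join(", ", shown));
```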
Rx Queries FTW!
Building a Stock Ticker with zero effort
36
Stock Trade Analysis
ticks: MSFT 27.01, INTC 21.75, MSFT 27.96, MSFT 31.21, INTC 22.54, INTC 20.98, MSFT 30.73
from tick in ticks
37
Stock Trade Analysis
ticks: MSFT 27.01, INTC 21.75, MSFT 27.96, MSFT 31.21, INTC 22.54, INTC 20.98, MSFT 30.73
from tick in ticks
group tick by tick.Symbol into company
MSFT: 27.01, 27.96, 31.21, 30.73
INTC: 21.75, 22.54, 20.98
38
Stock Trade Analysis
ticks: MSFT 27.01, INTC 21.75, MSFT 27.96, MSFT 31.21, INTC 22.54, INTC 20.98, MSFT 30.73
from tick in ticks
group tick by tick.Symbol into company
from openClose in company.Buffer(2, 1)
MSFT: [27.01, 27.96] [27.96, 31.21] [31.21, 30.73]
INTC: [21.75, 22.54] [22.54, 20.98]
39
Stock Trade Analysis
ticks: MSFT 27.01, INTC 21.75, MSFT 27.96, MSFT 31.21, INTC 22.54, INTC 20.98, MSFT 30.73
from tick in ticks
group tick by tick.Symbol into company
from openClose in company.Buffer(2, 1)
let diff = (openClose[1] - openClose[0]) / openClose[0]
MSFT: 0.035, 0.116, -0.015
INTC: 0.036, -0.069
40
Stock Trade Analysis
ticks: MSFT 27.01, INTC 21.75, MSFT 27.96, MSFT 31.21, INTC 22.54, INTC 20.98, MSFT 30.73
from tick in ticks
group tick by tick.Symbol into company
from openClose in company.Buffer(2, 1)
let diff = (openClose[1] - openClose[0]) / openClose[0]
where diff > 0.1
MSFT: 0.035, 0.116 (passes the filter), -0.015
INTC: 0.036, -0.069
41
Stock Trade Analysis
ticks: MSFT 27.01, INTC 21.75, MSFT 27.96, MSFT 31.21, INTC 22.54, INTC 20.98, MSFT 30.73
from tick in ticks
group tick by tick.Symbol into company
from openClose in company.Buffer(2, 1)
let diff = (openClose[1] - openClose[0]) / openClose[0]
where diff > 0.1
select new { Company = company.Key, Increase = diff }
res: Company = MSFT, Increase = 0.116
42
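The whole query can be run against the seven ticks from the slides. This is a sketch under a few assumptions: System.Reactive is referenced, ticks are modelled as (Symbol, Price) tuples (the talk does not show the tick type), and a Count check is added because Buffer(2, 1) emits a final one-element buffer when a finite sequence completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// The ticks from the slides, as a finite observable sequence.
var ticks = new[]
{
    (Symbol: "MSFT", Price: 27.01m),
    (Symbol: "INTC", Price: 21.75m),
    (Symbol: "MSFT", Price: 27.96m),
    (Symbol: "MSFT", Price: 31.21m),
    (Symbol: "INTC", Price: 22.54m),
    (Symbol: "INTC", Price: 20.98m),
    (Symbol: "MSFT", Price: 30.73m),
}.ToObservable();

var results = new List<(string Company, decimal Increase)>();

var query =
    from tick in ticks
    group tick by tick.Symbol into company
    from openClose in company.Buffer(2, 1)       // sliding pairs of consecutive ticks
    where openClose.Count == 2                   // skip the trailing partial buffer
    let diff = (openClose[1].Price - openClose[0].Price) / openClose[0].Price
    where diff > 0.1m
    select new { Company = company.Key, Increase = diff };

query.Subscribe(r => results.Add((r.Company, Math.Round(r.Increase, 3))));

foreach (var r in results) Console.WriteLine($"{r.Company}: {r.Increase}");
```

Only the MSFT move from 27.96 to 31.21 exceeds 10%: (31.21 - 27.96) / 27.96 is roughly 0.116.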
Abstracting Time and Concurrency
43
What will be the output?
var subscription = Observable.Range(1, 5)
    .Repeat()
    .Subscribe(x => Console.WriteLine(x));
subscription.Dispose();
The Repeat operator resubscribes the observer when the observable completes
Many developers wrongly expect the program to dispose the subscription immediately, so that nothing is written to the console
Rx is not concurrent unless it is explicitly told to be
The example above writes the sequence 1-5 indefinitely to the console
44
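A bounded variant makes the synchronous behaviour visible without looping forever. A sketch, assuming System.Reactive; Take(7) is added only so that the program ends.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var log = new List<string>();

// Without Take this call would never return: Range and Repeat run on the
// calling thread, so Subscribe blocks until the sequence ends.
var subscription = Observable.Range(1, 5)
    .Repeat()
    .Take(7)
    .Subscribe(x => log.Add(x.ToString()));

// Reached only after all seven values were delivered.
log.Add("Subscribe returned");
subscription.Dispose();

Console.WriteLine(string.Join(", ", log));
```

Every value is logged before "Subscribe returned", which is why the Dispose call in the original example is never reached. Moving the work to another thread, for example with SubscribeOn and a scheduler, is what lets Subscribe return first.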
Schedulers
Thread Pool, Task Scheduler, other
45
public interface IScheduler
{
    DateTimeOffset Now { get; }

    IDisposable Schedule<TState>(TState state,
        Func<IScheduler, TState, IDisposable> action);

    IDisposable Schedule<TState>(TState state, TimeSpan dueTime,
        Func<IScheduler, TState, IDisposable> action);

    IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime,
        Func<IScheduler, TState, IDisposable> action);
}
46
Parameterizing Concurrency
//
// Runs a timer on the default scheduler
//
IObservable<T> Throttle<T>(TimeSpan dueTime);

//
// Every operator that introduces concurrency
// has an overload with an IScheduler
//
IObservable<T> Throttle<T>(TimeSpan dueTime, IScheduler scheduler);
47
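One practical payoff of the scheduler overloads is control over time itself. The sketch below passes a HistoricalScheduler (a virtual-time scheduler that ships with System.Reactive) to Interval, so three "seconds" pass without any real waiting; the variable names are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

var scheduler = new HistoricalScheduler();   // its clock moves only when told to
var seen = new List<long>();

// Same operator as on the factories slide, with the scheduler supplied explicitly.
using var subscription = Observable
    .Interval(TimeSpan.FromSeconds(1), scheduler)
    .Subscribe(x => seen.Add(x));

// Nothing has been emitted yet: virtual time has not moved.
var countBefore = seen.Count;

// Jump 3.5 virtual seconds ahead; the ticks due at 1s, 2s and 3s fire now.
scheduler.AdvanceBy(TimeSpan.FromSeconds(3.5));

Console.WriteLine($"{countBefore} before, then {string.Join(", ", seen)}");
```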
textChanged
    .Throttle(TimeSpan.FromSeconds(0.5), DefaultScheduler.Instance)
    .DistinctUntilChanged()
    .Select(text => SearchAsync(text))
    .Switch()
    .Subscribe(/*handle the results*/);
48
Changing Execution Context
//
// runs the observer callbacks on the specified
// scheduler.
//
IObservable<T> ObserveOn<T>(IScheduler);

//
// runs the observer subscription and unsubscription on
// the specified scheduler.
//
IObservable<T> SubscribeOn<T>(IScheduler)
49
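The effect of ObserveOn can be checked in a console program by comparing thread ids. A sketch, assuming System.Reactive; TaskPoolScheduler stands in for the UI dispatcher scheduler, which needs a running UI framework.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

var callerThread = Environment.CurrentManagedThreadId;
var observerThread = callerThread;

Observable.Return("search results")
    .ObserveOn(TaskPoolScheduler.Default)   // callbacks below this line run on the pool
    .Do(_ => observerThread = Environment.CurrentManagedThreadId)
    .Wait();                                // block the caller until the sequence completes

Console.WriteLine(observerThread != callerThread
    ? "observer ran on a different thread"
    : "observer ran on the caller's thread");
```

Because the calling thread is blocked inside Wait, the callback has to run on another thread, so the two ids differ. In a UI application the same operator goes the other way and brings results back to the UI thread.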
Changing Execution Context
textChanged
    .Throttle(TimeSpan.FromSeconds(0.5))
    .DistinctUntilChanged()
    .Select(text => SearchAsync(text))
    .Switch()
    .ObserveOn(DispatcherScheduler.Current)
    .Subscribe(/*handle the results*/);
50
Changing Execution Context
textChanged
    .Throttle(TimeSpan.FromSeconds(0.5))
    .DistinctUntilChanged()
    .Select(text => SearchAsync(text))
    .Switch()
    .ObserveOnDispatcher()
    .Subscribe(/*handle the results*/);
51
Reactive Extensions
Your headache relief pill for asynchronous and event-based applications
Async, Push, Triggers, Events
52
www.reactivex.io
github.com/Reactive-Extensions
www.manning.com/dresher
Thank You
Tamir Dresher (@tamir_dresher)
53