building responsive application with rx - confoo - tamir dresher

51
1 Tamir Dresher (@tamir_dresher) Senior Software Architect J Building Responsive Applications with Rx 1

Upload: tamir-dresher

Post on 12-Apr-2017

73 views

Category:

Software


1 download

TRANSCRIPT

Page 1: Building responsive application with Rx - confoo - tamir dresher

1

Tamir Dresher (@tamir_dresher)Senior Software ArchitectJ

Building Responsive Applications with Rx

1

Page 2: Building responsive application with Rx - confoo - tamir dresher

2

• Author of Rx.NET in Action (manning publications)• Software architect, consultant and instructor• Software Engineering Lecturer @ Ruppin Academic Center• OzCode (www.oz-code.com) Evangelist

@[email protected]://www.TamirDresher.com.

About Me

Page 3: Building responsive application with Rx - confoo - tamir dresher

Making our users happy

3

Page 4: Building responsive application with Rx - confoo - tamir dresher

Making our users NOT happy

4

Page 5: Building responsive application with Rx - confoo - tamir dresher

5

Happiness

Responsiveness

Complexity

Asynchroncity

Headache

Page 6: Building responsive application with Rx - confoo - tamir dresher

Reactive Extensions (Rx)

Your headache relief pill to Asynchronous Event based applications

AsyncPush

TriggersEvents

6

Page 7: Building responsive application with Rx - confoo - tamir dresher

Reacting to changes

7

Page 8: Building responsive application with Rx - confoo - tamir dresher

IEnumerable<Message> LoadMessages(string hashtag){

var statuses = facebook.Search(hashtag);var tweets = twitter.Search(hashtag);var updates = linkedin.Search(hashtag);return statuses.Concat(tweets).Concat(updates);

}

Twitter App

Linkedin

Facebook

Pull ModelPull Model

8

Page 9: Building responsive application with Rx - confoo - tamir dresher

???? LoadMessages(string hashtag){

facebook.Search(hashtag);twitter.Search(hashtag);linkedin.Search(hashtag);

}

DoSomething( )msg

Push ModelPush Model

9

Page 10: Building responsive application with Rx - confoo - tamir dresher

Push model with Events

10

var mgr = new SocialNetworksManager();mgr.MessageAvailable += (sender, msg) => {//do something};mgr.LoadMessages("Rx");

class SocialNetworksManager{ //members

public event EventHandler<Message> MessageAvailable; public void LoadMessages(string hashtag) { var statuses = _facebook.Search(hashtag); NotifyMessages(statuses); : }}

hard to unsubscribe non-composable

can’t send as argument

no isolation

Page 11: Building responsive application with Rx - confoo - tamir dresher

observable observer

Subscribe(observer)

subscription

OnNext(X1)

OnNext(Xn)⁞

X1observable X2 Xn

IDisposable

Observables and ObserversObservables and Observers

12

Page 12: Building responsive application with Rx - confoo - tamir dresher

OnCompleted()

OnError(Exception)

observable

observable observer

X1 X2 Xn

Observables and ObserversObservables and Observers

13

Page 13: Building responsive application with Rx - confoo - tamir dresher

namespace System{ public interface IObservable<out T> { IDisposable Subscribe(IObserver<T> observer); }

public interface IObserver<in T> { void OnNext(T value); void OnError(Exception error); void OnCompleted(); }}

InterfacesInterfaces

14

Page 14: Building responsive application with Rx - confoo - tamir dresher

Rx packages

15

Page 15: Building responsive application with Rx - confoo - tamir dresher

Push ModelPush Model with Rx Observables

class ReactiveSocialNetworksManager{ //members public IObservable<Message> ObserveMessages(string hashtag) {

: }}

var mgr = new ReactiveSocialNetworksManager();

mgr.ObserveMessages("Rx")    .Subscribe(        msg => Console.WriteLine($"Observed:{msg} \t"),        ex => { /*OnError*/ },        () => { /*OnCompleted*/ });

16

Page 16: Building responsive application with Rx - confoo - tamir dresher

Creating Observables

17

Page 17: Building responsive application with Rx - confoo - tamir dresher

IObservable<Message> ObserveMessages(string hashtag){

}

return Observable.Create( async (o) => {

});

var messages = await GetMessagesAsync(hashtag);foreach(var m in messages){ o.OnNext(m);}o.OnCompleted();

Pull To PushPull To Push

Subscribe function

FacebookClient:    

18

Page 18: Building responsive application with Rx - confoo - tamir dresher

IObservable<Message> ObserveMessages(string hashtag){

}

var messages = await GetMessagesAsync(hashtag);return messages.ToObservable();

Built-in ConvertersBuilt-in Converters

Note: All observers will get the same emitted messages

19

Page 19: Building responsive application with Rx - confoo - tamir dresher

IObservable<Message> ObserveMessages(string hashtag){

}

return Observable.Defer( async () => {

});

var messages = await GetMessagesAsync(hashtag);return messages.ToObservable();

Built-in ConvertersDeferring the observable creation

Note: Every observer will get the “fresh” emitted messages

20

Page 20: Building responsive application with Rx - confoo - tamir dresher

Observable.Merge(

facebook.ObserveMessages(hashtag),

twitter.ObserveMessages(hashtag),

linkedin.ObserveMessages(hashtag));

Built-in OperatorsBuilt-in Combinators (combining operators)

21

Page 21: Building responsive application with Rx - confoo - tamir dresher

Observable.Range(1, 10)    .Subscribe(x => Console.WriteLine(x));

Observable.Interval(TimeSpan.FromSeconds(1))    .Subscribe(x => Console.WriteLine(x));

Observable.FromEventPattern(SearchBox, "TextChanged")

1 2 10

1 2 10

⁞1 sec 1 sec

Observables Factories

Observables Factories

22

Page 22: Building responsive application with Rx - confoo - tamir dresher

Observable Queries (Rx Operators)

23

Page 23: Building responsive application with Rx - confoo - tamir dresher

Filtering Projection Partitioning Joins Grouping Set

Element Generation Quantifiers Aggregation Error HandlingTime and Concurrency

Where

OfType

Select

SelectMany

Materialize

Skip

Take

TakeUntil

CombineLatest

Concat

join

GroupBy

GroupByUntil

Buffer

Distinct

DistinctUntilChanged

Timeout

TimeInterval

ElementAt

First

Single

Range

Repeat

Defer

All

Any

Contains

Sum

Average

Scan

Catch

OnErrorResumeNext

Using

Rx operators

24

Page 24: Building responsive application with Rx - confoo - tamir dresher

Operator2 Creates an IObservable<T3>

IObservable<T2> Operator<T1>(IObservable<T1> source, parameters);

Operator 1 Subscribe( )

observer

observable… Operator

N

Operator1 Creates an IObservable<T2>

IObservable<T>Original source

observerSubscribe

ComposabilityComposability

25

Page 25: Building responsive application with Rx - confoo - tamir dresher

g(x)

f(x)

Observable Query PipelineObservable Query Pipeline

26

Page 26: Building responsive application with Rx - confoo - tamir dresher

Reactive SearchReactive Search

27

Page 27: Building responsive application with Rx - confoo - tamir dresher

1. At least 3 characters2. Don’t overflow server (0.5 sec delay)3. Don’t send the same string again4. Discard results if another search was requested

Reactive Search - RulesReactive Search - Rules

28

Page 28: Building responsive application with Rx - confoo - tamir dresher

Demo

Click icon to add picture

29

Page 29: Building responsive application with Rx - confoo - tamir dresher

30

R RE REACREA REA REAC

Page 30: Building responsive application with Rx - confoo - tamir dresher

31

R RE REACREA

REACREA

REA REAC

REA REAC

Where(s => s.Length > 2 )

Page 31: Building responsive application with Rx - confoo - tamir dresher

32

R RE REACREA

REACREA

REA REAC

REA REAC

Where(s => s.Length > 2 )

Throttle(TimeSpan.FromSeconds(0.5))

REACREA REAC

Page 32: Building responsive application with Rx - confoo - tamir dresher

33

R RE REACREA

REACREA

REA REAC

REA REAC

Where(s => s.Length > 2 )

Throttle(TimeSpan.FromSeconds(0.5))

REACREA REAC

DistinctUntilChanged()

REACREA

Page 33: Building responsive application with Rx - confoo - tamir dresher

35

REACREA

Select(text => SearchAsync(text))

“REA”

“REAC”

Switch()

“REA” results are ignored since we switched to the “REAC” results

Page 34: Building responsive application with Rx - confoo - tamir dresher

Rx Queries FTW!Building a Stock Ticker with zero effort

36

Page 35: Building responsive application with Rx - confoo - tamir dresher

Stock Trade Analysis

MSFT27.01

ticks

INTC21.75

MSFT27.96

MSFT31.21

INTC22.54

INTC20.98

MSFT30.73

from tick in ticks

37

Page 36: Building responsive application with Rx - confoo - tamir dresher

Stock Trade Analysis

MSFT27.01

ticks

INTC21.75

MSFT27.96

MSFT31.21

INTC22.54

INTC20.98

MSFT30.73

27.01 27.96 31.21 30.73

MSFT

21.75 22.54 20.98

INTC

from tick in ticksgroup tick by tick.Symbol into company

38

Page 37: Building responsive application with Rx - confoo - tamir dresher

Stock Trade Analysis

MSFT27.01

ticks

INTC21.75

MSFT27.96

MSFT31.21

INTC22.54

INTC20.98

MSFT30.73

MSFT

INTC

from tick in ticksgroup tick by tick.Symbol into companyfrom openClose in company.Buffer(2, 1)

[27.01, 27.96] [27.96, 31.21] [31.21, 30.73]

[21.75, 22.54] [22.54, 20.98]

39

Page 38: Building responsive application with Rx - confoo - tamir dresher

Stock Trade Analysis

MSFT27.01

ticks

INTC21.75

MSFT27.96

MSFT31.21

INTC22.54

INTC20.98

MSFT30.73

MSFT

INTC

from tick in ticksgroup tick by tick.Symbol into companyfrom openClose in company.Buffer(2, 1)let diff = (openClose[1] – openClose[0]) / openClose[0]

0.034 0.104 -0.015

0.036 -0.069

40

Page 39: Building responsive application with Rx - confoo - tamir dresher

Stock Trade Analysis

MSFT27.01

ticks

INTC21.75

MSFT27.96

MSFT31.21

INTC22.54

INTC20.98

MSFT30.73

MSFT

INTC

from tick in ticksgroup tick by tick.Symbol into companyfrom openClose in company.Buffer(2, 1)let diff = (openClose[1] – openClose[0]) / openClose[0]where diff > 0.1

0.034 0.104 -0.015

0.036 -0.069

41

Page 40: Building responsive application with Rx - confoo - tamir dresher

Stock Trade Analysis

MSFT27.01

ticks

INTC21.75

MSFT27.96

MSFT31.21

INTC22.54

INTC20.98

MSFT30.73

from tick in ticksgroup tick by tick.Symbol into companyfrom openClose in company.Buffer(2, 1)let diff = (openClose[1] – openClose[0]) / openClose[0]where diff > 0.1select new { Company = company.Key, Increase = diff }

res

Company = MSFTIncrease = 0.104

42

Page 41: Building responsive application with Rx - confoo - tamir dresher

Abstracting Time and Concurrency

43

Page 42: Building responsive application with Rx - confoo - tamir dresher

What will be the output?

The Repeat operator resubscribes the observer when the observable completes

Most developers falsely believe that the program will immediately dispose the subscription and nothing will be written to the consoleRx is not concurrent unless it explicitly told to

The example above writes the sequence 1-5 indefinitely to the console

44

var subscription = Observable.Range(1, 5) .Repeat() .Subscribe(x => Console.WriteLine(x));

subscription.Dispose();

Page 43: Building responsive application with Rx - confoo - tamir dresher

Thread Pool

Task Scheduler

other

SchedulersSchedulers

45

Page 44: Building responsive application with Rx - confoo - tamir dresher

public interface IScheduler{ DateTimeOffset Now { get; }

IDisposable Schedule<TState>( TState state, Func<IScheduler, TState, IDisposable> action);

IDisposable Schedule<TState>(TimeSpan dueTime, TState state,Func<IScheduler, TState, IDisposable> action);

IDisposable Schedule<TState>(DateTimeOffset dueTime,

TState state,Func<IScheduler, TState, IDisposable> action);

}46

Page 45: Building responsive application with Rx - confoo - tamir dresher

//// Runs a timer on the default scheduler//IObservable<T> Throttle<T>(TimeSpan dueTime);

//// Every operator that introduces concurrency// has an overload with an IScheduler//IObservable<T> Throttle<T>(TimeSpan dueTime, IScheduler scheduler);

Parameterizing ConcurrencyParameterizing Concurrency

47

Page 46: Building responsive application with Rx - confoo - tamir dresher

textChanged.Throttle(TimeSpan.FromSeconds(0.5),

DefaultScheduler.Instance).DistinctUntilChanged().SelectMany(text => SearchAsync(text)).Switch().Subscribe(/*handle the results*/);

48

Page 47: Building responsive application with Rx - confoo - tamir dresher

//// runs the observer callbacks on the specified // scheduler.//IObservable<T> ObserveOn<T>(IScheduler);

//// runs the observer subscription and unsubsciption on// the specified scheduler.//IObservable<T> SubscribeOn<T>(IScheduler)

Changing Execution ContextChanging Execution Context

49

Page 48: Building responsive application with Rx - confoo - tamir dresher

textChanged.Throttle(TimeSpan.FromSeconds(0.5)).DistinctUntilChanged().Select(text => SearchAsync(text)).Switch()

.ObserveOn(DispatcherScheduler.Current).Subscribe(/*handle the results*/);

Changing Execution ContextChanging Execution Context

50

Page 49: Building responsive application with Rx - confoo - tamir dresher

textChanged.Throttle(TimeSpan.FromSeconds(0.5)).DistinctUntilChanged().Select(text => SearchAsync(text)).Switch().ObserveOnDispatcher().Subscribe(/*handle the results*/);

Changing Execution ContextChanging Execution Context

51

Page 50: Building responsive application with Rx - confoo - tamir dresher

Your headache relief pill to Asynchronous and Event based applications

AsyncPush

TriggersEvents

Reactive ExtensionsReactive Extensions

52

Page 51: Building responsive application with Rx - confoo - tamir dresher

www.reactivex.io github.com/Reactive-Extensionswww.manning.com/dresher

Thank You

Tamir Dresher (@tamir_dresher)

53