angular2 rxjs

RxJS for Angular 2, Christoffer Noring, Google Developer Expert

Upload: christoffer-noring

Post on 13-Jan-2017


TRANSCRIPT

Page 1: Angular2 rxjs

RxJS for Angular 2

Christoffer Noring, Google Developer Expert

Page 2: Angular2 rxjs

In the beginning

There was callback hell

Unreadable

Page 3: Angular2 rxjs

Angular 1 and promises

We got more ordered code with constructs like

get()
  .then( doSomething )
  .then( somethingElse )
  .then( lastAction )
  .catch( catchAll )

No callback hell anymore :)

We could do things like: first do A, then, based on A's result, do B
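The "first do A, then based on A's result do B" idea can be sketched with native promises. This is only an illustration: `getUser` and `getOrders` are hypothetical helpers that resolve immediately, not functions from the talk.

```javascript
// Hypothetical async steps (assumptions for this sketch); each returns a promise.
function getUser() {
  return Promise.resolve({ id: 7 });
}
function getOrders(user) {
  return Promise.resolve(['order-for-' + user.id]);
}

// Step A runs first, step B uses A's result, and one catch covers the whole chain.
const chained = getUser()
  .then((user) => getOrders(user))
  .then((orders) => orders.length)
  .catch((err) => {
    console.error('Something failed', err);
    return -1;
  });

chained.then((count) => console.log('Order count', count)); // Order count 1
```

Each `.then` receives whatever the previous step returned, which is what keeps the code flat instead of nested.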

Page 4: Angular2 rxjs

We could wrap up async code in a nice way

$q.when( data )

OR

function doSomethingAsync(){
  var deferred = $q.defer();
  setTimeout( () => {
    if(someCondition) {
      deferred.resolve( data );
    } else {
      deferred.reject( err );
    }
  }, 3000 );
  return deferred.promise;
}

doSomethingAsync().then((data) => {
  console.log(data);
}, (err) => {
  console.log(err);
})

Use this one

Page 5: Angular2 rxjs

Or we could use native promises

function doAsync(){
  return new Promise( (resolve, reject) => {
    setTimeout(() => { resolve(data); }, 3000);
  })
}

doAsync().then((data) => {}, err => {})

Promise.all([ get(), getMore() .. ])

Page 6: Angular2 rxjs

Everything was great, or was it?

Page 7: Angular2 rxjs

Issues with promises

Not cancellable

Cumbersome to retry: we need to write a lot of code to retry our logic, and it becomes messy

Only returns one value

Not easy to compose with other things like callbacks and events, although they are async in nature too

Page 8: Angular2 rxjs

A better way, observables

stream of values over time

1 2 3 4 5 6

time

Page 9: Angular2 rxjs

Enter observables

var stream = Rx.Observable.create(function(observer){ /* push values to the observer */ })

stream.subscribe((data) => {
  console.log( "Data", data );
})

subscribe accepts up to three callbacks: fnSuccess, fnError, fnCompleted

nothing happens till someone subscribes
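To make "nothing happens till someone subscribes" concrete, here is a minimal sketch. `createObservable` is a hypothetical stand-in written for this example, not the RxJS implementation: it only stores the producer function and runs it when `subscribe` is called.

```javascript
// A tiny stand-in for an observable (an assumption of this sketch, not RxJS).
// It stores the producer function and runs it on subscribe().
function createObservable(producer) {
  return {
    subscribe(onNext, onError, onCompleted) {
      producer({
        next: onNext || (() => {}),
        error: onError || (() => {}),
        complete: onCompleted || (() => {}),
      });
    },
  };
}

let producerRuns = 0;
const stream = createObservable((observer) => {
  producerRuns += 1; // side effect so we can see when the producer runs
  observer.next(1);
  observer.next(2);
  observer.complete();
});

console.log('Runs before subscribe:', producerRuns); // Runs before subscribe: 0

const received = [];
stream.subscribe((value) => received.push(value));

console.log('Runs after subscribe:', producerRuns); // Runs after subscribe: 1
console.log('Received:', received);                 // Received: [ 1, 2 ]
```

Creating the stream only describes the work; the work itself starts per subscriber.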

Page 10: Angular2 rxjs

Observable with error

var stream = Rx.Observable.create(function(observer){
  observer.onNext(1);
  observer.onNext(2);
  observer.onNext(3);
  observer.onError( 'there is an error' )
})

stream.subscribe(
  function(data){ console.log( data ); },
  function(error) { console.error( error ); }
)

Page 11: Angular2 rxjs

Observable with completion

var stream = Rx.Observable.create(function(observer){
  observer.onNext( 1 );
  observer.onNext( 2 );
  observer.onNext( 3 );
  observer.onCompleted();
})

stream.subscribe(
  function(data){ console.log( data ); },
  function(error){ console.error( error ); },
  function(){ console.info( "completed" ); }
)

Page 12: Angular2 rxjs

Observable with cancellation

var homemadeStream = Rx.Observable.create((observer) => {
  var i = 0;
  var handle = setInterval(function() {
    observer.onNext( i++ );
  }, 500);

  // we clean up
  return function(){
    console.log('Disposing interval');
    clearInterval( handle );
  }
});

var subscription2 = homemadeStream.subscribe((val) => {
  console.log('Homemade val', val);
});

setTimeout(function() {
  console.log('Cancelling homemadeStream');
  subscription2.dispose();   // .unsubscribe() in Rxjs5; no more values are emitted
}, 1500);

Page 13: Angular2 rxjs

Wrapping something into an observable

var stream = Rx.Observable.create(function(observer){
  var request = new XMLHttpRequest();

  request.open( 'GET', 'url' );
  request.onload = function(){
    if(request.status === 200) {
      observer.onNext( request.response );   // 1
      observer.onCompleted();                // 2
    } else {
      observer.onError( new Error( request.statusText ) )   // 3
    }
  }
  request.onerror = function(){
    observer.onError( new Error('unknown error') );   // 3
  }
  request.send();
})

stream.subscribe(
  (result) => { console.log( result ); },   // 1
  err => {},                                // 3
  () => {}                                  // 2
)

Page 14: Angular2 rxjs

Everything is a stream

Page 15: Angular2 rxjs

You can combine async concepts

click events
user inputs
data from server
callbacks
socket

Page 16: Angular2 rxjs

You can create observables by hand, but you are more likely to create them from something

Page 17: Angular2 rxjs

Different ways to create an observable

Rx.Observable.fromArray([ 1,2,3,4 ])

Rx.Observable.fromEvent(element, 'event');
Rx.Observable.fromEvent(eventEmitter, 'data', function(){})

Rx.Observable.fromNodeCallback(fs.createFile)

Rx.Observable.fromCallback(obj.callback)

Rx.Observable.range(1,3)
Rx.Observable.interval(milliseconds)

Page 18: Angular2 rxjs

Operators give observables their power

Page 19: Angular2 rxjs

Operators

var stream = Rx.Observable.interval(500)   // emit value every 500 ms
  .take(5)                                  // limit number of values
  .filter((val) => {                        // operator: only emit certain values
    return val % 2 === 0;
  })
  .map((val) => {                           // operator: change the value
    return "val" + 1;
  })

stream.subscribe(() => {})

operator is a function that returns an observable
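The statement that an operator is a function returning an observable can be illustrated without the library. The sketch below uses a hypothetical synchronous `createObservable` and a hypothetical `pipe` helper (both are assumptions of this example, not RxJS APIs); each operator wraps a source and hands back a new stream.

```javascript
// Minimal synchronous observable stand-in (hypothetical, not RxJS).
function createObservable(producer) {
  return { subscribe: (onNext) => producer(onNext) };
}

// Each operator takes a source observable and returns a new one.
const map = (fn) => (source) =>
  createObservable((emit) => source.subscribe((v) => emit(fn(v))));

const filter = (predicate) => (source) =>
  createObservable((emit) =>
    source.subscribe((v) => { if (predicate(v)) emit(v); }));

// Simplified take: it stops forwarding after `count` values
// but, unlike the real operator, does not unsubscribe from the source.
const take = (count) => (source) =>
  createObservable((emit) => {
    let seen = 0;
    source.subscribe((v) => { if (seen++ < count) emit(v); });
  });

// Helper that applies operators left to right.
const pipe = (source, ...operators) =>
  operators.reduce((stream, operator) => operator(stream), source);

const numbers = createObservable((emit) => {
  [0, 1, 2, 3, 4, 5, 6, 7].forEach((n) => emit(n));
});

const result = [];
pipe(
  numbers,
  take(5),
  filter((v) => v % 2 === 0),
  map((v) => 'val' + v)
).subscribe((v) => result.push(v));

console.log(result); // [ 'val0', 'val2', 'val4' ]
```

Because every operator returns the same kind of object it receives, the calls can be chained in any order.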

Page 20: Angular2 rxjs

Wait, it looks like lodash?

Page 21: Angular2 rxjs

Comparison

lodash/array functions

list
  .map( x => x.prop )
  .filter( x => x > 2 )
  .take( 2 )

async

service.get()
  .then( x => console.log(x) )
  .catch( err => console.log(err) )

observable

list
  .map( x => x.prop )
  .filter( x => x > 2 )
  .take( 2 )
  .subscribe(
    x => console.log(x),
    err => console.log(err)
  )

Array like, handles async, but can also be
- Cancelled
- Retried

Page 22: Angular2 rxjs

Operators overview

120+ operators ( 60+ Rxjs5 )

Categories

Combination

Conditional

Creational

Error handling

Filtering

Transformation

Utility

Page 23: Angular2 rxjs

How am I gonna keep track of all that?

Use the most common ones, and then expand your knowledge

Look at marble diagrams to find out which one you need

Also create your own to help you in the coding process

Page 24: Angular2 rxjs

Marble diagrams

stream/s + operation = new stream

Stream:            1 2 3
Other stream:      4 5
(operation by the operator)
Resulting stream:  1 2 3 4 5

Most operators are covered at rxmarbles.com

Page 25: Angular2 rxjs

Start with these operators

map

filter

flatMap

switchMap

do

merge

of

interval

take

Page 26: Angular2 rxjs

of

Rx.Observable.of(1,2,3,4).subscribe((val) => {
  console.log( val )
})

Emits exactly what's in there

Good for prototyping an idea

Page 27: Angular2 rxjs

map

Rx.Observable.of(1,2,3).map((val) => {
  return "" + val;
})

Point is to take each value, change it and return a new value in its place

Page 28: Angular2 rxjs

filter

Rx.Observable.of(1,2,3).filter((val) => {
  return val % 2 === 0;
})

Point is to reduce the number of values

Page 29: Angular2 rxjs

flatMap

Rx.Observable.of(1,2,3).map( (val) => {
  return Rx.DOM.getJSON( 'data' + val + '.json' )
} )

Becomes a list of observables, hard to work with

Rx.Observable.of(1,2,3).flatMap( (val) => {
  return Rx.DOM.getJSON( 'data' + val + '.json' )
} )

Flatten all observables to a meta stream
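The difference between map and flatMap can be seen with plain arrays, which offer the same pair of methods. This is an analogy only: arrays stand in for the inner observables, and `lookup` is a hypothetical function made up for the example.

```javascript
// Hypothetical lookup: each id maps to a small list of results.
// An array stands in for the inner observable here.
const lookup = (id) => ['person' + id + 'a', 'person' + id + 'b'];

// map keeps the nesting: one inner list per input value.
const nested = [1, 2, 3].map(lookup);

// flatMap merges the inner lists into one flat sequence.
const flat = [1, 2, 3].flatMap(lookup);

console.log(nested.length); // 3
console.log(flat.length);   // 6
console.log(flat[2]);       // person2a
```

With observables the inner collections arrive over time, but the shape of the result is the same: one stream of results rather than a stream of streams.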

Page 30: Angular2 rxjs

flatMap - example 2, auto complete

flatmapExample = Rx.Observable.fromEvent(input,'keyup')
  .map( function(ev){                // Transform event to char
    return ev.target.value;
  })
  .filter(function(text){            // Wait until we have 3 chars
    return text.length >= 3;
  })
  .distinctUntilChanged()            // Only perform search if this 'search' is unique
  .flatMap( function(val){
    return Rx.DOM.getJSON( 'data3.json' );
  })

flatmapExample.subscribe( function(result){
  console.log('Flatmap', result);
})

Page 31: Angular2 rxjs

flatMap

Is for scenarios where you start with one type of stream and want it to become something completely different, like a stream of clicks that becomes a stream of ajax calls

clicks -> (map) ajax -> (flat map) persons

Also avoid this

clicks -> ajax, ajax, ajax, each with its own Subscribe

Instead it becomes

clicks -> ajax, ajax, ajax -> (flat map) persons, with one subscribe

Page 32: Angular2 rxjs

switchMap

Switch map, complete something based on a condition

var breakCondition = Rx.Observable.fromEvent(document,'click');

var switched = breakCondition.switchMap( function(val){
  return Rx.Observable.interval(3000).mapTo('Do this if nothing breaks me');
})

switched.subscribe( function(val){
  console.log('Switch map', val);
})

Page 33: Angular2 rxjs

do

var stream = Rx.Observable.of(1,2,3,4,5);

var subscription = stream.do(function(val){
  console.log('Current val', val);
}).filter(function(val){
  return val % 2 === 0;
});

subscription.subscribe(function(val){
  console.log('Val', val);
})

Echoes every value without changing it, used for logging

Page 34: Angular2 rxjs

concat/merge

var stream = Rx.Observable.fromArray([1,2,3,4]);

var stream2 = Rx.Observable.fromArray([5,6,7,8]);

concat

var concat = Rx.Observable.concat( stream, stream2 );

1, 2, 3, 4, 5, 6, 7, 8

first stream emits all its values, then the second stream emits the remaining values

merge

var merge = Rx.Observable.merge( stream, stream2 );

1, 5, 2, 6, 3, 7, 4, 8

Needs queue Scheduler in Rxjs5

Page 35: Angular2 rxjs

Other useful operators

Page 36: Angular2 rxjs

Delay, the whole sequence

// `stream` and `time` (a start timestamp) are not shown on the slide
stream.delay(1000).subscribe( (val) => {
  var newTime = new Date().getTime();
  console.log('Delayed', val + " " + (newTime - time));
})

Delay ..... 1 second, then 1 2 3

Rx.Observable.merge(
  Rx.Observable.of('Marco').delay(1000),
  Rx.Observable.of('Polo').delay(2000)
).subscribe((val) => {
  var newTime = new Date().getTime();
  console.log('Merged Delay', val + " " + (newTime - time));
})

..... 1 second: Marco

..... 2 seconds: Polo

Page 37: Angular2 rxjs

Debounce

var debounceTime = Rx.Observable.fromEvent(button,'click').debounce(2000);

debounceTime.subscribe( function(){
  console.log('mouse pressed');
})

Ignores all generated mouse click events for 2 seconds

click   click   click   click   click
ignore  ignore  ignore  ignore  use

.... 2 seconds passed

Page 38: Angular2 rxjs

Fetching data with fetch api and promise

var init = {
  method: 'GET',
  headers: myHeaders,
  mode: 'cors',
  cache: 'default'
};

fetch('data.json', init).then(function(response) {
  return response.blob();
})

There is always this one if you don't want to use Observables

Page 39: Angular2 rxjs

Angular 2 and rxjs

What about angular 2 and rxjs, how is it used?

router, most actions are observables

http lib is observables

Page 40: Angular2 rxjs

Fetching data with Observable

getAll() {
  return http.get(`${baseUrl}/people`, {})
    .map( mapPeople )
}

mapPeople(response:Response) {
  return response.json().results.map(toPerson)
}

getAll().subscribe((people) => {
  console.log('people', people);
}, (err) => {
  console.log('Error', err);
})

fetch data, map result, handle error

Page 41: Angular2 rxjs

Fetch data with Observable: cancelling

var stream = getAll();

var subscription = stream.subscribe((data) => {})

setTimeout(() => {
  subscription.dispose();   // unsubscribe() in rxjs5
}, 3000);

BUT we can cancel!!

Page 42: Angular2 rxjs

Fetch data with Observable: retry

var stream = Rx.DOM.get('/products.json').retry(5);

stream.subscribe((val) => {
  console.log('Data', val);
}, err => console.log(err));

5 failed attempts, then we hit the error callback

It's so easy to retry, imagine how messy this code would be with a promise
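What a retry operator does can be sketched as "subscribe again when the source fails". The code below is a simplified illustration built on a hypothetical `createObservable` stand-in; it is not the RxJS implementation, and the exact attempt counting differs between library versions. Here `count` is treated as the number of extra attempts after the first failure.

```javascript
// Minimal observable stand-in (hypothetical, not RxJS).
function createObservable(producer) {
  return {
    subscribe(onNext, onError) { producer({ next: onNext, error: onError }); },
  };
}

// retry: on error, subscribe to the source again, up to `count` extra times.
function retry(source, count) {
  return createObservable((observer) => {
    let retriesLeft = count;
    const attempt = () => {
      source.subscribe(observer.next, (err) => {
        if (retriesLeft-- > 0) { attempt(); } else { observer.error(err); }
      });
    };
    attempt();
  });
}

// A flaky source: fails on the first two subscriptions, then succeeds.
let attempts = 0;
const flaky = createObservable((observer) => {
  attempts += 1;
  if (attempts < 3) {
    observer.error(new Error('attempt ' + attempts + ' failed'));
  } else {
    observer.next('data');
  }
});

const values = [];
const errors = [];
retry(flaky, 5).subscribe((v) => values.push(v), (e) => errors.push(e));

console.log(attempts, values, errors.length); // 3 [ 'data' ] 0
```

The subscriber never sees the first two failures; it would only reach the error callback once the retries were used up.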

Page 43: Angular2 rxjs

Subject

Observer    Observable

Subject inherits from both

the subject can act as a proxy for a group of subscribers and a source

Page 44: Angular2 rxjs

Subject

var subject = new Rx.Subject();

// Acts like an observable
subject.subscribe((val) => {
  console.log( 'Produced by subject', val );
});

// Acts like an observer
subject.onNext(1);
subject.onCompleted();

Page 45: Angular2 rxjs

Subject

var subject = new Rx.Subject();

var source = Rx.Observable.interval(500);

// Pass subject as an observer
source.subscribe(subject);

// Receives all the values pushed out by the source
subject.subscribe(
  (val) => { console.log('Sub', val); },
  (err) => console.log(err),
  () => console.log('completed')
);

// Able to stop receiving values
setTimeout(function() {
  subject.onCompleted();
}, 3000);

Page 46: Angular2 rxjs

Subject

var subject = new Rx.Subject();

var source = Rx.Observable.interval(500).take(3);

// Listens to all values from source
source.subscribe( subject );

subject.subscribe((val) => {
  console.log('Subject', val);
});

// Add to stream
subject.onNext('Mess1');
subject.onNext('Mess2');

setTimeout(function() {
  subject.onCompleted();
}, 1600);

Order is important: an onNext() before subscribe is lost

Page 47: Angular2 rxjs

Subject - types

Normal subject, everything before subscribe is lost

var subject = new Rx.Subject();
subject.onNext(1);

subject.subscribe((val) => {
  console.log('Replay', val);
})

subject.onNext(2);
subject.onNext(3);

Replay subject, nothing is lost

var subject = new Rx.ReplaySubject();
subject.onNext(1);

subject.subscribe((val) => {
  console.log('Replay', val);
})

subject.onNext(2);
subject.onNext(3);
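The difference between a normal subject and a replay subject can be shown with two small classes. `SimpleSubject` and `SimpleReplaySubject` are hypothetical names written for this sketch; they mimic only the behaviour discussed here and are not the RxJS classes.

```javascript
// Hypothetical minimal subjects, for illustration only (not the RxJS classes).
class SimpleSubject {
  constructor() { this.listeners = []; }
  subscribe(listener) { this.listeners.push(listener); }
  next(value) { this.listeners.forEach((listener) => listener(value)); }
}

class SimpleReplaySubject extends SimpleSubject {
  constructor() { super(); this.history = []; }
  subscribe(listener) {
    this.history.forEach((value) => listener(value)); // replay the past first
    super.subscribe(listener);
  }
  next(value) {
    this.history.push(value); // remember every value for late subscribers
    super.next(value);
  }
}

// Same scenario for both: one value is pushed before anyone subscribes.
function run(subject) {
  const seen = [];
  subject.next(1);
  subject.subscribe((value) => seen.push(value));
  subject.next(2);
  subject.next(3);
  return seen;
}

const plain = run(new SimpleSubject());
const replayed = run(new SimpleReplaySubject());
console.log('Subject      ', plain);    // Subject       [ 2, 3 ]
console.log('ReplaySubject', replayed); // ReplaySubject [ 1, 2, 3 ]
```

The replay variant pays for this with memory, since it has to keep the values it has already emitted.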

Page 48: Angular2 rxjs

BehaviorSubject

Good for default values

/* Initialize with initial value of 42 */
var subject = new Rx.BehaviorSubject(42);   // Init/Default value

var subscription = subject.subscribe(
  function (x) { console.log('Next: ' + x.toString()); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); }
);
// => Next: 42

subject.onNext(56);
// => Next: 56

subject.onCompleted();
// => Completed

Page 49: Angular2 rxjs

Angular 2 with rxjs

Rxjs is a big library. If you just have request/response then do

1   import 'rxjs/add/operator/map';
    One import per used operator

1a  import * as Rx from 'rxjs/Rx';
    imports the whole of Rxjs!!! GOOD for development, BAD for production

Then carry on coding

Page 50: Angular2 rxjs

Component to Component

You want something that can produce values, that you can listen to

// service impl
this.subject = new Rx.Subject();

function sendData(data){
  this.subject.next( data )
}

function getSubject(){
  return this.subject;
}

Component

service.getSubject().subscribe(()=>{})

Component

service.sendData( data )

BUT, can be made nicer with a uniform data flow

Page 51: Angular2 rxjs

Angular 2 rxjs setup

life cycle hooks

// Setup
ngOnInit(){
  this.subscription = this.service.getData().subscribe((data) => {});
}

// Teardown
ngOnDestroy(){
  this.subscription.unsubscribe();
}

Page 52: Angular2 rxjs

Angular 2

So when to use it?

It's a better promise: cancelling, retry

Want to combine callbacks, events and data fetch? Less code with rxjs

There is a forkJoin operator that replaces Promise.all() etc.

Observable: "anything you can do, I can do better"

Promise: "no you can't :("

Page 53: Angular2 rxjs

Testing

Hard to test, right?

Some methods might take minutes?

Solution: bending time with schedulers

Page 54: Angular2 rxjs

Schedulers

bending time

Page 55: Angular2 rxjs

Because a scheduler has its own virtual clock, anything scheduled on that scheduler will adhere to the time denoted on that clock

I.e. we can bend time, for example in unit testing
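The idea of a virtual clock can be sketched in a few lines. `VirtualScheduler` below is a hypothetical class written for this example, not the Rx test scheduler: tasks are queued with a due time, and nothing runs until the test moves the clock forward.

```javascript
// Hypothetical virtual-time scheduler, for illustration only.
class VirtualScheduler {
  constructor() { this.now = 0; this.queue = []; }

  // Register `task` to run `delay` virtual milliseconds from now.
  schedule(delay, task) {
    this.queue.push({ dueAt: this.now + delay, task });
  }

  // Move the clock forward and run every task that became due, in time order.
  advanceBy(ms) {
    const target = this.now + ms;
    for (;;) {
      const due = this.queue
        .filter((entry) => entry.dueAt <= target)
        .sort((a, b) => a.dueAt - b.dueAt)[0];
      if (!due) break;
      this.queue.splice(this.queue.indexOf(due), 1);
      this.now = due.dueAt;
      due.task();
    }
    this.now = target;
  }
}

const scheduler = new VirtualScheduler();
const log = [];
scheduler.schedule(60000, () => log.push('one minute later'));
scheduler.schedule(100, () => log.push('first'));

scheduler.advanceBy(100);
console.log(log); // [ 'first' ]

scheduler.advanceBy(60000);
console.log(log); // [ 'first', 'one minute later' ]
```

A task that would take a minute of wall-clock time runs instantly, because only the virtual clock has to reach its due time.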

Page 56: Angular2 rxjs

var onNext = Rx.ReactiveTest.onNext;
var scheduler = new Rx.TestScheduler();

// create observable from scheduler, emit values
var subject = scheduler.createColdObservable(
  onNext(100,'first'),
  onNext(200,'second')
);

var result;

subject.subscribe((val) => {
  result = val;
});

// Advance time, assert
scheduler.advanceBy( 100 );
console.log('Should equal', result === 'first');

// Advance time, assert
scheduler.advanceBy( 100 );
console.log('Should equal', result === 'second');

Page 57: Angular2 rxjs

var testScheduler = new Rx.TestScheduler();

// replace default scheduler
var stream = Rx.Observable.interval(1000, testScheduler)
  .take(5)
  .map((val) => { return val + 1 })
  .filter((i) => { return i % 2 === 0 });

var result;
stream.subscribe((val) => result = val );

console.log('testing function');

// the interval has emitted 0 1 2 after these three steps
testScheduler.advanceBy(1000);
testScheduler.advanceBy(1000);
testScheduler.advanceBy(1000);

console.log('Should equal', result === 2);

testScheduler.advanceBy(1000);
testScheduler.advanceBy(1000);
console.log('Should equal', result === 4);

Page 58: Angular2 rxjs

Error scenarios

Capture error in .subscribe()

Page 59: Angular2 rxjs

Error handling: catch

var errStream = Rx.Observable.throw('Error');

var stream = Rx.Observable.create(function(observer){
  observer.onNext(1);
})

var merged = Rx.Observable.merge( errStream, stream )

merged.subscribe(
  function(val){ console.log('Val', val); },
  function(err){ console.log('Err', err); },
  function(){ console.log('completed'); }
)

Captured here but sequence interrupted, completed NOT reached

Page 60: Angular2 rxjs

Capture error in .subscribe() + completed stream

Error handling

Page 61: Angular2 rxjs

Error handling: improved catch

var errStream = Rx.Observable.throw('Error');

var stream = Rx.Observable.create(function(observer){
  observer.onNext(1);
})

var merged = Rx.Observable.merge( errStream, stream ).catch(function(err){
  return Rx.Observable.of(err);
});

merged.subscribe(
  function(val){ console.log('Val', val); },
  function(err){ console.log('Err', err); },
  function(){ console.log('completed'); }
)

Captured here but sequence interrupted, completed IS reached

stream not processed though, it is killed by errStream :(

can we improve it?

Page 62: Angular2 rxjs

Wrap error stream before merging with other streams so we don’t kill other valid streams

Error handling

Page 63: Angular2 rxjs

Error handling: ignoring

We need to handle the erroring stream better

From

var errStream = Rx.Observable.throw('AAA thump..')

To

var errStream = Rx.Observable.throw('AAA thump..').catch(function(err){
  return Rx.Observable.of(err);
});

This will emit all values and the wrapped error
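Why wrapping the failing stream before the merge matters can be shown with a small model. Everything below (`createObservable`, `of`, `throwError`, `catchError`, `merge`, `collect`) is a hypothetical, synchronous stand-in written for this sketch; it imitates the behaviour described on the slides and is not the RxJS API.

```javascript
// Minimal synchronous observable stand-in (hypothetical, not RxJS).
function createObservable(producer) {
  return {
    subscribe(next, error, complete) { producer({ next, error, complete }); },
  };
}

const of = (...values) => createObservable((o) => {
  values.forEach((v) => o.next(v));
  o.complete();
});
const throwError = (err) => createObservable((o) => o.error(err));

// catchError: if the source fails, continue with the replacement stream.
const catchError = (source, handler) => createObservable((o) =>
  source.subscribe(
    o.next,
    (err) => handler(err).subscribe(o.next, o.error, o.complete),
    o.complete
  ));

// merge: the first error ends the merged stream; completion is
// signalled once every source has completed.
const merge = (...sources) => createObservable((o) => {
  let active = sources.length;
  let stopped = false;
  sources.forEach((source) => source.subscribe(
    (v) => { if (!stopped) o.next(v); },
    (err) => { if (!stopped) { stopped = true; o.error(err); } },
    () => { if (!stopped && --active === 0) o.complete(); }
  ));
});

// Record everything a subscriber would see.
function collect(stream) {
  const events = [];
  stream.subscribe(
    (v) => events.push('next:' + v),
    (err) => events.push('error:' + err),
    () => events.push('completed')
  );
  return events;
}

const unwrapped = collect(merge(throwError('AAA thump..'), of(1, 2)));
const wrapped = collect(merge(
  catchError(throwError('AAA thump..'), (err) => of(err)),
  of(1, 2)
));

console.log(unwrapped); // [ 'error:AAA thump..' ]
console.log(wrapped);   // [ 'next:AAA thump..', 'next:1', 'next:2', 'completed' ]
```

In the wrapped case the error has already been turned into an ordinary value by the time the merge sees it, so the healthy stream is processed and completion is reached.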

Page 64: Angular2 rxjs

Process all values and errors by wrapping the errors, everything gets processed

Error handling

Page 65: Angular2 rxjs

Error - ignoring, other scenario

var errStream2 = Rx.Observable.interval(200).take(3)
  .select(function(val){                  // wrap
    if(val === 0) {
      return Rx.Observable.throw('Error stream');
    } else {
      return Rx.Observable.of(val);
    }
  })
  .select(function(observable){           // catch and rewrite
    return observable.catch(Rx.Observable.return('Error handled'));
  })
  .selectMany( function(x){
    return x;
  });

var merged = Rx.Observable.merge( errStream2, stream )

merged.subscribe(
  function(val){ console.log('Val', val); },
  function(err){ console.log('Err', err); },
  function(){ console.log('completed'); }
)

1,1,2

Page 66: Angular2 rxjs

Set a policy for error handling of streams when merging, so successful streams survive

Error handling

Page 67: Angular2 rxjs

Error - ignoring, other scenario

You have several sources and two of them terminate with an error

You want the other, normal source to work

var errStream = Rx.Observable.throw('AAA thump..');

var errStreamWithFlattenedData = Rx.Observable.interval(500).take(3)
  .flatMap( function(val){
    if( val === 1 ) {
      return Rx.Observable.throw('crashing');
    } else {
      return Rx.Observable.return(val);
    }
  })

var normalStream = Rx.Observable.return('anything');

var handledErrorStream = Rx.Observable.onErrorResumeNext( errStream, normalStream, errStreamWithFlattenedData );

handledErrorStream.subscribe(
  function(val){ console.log('error stream ignored', val); },
  function(err){ console.log("error stream ignored, error", err); },
  function(){ console.log("completion of error stream ignored"); }
)

Ensure good streams survive in a merge of failing streams: onErrorResumeNext

Page 68: Angular2 rxjs

Further reading

https://xgrommx.github.io/rx-book

http://www.learnrxjs.io/

bacon.js

Page 69: Angular2 rxjs

Thank you