rxjava 2 tips and tricks

25
RXJAVA2 TIPS AND TRICKS @STEPANGO

Upload: stepan-goncharov

Post on 23-Jan-2018

426 views

Category:

Engineering


0 download

TRANSCRIPT

RXJAVA2 TIPS AND TRICKS

@STEPANGO

ABOUT ME

▸ Android developer since 2008

▸ Lead Android Engineer @ 90Seconds.tv

▸ Kotlin and Rx addict

▸ RxKotlin contributor

▸ RxDataBindings author

Event Iterable Observable

retrieve data T next() onNext(T)

discover error throw Exception onError(Exception)

complete !hasNext() onComplete()

transform fromIterable() toList()

SHORTEST RX INTRO EVER

1.0 VS 2.0

▸ Mar 2018 - end of rxJava1 support

▸ Better performance

▸ Lower memory consumption

▸ Can’t use null

▸ Reactive-Streams based

▸ Maybe, Flowable

OBSERVABLE AND FRIENDS

▸ Flowable: 0..N flows, supporting Reactive-Streams and backpressure

▸ Observable: 0..N flows, no backpressure

▸ Single: a flow of exactly 1 item or an error

▸ Completable: a flow without items but only a completion or error signal

▸ Maybe: a flow with no items, exactly one item or an error

HOW TO DEAL WITH NULLS

▸ Null object pattern

▸ Optional

▸ State wrappers

NULL OBJECT PATTERN

/** Contract shared by every animal in the examples: it can make a sound. */
interface Animal {

    /** Makes this animal's sound. */
    void makeSound();
}

class Dog implements Animal { public void makeSound() { System.out.println("woof!"); } }

enum NullAnimal implements Animal { INSTANCE; public void makeSound() {}; }

void execute() { List<Animal> animals = Arrays.asList(NullAnimal.INSTANCE, new Dog()); Observable.fromIterable(animals) .filter(animal J> animal != NullAnimal.INSTANCE) .subscribe(AnimalKLmakeSound); }

STREAM OF OPTIONAL VALUES

/** Contract shared by every animal in the examples: it can make a sound. */
interface Animal {

    /** Makes this animal's sound. */
    void makeSound();
}

class Dog implements Animal { public void makeSound() { System.out.println("woof!"); } }

/**
 * Streams a list of optional animals, keeps only the present ones, unwraps
 * them, and lets each animal make its sound.
 */
public void execute() {
    List<Optional<Animal>> animals = Arrays.asList(
            Optional.<Animal>empty(),
            Optional.of(new Dog()));
    Observable.fromIterable(animals)
            // The filter guarantees the following get() is only called on present values.
            .filter(Optional::isPresent)
            .map(Optional::get)
            .subscribe(Animal::makeSound);
}

STREAM OF STATES

/** {@link AnimalState} for the case where an animal is available. */
class AnimalStateSuccess implements AnimalState {

    Animal animal;

    /** Wraps the given animal. */
    AnimalStateSuccess(Animal animal) {
        this.animal = animal;
    }

    /** Always {@code true}: this state carries an animal. */
    @Override
    public boolean hasAnimal() {
        return true;
    }

    /** Returns the animal passed to the constructor. */
    @Override
    public Animal animal() {
        return animal;
    }

    /** Always {@code null}: a success state has no error message. */
    @Override
    public String errorMsg() {
        return null;
    }
}

/** {@link AnimalState} for the failure case: a single instance with no animal and a fixed message. */
enum AnimalStateError implements AnimalState {
    INSTANCE;

    /** Always {@code false}: this state carries no animal. */
    @Override
    public boolean hasAnimal() {
        return false;
    }

    /** Always {@code null}. */
    @Override
    public Animal animal() {
        return null;
    }

    /** Returns the fixed error text. */
    @Override
    public String errorMsg() {
        return "We lost him";
    }
}

/**
 * Streams a list of animal states, keeps the states that carry an animal,
 * extracts it, and lets each animal make its sound.
 */
public void execute() {
    List<AnimalState> animals = Arrays.asList(
            AnimalStateError.INSTANCE,
            new AnimalStateSuccess(new Dog()));
    Observable.fromIterable(animals)
            // Error states return null from animal(), so drop them before mapping.
            .filter(AnimalState::hasAnimal)
            .map(AnimalState::animal)
            .subscribe(Animal::makeSound);
}

TESTING IS EASY

▸ Blocking calls

▸ TestSubscriber / TestObserver

▸ RxJavaPlugins

▸ await()/awaitTerminalEvent()

▸ TestScheduler

BLOCKING CALLS

// Source used by all three blocking examples: emits 1, then 2.
Observable<Integer> observable = Observable.just(1, 2);

// assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
int first = observable.blockingFirst();
assertEquals(1, first);

int last = observable.blockingLast();
assertEquals(2, last);

// Consume the whole stream as a plain Iterable and add the items up.
Iterable<Integer> integers = observable.blockingIterable();
int sum = 0;
for (Integer integer : integers) {
    sum += integer;
}
assertEquals(3, sum);

TEST SUBSCRIBER

// Subscribe a test observer to a two-item source and check the outcome.
Observable.just(1, 2)
        .test()
        .assertNoErrors()
        .assertComplete()
        .assertResult(1, 2)
        .assertNever(3);

TEST SUBSCRIBER

// Same assertions as before, but the items are now emitted after a one-second timer.
Observable.timer(1, TimeUnit.SECONDS)
        .flatMap(ignore -> Observable.just(1, 2))
        .test()
        .assertNoErrors()
        .assertComplete()
        .assertResult(1, 2)
        .assertNever(3);

TEST SUBSCRIBER

// This version is the one the deck marks as FAIL: the assertions are chained
// directly after test(), with nothing waiting for the one-second timer.
Observable.timer(1, TimeUnit.SECONDS)
        .flatMap(ignore -> Observable.just(1, 2))
        .test()
        .assertNoErrors()
        .assertComplete()
        .assertResult(1, 2)
        .assertNever(3);

FAIL

TEST SUBSCRIBER

// await() is inserted between test() and the assertions so they run only
// after the observer has received a terminal event.
Observable.timer(1, TimeUnit.SECONDS)
        .flatMap(ignore -> Observable.just(1, 2))
        .test()
        .await()
        .assertNoErrors()
        .assertComplete()
        .assertResult(1, 2)
        .assertNever(3);

AWAIT TERMINAL EVENT

// Wait at most 500 ms for a terminal event from a source whose timer is set to one second.
boolean hasTerminalEvent = Observable.timer(1, TimeUnit.SECONDS)
        .flatMap(ignore -> Observable.just(1, 2))
        .test()
        .awaitTerminalEvent(500, TimeUnit.MILLISECONDS);

// The example expects the wait to time out before the source terminates.
assertFalse(hasTerminalEvent);

PREVENT FROM FAILING

// The timer-based chain without await(); the slides that follow show a
// scheduler-overriding test rule used together with this same chain.
Observable.timer(1, TimeUnit.SECONDS)
        .flatMap(ignore -> Observable.just(1, 2))
        .test()
        .assertNoErrors()
        .assertComplete()
        .assertResult(1, 2)
        .assertNever(3);

UNIT TEST RULE

/**
 * Rule applied around every test in this class. JUnit 4 requires {@code @Rule}
 * fields to be public, so the field is declared {@code public final}.
 */
@Rule
public final TrampolineSchedulersRule schedulers = new TrampolineSchedulersRule();

@Test public void observerTest() throws InterruptedException { Observable.timer(1, TimeUnit.SECONDS) .flatMap(ignore J> Observable.just(1, 2)) .test() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3); }

TEST SUBSCRIBER

public class TrampolineSchedulersRule implements TestRule { @Override public Statement apply(final Statement base, Description description) { return new Statement() { @Override public void evaluate() throws Throwable { RxJavaPlugins.setIoSchedulerHandler( scheduler J> Schedulers.trampoline()); RxJavaPlugins.setComputationSchedulerHandler( scheduler J> Schedulers.trampoline()); RxJavaPlugins.setNewThreadSchedulerHandler( scheduler J> Schedulers.trampoline()); try { base.evaluate(); } finally { RxJavaPlugins.reset(); } } }; } }

TEST SUBSCRIBER

public class TrampolineSchedulersRule implements TestRule { @Override public Statement apply(final Statement base, Description description) { return new Statement() { @Override public void evaluate() throws Throwable { RxJavaPlugins.setIoSchedulerHandler( scheduler J> Schedulers.trampoline()); RxJavaPlugins.setComputationSchedulerHandler( scheduler J> Schedulers.trampoline()); RxJavaPlugins.setNewThreadSchedulerHandler( scheduler J> Schedulers.trampoline()); try { base.evaluate(); } finally { RxJavaPlugins.reset(); } } }; } }

TIME CONTROL

// A source that emits its two items only after a ten-second timer.
Observable<Integer> externalObservable = Observable.timer(10, TimeUnit.SECONDS)
        .flatMap(ignore -> Observable.just(1, 2));

// await() blocks until the source terminates before the assertions run.
externalObservable.test()
        .await()
        .assertNoErrors()
        .assertComplete()
        .assertResult(1, 2)
        .assertNever(3);

TIME CONTROL

@Test public void subscriberTest() throws InterruptedException { Observable<Integer> externalObservable = Observable.timer(10, TimeUnit.SECONDS) .flatMap(ignore J> Observable.just(1, 2));

TestObserver<Integer> testObserver = externalObservable.test();

testObserver.assertNoValues();

testObserver.await() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3); }

TIME CONTROL

/** Rule applied around every test in this class. */
@Rule
public TestSchedulersRule testSchedulerRule = new TestSchedulersRule();

/** The scheduler held by the rule, kept in a field so tests can advance its time. */
private TestScheduler testScheduler = testSchedulerRule.testScheduler;

@Test public void subscriberTest() throws InterruptedException { Observable<Integer> externalObservable = Observable.timer(10, TimeUnit.SECONDS) .flatMap(ignore J> Observable.just(1, 2));

TestObserver<Integer> testObserver = externalObservable.test();

testObserver.assertNoValues();

testScheduler.advanceTimeBy(10, TimeUnit.SECONDS);

testObserver .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3); }

TEXT

public class TestSchedulersRule implements TestRule { TestScheduler testScheduler = new TestScheduler(); @Override public Statement apply(final Statement base, Description description) { return new Statement() { @Override public void evaluate() throws Throwable { RxJavaPlugins.setIoSchedulerHandler( scheduler J> testScheduler); RxJavaPlugins.setComputationSchedulerHandler( scheduler J> testScheduler); RxJavaPlugins.setNewThreadSchedulerHandler( scheduler J> testScheduler); try { base.evaluate(); } finally { RxJavaPlugins.reset(); } } }; } }

THANKS @STEPANGO

90seconds.tv