rxjava 2 tips and tricks
TRANSCRIPT
ABOUT ME
▸ Android developer since 2008
▸ Lead Android Engineer @ 90Seconds.tv
▸ Kotlin and Rx addict
▸ RxKotlin contributor
▸ RxDataBindings author
Event Iterable Observable
retrieve data T next() onNext(T)
discover error throw Exception onError(Exception)
complete !hasNext() onCompleted()
transform fromIterable() toList()
SHORTEST RX INTRO EVER
1.0 VS 2.0
▸ Mar 2018 - end of rxJava1 support
▸ Better performance
▸ Lower memory consumption
▸ Can’t use null
▸ Reactive-Streams based
▸ Maybe, Flowable
OBSERVABLE AND FRIENDS
▸ Flowable: 0..N flows, supporting Reactive-Streams and backpressure
▸ Observable: 0..N flows, no backpressure
▸ Single: a flow of exactly 1 item or an error
▸ Completable: a flow without items but only a completion or error signal
▸ Maybe: a flow with no items, exactly one item or an error
NULL OBJECT PATTERN
interface Animal { void makeSound(); }
class Dog implements Animal { public void makeSound() { System.out.println("woof!"); } }
enum NullAnimal implements Animal { INSTANCE; public void makeSound() {}; }
void execute() { List<Animal> animals = Arrays.asList(NullAnimal.INSTANCE, new Dog()); Observable.fromIterable(animals) .filter(animal -> animal != NullAnimal.INSTANCE) .subscribe(Animal::makeSound); }
STREAM OF OPTIONAL VALUES
interface Animal { void makeSound(); }
class Dog implements Animal { public void makeSound() { System.out.println("woof!"); } }
public void execute() { List<Optional<Animal>> animals = Arrays.asList( Optional.<Animal>empty(), Optional.of(new Dog()) ); Observable.fromIterable(animals) .filter(Optional::isPresent) .map(Optional::get) .subscribe(Animal::makeSound); }
STREAM OF STATES
class AnimalStateSuccess implements AnimalState { Animal animal; AnimalStateSuccess(Animal animal) { this.animal = animal; } @Override public boolean hasAnimal() { return true;} @Override public Animal animal() { return animal; } @Override public String errorMsg() { return null; } }
enum AnimalStateError implements AnimalState { INSTANCE; @Override public boolean hasAnimal() { return false;} @Override public Animal animal() { return null; } @Override public String errorMsg() { return "We lost him"; } }
public void execute() { List<AnimalState> animals = Arrays.asList( AnimalStateError.INSTANCE, new AnimalStateSuccess(new Dog()) ); Observable.fromIterable(animals) .filter(AnimalState::hasAnimal) .map(AnimalState::animal) .subscribe(Animal::makeSound); }
TESTING IS EASY
▸ Blocking calls
▸ TestSubscriber / TestObserver
▸ RxJavaPlugins
▸ await()/awaitTerminalEvent()
▸ TestScheduler
BLOCKING CALLS
Observable<Integer> observable = Observable.just(1, 2);
int first = observable.blockingFirst(); assertEquals(first, 1);
int last = observable.blockingLast(); assertEquals(last, 2);
Iterable<Integer> integers = observable.blockingIterable(); int sum = 0; for (Integer integer : integers) sum += integer; assertEquals(sum, 3);
TEST SUBSCRIBER
Observable.just(1, 2) .test() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3);
TEST SUBSCRIBER
Observable.timer(1, TimeUnit.SECONDS) .flatMap(ignore -> Observable.just(1, 2)) .test() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3);
TEST SUBSCRIBER
Observable.timer(1, TimeUnit.SECONDS) .flatMap(ignore -> Observable.just(1, 2)) .test() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3);
FAIL
TEST SUBSCRIBER
Observable.timer(1, TimeUnit.SECONDS) .flatMap(ignore -> Observable.just(1, 2)) .test() .await() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3);
AWAIT TERMINAL EVENT
boolean hasTerminalEvent = Observable.timer(1, TimeUnit.SECONDS) .flatMap(ignore -> Observable.just(1, 2)) .test() .awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
assertFalse(hasTerminalEvent);
PREVENT FROM FAILING
Observable.timer(1, TimeUnit.SECONDS) .flatMap(ignore -> Observable.just(1, 2)) .test() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3);
UNIT TEST RULE
@Rule final TrampolineSchedulersRule schedulers = new TrampolineSchedulersRule();
@Test public void observerTest() throws InterruptedException { Observable.timer(1, TimeUnit.SECONDS) .flatMap(ignore -> Observable.just(1, 2)) .test() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3); }
TEST SUBSCRIBER
public class TrampolineSchedulersRule implements TestRule { @Override public Statement apply(final Statement base, Description description) { return new Statement() { @Override public void evaluate() throws Throwable { RxJavaPlugins.setIoSchedulerHandler( scheduler -> Schedulers.trampoline()); RxJavaPlugins.setComputationSchedulerHandler( scheduler -> Schedulers.trampoline()); RxJavaPlugins.setNewThreadSchedulerHandler( scheduler -> Schedulers.trampoline()); try { base.evaluate(); } finally { RxJavaPlugins.reset(); } } }; } }
TEST SUBSCRIBER
public class TrampolineSchedulersRule implements TestRule { @Override public Statement apply(final Statement base, Description description) { return new Statement() { @Override public void evaluate() throws Throwable { RxJavaPlugins.setIoSchedulerHandler( scheduler -> Schedulers.trampoline()); RxJavaPlugins.setComputationSchedulerHandler( scheduler -> Schedulers.trampoline()); RxJavaPlugins.setNewThreadSchedulerHandler( scheduler -> Schedulers.trampoline()); try { base.evaluate(); } finally { RxJavaPlugins.reset(); } } }; } }
TIME CONTROL
Observable<Integer> externalObservable = Observable.timer(10, TimeUnit.SECONDS) .flatMap(ignore -> Observable.just(1, 2));
externalObservable.test() .await() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3);
TIME CONTROL
@Test public void subscriberTest() throws InterruptedException { Observable<Integer> externalObservable = Observable.timer(10, TimeUnit.SECONDS) .flatMap(ignore -> Observable.just(1, 2));
TestObserver<Integer> testObserver = externalObservable.test();
testObserver.assertNoValues();
testObserver.await() .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3); }
TIME CONTROL
@Rule public TestSchedulersRule testSchedulerRule = new TestSchedulersRule(); private TestScheduler testScheduler = testSchedulerRule.testScheduler;
@Test public void subscriberTest() throws InterruptedException { Observable<Integer> externalObservable = Observable.timer(10, TimeUnit.SECONDS) .flatMap(ignore -> Observable.just(1, 2));
TestObserver<Integer> testObserver = externalObservable.test();
testObserver.assertNoValues();
testScheduler.advanceTimeBy(10, TimeUnit.SECONDS);
testObserver .assertNoErrors() .assertComplete() .assertResult(1, 2) .assertNever(3); }
TEXT
public class TestSchedulersRule implements TestRule { TestScheduler testScheduler = new TestScheduler(); @Override public Statement apply(final Statement base, Description description) { return new Statement() { @Override public void evaluate() throws Throwable { RxJavaPlugins.setIoSchedulerHandler( scheduler -> testScheduler); RxJavaPlugins.setComputationSchedulerHandler( scheduler -> testScheduler); RxJavaPlugins.setNewThreadSchedulerHandler( scheduler -> testScheduler); try { base.evaluate(); } finally { RxJavaPlugins.reset(); } } }; } }