landi migration workshop traini… · one more example to control thread - traditionally landi apos...
TRANSCRIPT
RxJava Introduction
JUNE 2017
FUZHOU, CHINA
Why RxJava
Show all PNG images in the UI - Traditional
Landi APOS A8 training – June 2017
new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
Logic Nesting
Show all PNG images in the UI - RxJava
Observable.fromArray(folders)
        .flatMap(folder -> Observable.fromArray(folder.listFiles()))
        .filter(file -> file.getName().endsWith(".png"))
        .map(file -> getBitmapFromFile(file))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe((Consumer<Bitmap>) bitmap -> imageCollectorView.addImage(bitmap));
Another example to avoid callback hell - Callback
private void downloadParameters() {
    EmvParameterDownloader emvParameterDownloader = new EmvParameterDownloader(context);
    new TerminalParameterDownloader().download(new Runnable() {
        @Override
        public void run() {
            emvParameterDownloader.downloadICParameters(new Runnable() {
                @Override
                public void run() {
                    emvParameterDownloader.downloadPublicKeys();
                }
            });
        }
    });
}
Callback Hell
Another example to avoid callback hell - RxJava
private Completable downloadParameters() {
    EmvParameterDownloader emvParameterDownloader = new EmvParameterDownloader(context);
    // Each step starts only after the previous Completable has completed.
    return new TerminalParameterDownloader(context).download()
            .andThen(emvParameterDownloader.downloadICParameters())
            .andThen(emvParameterDownloader.downloadPublicKeys());
}
One more example to control thread - Traditionally
private void login() {
    // find user (assumption: `user` is a field that findUser fills in before the callback runs)
    findUser(name, password, new Runnable() {
        @Override
        public void run() {
            updateLastLoginUser(user);
            runOnUiThread(new Runnable() {
                @Override
                public void run() {
                    handleLogonUser(user);
                }
            });
        }
    });
}
One more example to control thread - RxJava
private Completable login() {
    // find user
    return findUser(name, password)
            // update last login user
            .doOnSuccess(user -> updateLastLoginUser(user))
            .observeOn(AndroidSchedulers.mainThread())
            .flatMapCompletable(user -> handleLogonUser(user));
}
One more example to handle error - Traditional
private Handler handler = new Handler(Looper.getMainLooper()) {
    @Override
    public void handleMessage(Message msg) {
        // Match the code that signIn() sends below.
        if (msg.what == ERROR) {
            Exception data = (Exception) msg.obj;
            // show exception
        }
    }
};

public void signIn() {
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // business logic
            } catch (Exception e) {
                Message message = Message.obtain();
                message.obj = e;
                message.what = ERROR;
                handler.sendMessage(message);
            }
        }
    }).start();
}
One more example to handle error - RxJava
public void signIn() {
    new SignIn(context).signIn()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DisposableCompletableObserver() {
                @Override
                public void onComplete() {
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    // show error
                }
            });
}
Process
- Introduce ReactiveX and RxJava
- Learning Path
- RxJava Topics: Observable/Observer/Scheduler/Error Handling
- Lambda Expression
- How we use it in the Arke Project
ReactiveX
A solution for asynchronous and event-based programs.
- Functional
- Less is more
- Async error handling
- Concurrency made easy
CROSS-PLATFORM
Java: RxJava
JavaScript: RxJS
C#: Rx.NET
C#(Unity): UniRx
Scala: RxScala
Clojure: RxClojure
C++: RxCpp
Lua: RxLua
Ruby: Rx.rb
Python: RxPY
Go: RxGo
Groovy: RxGroovy
JRuby: RxJRuby
Kotlin: RxKotlin
Swift: RxSwift
PHP: RxPHP
Elixir: reaxive
Dart: RxDart
RxNetty RxAndroid RxCocoa
RxJava and RxAndroid
Why and how to replace Thread and AsyncTask
- AsyncTask
- AsyncTaskLoader
- Thread and Handler
- RxJava

- Introduction to RxJava for Android Developers
- Intro to RxJava
- What's different in 2.0
Learning Path & Resources
Examples
- training / RxJava2-Android-Samples
- kaushikgopal/RxJava-Android-Samples
- How we use in Arke Project.
Practice and Google
- Practice in Project
- Google and Thinking
- ReactiveX Official Site
- ReactiveX GitHub
- RxJava GitHub
- RxJava Category in my Blog
- Exploring RxJava 2 for Android
Jake Wharton
- Intro to RxJava for Android
- Intro to RxJava
Video
Guide by Arke Team
Reactive Programming with RxJava
Official Site
Shut up, show me the code
Observable.fromArray(folders)
        .flatMap(folder -> Observable.fromArray(folder.listFiles()))
        .filter(file -> file.getName().endsWith(".png"))
        .map(file -> getBitmapFromFile(file))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe((Consumer<Bitmap>) bitmap -> imageCollectorView.addImage(bitmap));
Primer on RxJava
// include RxJava library
compile 'io.reactivex.rxjava2:rxjava:2.0.6'
// include RxAndroid library
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
Key Concepts
- Observable
- Observer / Subscriber
- Operator
- Scheduler
Observable
- How to create
- How to subscribe
- How to unsubscribe
- Subject
Observable event types
- onNext
- onComplete
- onError
onNext* (onComplete | onError)?
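The event grammar above reads like a regular expression: any number of onNext events, optionally followed by exactly one terminal event. As a minimal sketch in plain Java (not RxJava code; the class `EventGrammar` and the single-letter encoding are hypothetical names made up for illustration), a recorded event sequence can be checked against that rule like this:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class EventGrammar {
    // One character per event: N = onNext, C = onComplete, E = onError.
    private static final Pattern CONTRACT = Pattern.compile("N*(C|E)?");

    /** Returns true if the event names obey onNext* (onComplete | onError)?. */
    public static boolean isValid(List<String> events) {
        StringBuilder encoded = new StringBuilder();
        for (String event : events) {
            switch (event) {
                case "onNext": encoded.append('N'); break;
                case "onComplete": encoded.append('C'); break;
                case "onError": encoded.append('E'); break;
                default: return false; // unknown event name
            }
        }
        return CONTRACT.matcher(encoded).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid(Arrays.asList("onNext", "onNext", "onComplete"))); // true
        System.out.println(isValid(Arrays.asList("onNext", "onError")));              // true
        System.out.println(isValid(Arrays.asList("onComplete", "onNext")));           // false
        System.out.println(isValid(Arrays.asList("onError", "onComplete")));          // false
    }
}
```

Nothing may follow a terminal event, and the two terminal events are mutually exclusive.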
How to emit events
Observable<Integer> ints = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(5);
        emitter.onNext(6);
        emitter.onNext(7);
        emitter.onComplete();
    }
});
Hot and Cold Observables
Cold Observable
No subscriber, no events emitted.
Hot Observable
Even with no subscriber, it always emits events.
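To make the hot/cold distinction concrete, here is a small plain-Java sketch. It does not use RxJava; `ColdSource` and `HotSource` are hypothetical stand-ins written only for this illustration. The cold source produces its values inside `subscribe()`, so each subscriber gets the full sequence. The hot source produces values whenever `emit()` is called, so anything emitted before a subscriber arrives is missed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class HotColdSketch {
    /** Cold: values are produced inside subscribe(), once per subscriber. */
    static final class ColdSource {
        void subscribe(Consumer<Integer> subscriber) {
            for (int value = 1; value <= 3; value++) {
                subscriber.accept(value);
            }
        }
    }

    /** Hot: values are produced by emit(), whether or not anyone is listening. */
    static final class HotSource {
        private final List<Consumer<Integer>> subscribers = new ArrayList<>();

        void subscribe(Consumer<Integer> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(int value) {
            for (Consumer<Integer> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    public static List<Integer> coldDemo() {
        List<Integer> received = new ArrayList<>();
        new ColdSource().subscribe(received::add);
        return received;
    }

    public static List<Integer> hotDemo() {
        List<Integer> received = new ArrayList<>();
        HotSource source = new HotSource();
        source.emit(1);                  // nobody has subscribed yet, so this value is lost
        source.subscribe(received::add);
        source.emit(2);
        source.emit(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(coldDemo()); // [1, 2, 3]
        System.out.println(hotDemo());  // [2, 3]
    }
}
```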
Reactive Types
Type           Items emitted
Single         1
Completable    0
Maybe          0,1
Flowable       0..n
Observable     0..n

Examples:
- CompletableObserverExampleActivity.java
- SingleObserverExampleActivity.java
- MaybeObserverExampleActivity.java
- FlowableExampleActivity.java
Backpressure
               Backpressure    No Backpressure
Types          Flowable        Observable, Maybe, Single, Completable
When to use    10k+ events     fewer than about 1k events
Source style   pull-based      push-based
Reactive Types Transformation
// Completable to Flowable
Completable.complete().toFlowable();
// Completable to Maybe
Completable.complete().toMaybe();
// Completable to Observable
Completable.complete().toObservable();
// Completable to Single
Completable.complete().toSingleDefault(1);
Create Observable
// convert an object or a set of objects into an Observable that emits that or those objects
Observable.just("one");
Observable.just(1, 2);

// convert some other object or data structure into an Observable
Observable.fromArray(1, 2, 3);
Observable.fromCallable();
Observable.fromFuture();

// create an Observable that emits a sequence of integers spaced by a particular time interval
Observable.interval(1, TimeUnit.SECONDS);

// create an Observable that emits a particular range of sequential integers
Observable.range(1, 10);

// create Observables that have very precise and limited behavior
Observable.empty();
Observable.error();
Observable.never();
Create Observable with Observable.create()
Observable<Integer> ints = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(5);
        emitter.onNext(6);
        emitter.onNext(7);
        emitter.onComplete();
    }
});
Maybe, Completable, Single, Flowable Creation
// create Completables that have very precise and limited behavior
Completable.complete();
Completable.error();
Completable.never();
Completable.timer();

// convert some other object or data structure into a Completable
Completable.fromAction();
Completable.fromCallable();
Completable.fromFuture();
Completable.fromRunnable();

// convert some other reactive type into a Completable
Completable.fromObservable();
Completable.fromPublisher();
Completable.fromSingle();

Completable.create();
Subscribing Observable
// Subscribe with Observer
Observable.just("one").subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "Subscribed");
    }

    @Override
    public void onNext(@NonNull String s) {
        Log.d(TAG, "Got events " + s);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.d(TAG, "Error happened");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "Completed");
    }
});
Simple Subscribing Observable
// onNext only
Observable.just("one").subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        Log.d(TAG, s);
    }
});

// onNext and onError
Observable.just("one").subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        Log.d(TAG, s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        Log.e(TAG, throwable.getMessage());
    }
});

// Full signature
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe)
Types of Observer
Unsubscribing from Observable
Observable.just("one", "two").subscribe(new DisposableObserver<String>() {
    @Override
    public void onNext(String s) {
        if (s.equals("two")) {
            dispose();
        }
    }

    @Override
    public void onError(Throwable e) {
        dispose();
    }

    @Override
    public void onComplete() {
        dispose();
    }
});
Subject
- AsyncSubject
- BehaviorSubject
- PublishSubject
- ReplaySubject
AsyncSubject
AsyncSubjectExampleActivity.java
BehaviorSubject
BehaviorSubjectExampleActivity.java
PublishSubject
PublishSubjectExampleActivity.java
ReplaySubject
Lambda Expression
Use Retrolambda
1. JDK8
Download jdk8 and set it as your default.
2. Gradle
In project build.gradle:
buildscript {
    dependencies {
        classpath 'me.tatarka:gradle-retrolambda:3.6.0'
    }
}
In module build.gradle:
apply plugin: 'com.android.application'
apply plugin: 'me.tatarka.retrolambda'
Project build.gradle
compileOptions {
    targetCompatibility 1.8
    sourceCompatibility 1.8
}
3. No Step 3
Syntax of Lambda Expressions
(a, b) -> {
    System.out.println("Performing add operation...");
    return a + b;
}
1. Input parameter
2. Arrow token
3. Code block
a -> a + 1;
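A lambda on its own does not compile; it has to be assigned to, or passed as, a functional interface. As a small sketch, the two lambdas from this slide can be bound to the standard `java.util.function` types (the class and constant names below are hypothetical, chosen only for this example):

```java
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

class LambdaSyntaxDemo {
    // Block form: input parameters, arrow token, then a code block with an explicit return.
    public static final BinaryOperator<Integer> ADD = (a, b) -> {
        System.out.println("Performing add operation...");
        return a + b;
    };

    // Expression form: a single parameter needs no parentheses, and the expression value is returned.
    public static final UnaryOperator<Integer> INCREMENT = a -> a + 1;

    public static void main(String[] args) {
        System.out.println(ADD.apply(2, 3));      // prints the message, then 5
        System.out.println(INCREMENT.apply(41));  // 42
    }
}
```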
Lambda Expressions used in RxJava
Anonymous function demo:
getObservable().map(new Function<List<ApiUser>, List<User>>() {
    @Override
    public List<User> apply(List<ApiUser> apiUsers) throws Exception {
        return Utils.convertApiUserListToUserList(apiUsers);
    }
});
Replace with Lambda Expression:
getObservable().map(apiUsers -> Utils.convertApiUserListToUserList(apiUsers));
Do an exercise: MapExampleActivity.java
Operator
Marble Diagrams
Operators Categories
- 01 Creating: Originate new Observables.
- 02 Transforming: Transform items that are emitted by an Observable.
- 03 Filtering: Selectively emit items from a source Observable.
- 04 Combining: Work with multiple source Observables to create a single Observable.
- 05 Error Handling Operators: Help to recover from error notifications from an Observable.
- 06 Utility Operators: A toolbox of useful Operators for working with Observables.
- 07 Conditional: Evaluate one or more Observables or items emitted by Observables.
- 08 Mathematical and Aggregate: Operate on the entire sequence of items emitted by an Observable.
- 09 Connectable: Specialty Observables that have more precisely-controlled subscription dynamics.
ReactiveX - Operators Which Operator do I use?
Filtering Operators
- filter
- distinct
- distinctUntilChanged
Filtering
filter example
Observable.just(1, 2, 3, 4, 5)
        .filter(item -> item < 4)
        .subscribe(new DisposableObserver<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }

            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Sequence complete.");
            }
        });
Dropping Duplicates Using distinct() and distinctUntilChanged()
Observable.just(1, 2, 1, 1, 2, 3).distinct(); // [1, 2, 3]
Observable.just(1, 2, 1, 1, 2, 3).distinctUntilChanged(); // [1, 2, 1, 2, 3]
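The difference between the two results comes down to what each operator remembers: distinct() remembers every value seen so far, while distinctUntilChanged() remembers only the previous one. The following plain-Java sketch reproduces both results on a list. It is an illustration of the semantics under that reading, not RxJava's implementation, and the class `DistinctSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

class DistinctSketch {
    /** Keeps the first occurrence of every value, like distinct(). */
    public static <T> List<T> distinct(List<T> items) {
        return new ArrayList<>(new LinkedHashSet<>(items));
    }

    /** Drops a value only when it equals the value immediately before it. */
    public static <T> List<T> distinctUntilChanged(List<T> items) {
        List<T> result = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T item : items) {
            if (first || !Objects.equals(previous, item)) {
                result.add(item);
            }
            previous = item;
            first = false;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 1, 1, 2, 3);
        System.out.println(distinct(source));             // [1, 2, 3]
        System.out.println(distinctUntilChanged(source)); // [1, 2, 1, 2, 3]
    }
}
```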
Utility Operators
- delay
Postponing Events Using delay()
just(x, y, z).delay(1, TimeUnit.SECONDS);
Transforming Operators
- map
- flatMap
- concatMap
map() Operator
Map example
MapExampleActivity.java
Observable<Status> tweets = //...
Observable<Date> dates = tweets.map(status -> status.getCreatedAt());

Observable<Instant> instants = tweets
        .map(Status::getCreatedAt)
        .map((Date d) -> d.toInstant());
Wrapping Up Using flatMap()
Flatmap example - Flatten students to course list
List<Student> students = new ArrayList<Student>();
// students.add...

// RxJava 2 uses fromIterable() where RxJava 1 used from()
Observable.fromIterable(students)
        .flatMap(student -> Observable.fromIterable(student.getCoursesList()))
        .subscribe(course -> Log.i(TAG, course.getName()));
Flatmap exercise
Observable<Integer> oneToEight = Observable.range(1, 8);

Observable<String> ranks = oneToEight.map(Object::toString);

Observable<String> files = oneToEight
        .map(x -> 'a' + x - 1)
        .map(ascii -> (char) ascii.intValue())
        .map(ch -> Character.toString(ch));

Observable<String> squares = files.flatMap(file -> ranks.map(rank -> file + rank));
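To check the expected answer of this exercise without an RxJava dependency, the same file-by-rank pairing can be sketched with `java.util.stream`, whose `flatMap` flattens in the same way. This is an analogue using the standard library rather than the exercise's own API; the class `ChessSquaresSketch` is a hypothetical name. It should yield the 64 chessboard squares from a1 to h8.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ChessSquaresSketch {
    /** Builds "a1".."h8" by pairing every file letter with every rank digit. */
    public static List<String> squares() {
        return IntStream.rangeClosed(1, 8)
                .mapToObj(x -> Character.toString((char) ('a' + x - 1))) // files a..h
                .flatMap(file -> IntStream.rangeClosed(1, 8)
                        .mapToObj(rank -> file + rank))                  // ranks 1..8 for each file
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> all = squares();
        System.out.println(all.size());  // 64
        System.out.println(all.get(0));  // a1
        System.out.println(all.get(63)); // h8
    }
}
```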
Order of Events After flatMap()
just(10L, 1L)
        .flatMap(x -> just(x).delay(x, TimeUnit.SECONDS))
        .subscribe(System.out::println);
![Page 58: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/58.jpg)
Another flatMap exercise
Observable<String> loadRecordsFor(DayOfWeek dow) {
    switch (dow) {
        case SUNDAY:
            return Observable
                .interval(90, MILLISECONDS)
                .take(5)
                .map(i -> "Sun-" + i);
        case MONDAY:
            return Observable.interval(65, MILLISECONDS).take(5).map(i -> "Mon-" + i);
        //...
    }
}

Observable
    .just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
    .flatMap(this::loadRecordsFor);
![Page 59: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/59.jpg)
Preserving Order Using concatMap()
Observable
.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
.concatMap(this::loadRecordsFor);
Sun-0, Sun-1, Sun-2, Sun-3, Sun-4, Mon-0, Mon-1,
Mon-2, Mon-3, Mon-4
Observable
.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
.flatMap(this::loadRecordsFor);
Mon-0, Sun-0, Mon-1, Sun-1, Mon-2, Mon-3, Sun-2,
Mon-4, Sun-3, Sun-4
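The two orderings above follow from arrival times alone. The sketch below is a plain-Java timeline model, not RxJava code: it assumes ideal timers (item i of a 90 ms interval arrives at 90 × (i + 1) ms) and uses hypothetical names (`OrderingModel`, `records`, `merged`, `concatenated`) to show why the interleaving comes out the way it does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class OrderingModel {
    // One emitted record: its label and the time (ms) at which it arrives.
    static final class Event {
        final long timeMs;
        final String label;
        Event(long timeMs, String label) {
            this.timeMs = timeMs;
            this.label = label;
        }
    }

    // Models interval(period).take(count).map(i -> prefix + i), subscribed at startMs.
    static List<Event> records(String prefix, long periodMs, int count, long startMs) {
        List<Event> events = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            events.add(new Event(startMs + periodMs * (i + 1), prefix + i));
        }
        return events;
    }

    static List<String> labels(List<Event> events) {
        return events.stream().map(e -> e.label).collect(Collectors.toList());
    }

    // flatMap-style: both inner streams start at time 0 and events pass through as they arrive.
    static List<String> merged() {
        List<Event> all = new ArrayList<>(records("Sun-", 90, 5, 0));
        all.addAll(records("Mon-", 65, 5, 0));
        all.sort(Comparator.comparingLong((Event e) -> e.timeMs));
        return labels(all);
    }

    // concatMap-style: Monday is subscribed only after Sunday's last record.
    static List<String> concatenated() {
        List<Event> sunday = records("Sun-", 90, 5, 0);
        long sundayEnd = sunday.get(sunday.size() - 1).timeMs;
        List<Event> all = new ArrayList<>(sunday);
        all.addAll(records("Mon-", 65, 5, sundayEnd));
        return labels(all);
    }

    public static void main(String[] args) {
        System.out.println(merged());
        System.out.println(concatenated());
    }
}
```

Under these assumptions `merged()` reproduces the flatMap output listed above and `concatenated()` reproduces the concatMap output. Real timers jitter, so events that fall close together (for example Mon-3 at 260 ms and Sun-2 at 270 ms) may swap in practice.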
![Page 60: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/60.jpg)
Combining Operators
- zip
- combineLatest
- concat
- merge
![Page 61: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/61.jpg)
Pairwise Composing Using zip()
![Page 62: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/62.jpg)
zip() example
Observable<Long> red = Observable.interval(10, TimeUnit.MILLISECONDS);
Observable<Long> green = Observable.interval(10, TimeUnit.MILLISECONDS);
Observable.zip(
    red.timestamp(),
    green.timestamp(),
    (r, g) -> r.getTimestampMillis() - g.getTimestampMillis()
).forEach(System.out::println);
ZipExampleActivity.java
Observable<Long> red = Observable.interval(10, TimeUnit.SECONDS);
Observable<Long> green = Observable.interval(10, TimeUnit.MILLISECONDS);
Observable.zip(
    red.timestamp(),
    green.timestamp(),
    (r, g) -> r.getTimestampMillis() - g.getTimestampMillis()
).forEach(System.out::println);
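The second variant is interesting because zip() pairs items by position, so the fast green stream has to wait for the slow red one. A small arithmetic sketch of the expected timestamp difference, assuming ideal timers where item i of an interval with period p arrives at p × (i + 1); this is plain Java, not RxJava, and `ZipLagModel` and `lagMillis` are hypothetical names.

```java
public class ZipLagModel {
    // Timestamp of the i-th red item minus the timestamp of the i-th green item
    // (i starts at 0), under the ideal-timer assumption described above.
    static long lagMillis(int i, long redPeriodMs, long greenPeriodMs) {
        return redPeriodMs * (i + 1) - greenPeriodMs * (i + 1);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            // Equal periods: the streams stay in step, so the difference stays near 0.
            long inStep = lagMillis(i, 10, 10);
            // Red every 10 s, green every 10 ms: the difference grows with every pair.
            long drifting = lagMillis(i, 10_000, 10);
            System.out.println(inStep + " vs " + drifting);
        }
    }
}
```

The growing difference also means that green items pile up while they wait for their red partner, which is the practical risk of zipping streams that run at different rates.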
![Page 63: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/63.jpg)
When Streams Are Not Synchronized with One Another
![Page 64: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/64.jpg)
combineLatest example
FormValidationCombineLatestFragment.java
Observable.combineLatest(
    interval(170, MILLISECONDS).map(x -> "S" + x),
    interval(100, MILLISECONDS).map(x -> "F" + x),
    (s, f) -> f + ":" + s
).forEach(System.out::println);
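To see what this prints, it helps to walk the timeline by hand. The sketch below is a plain-Java model of the combineLatest rule (emit whenever either side produces a value, once both sides have produced at least one); it is not RxJava code, it assumes ideal timers, and `CombineLatestModel` and `combine` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class CombineLatestModel {
    // Walks both timers in time order up to horizonMs and records one combined
    // value per event, as soon as both streams have emitted at least once.
    static List<String> combine(long slowPeriodMs, long fastPeriodMs, long horizonMs) {
        List<String> out = new ArrayList<>();
        String latestSlow = null;
        String latestFast = null;
        long slowIndex = 0;
        long fastIndex = 0;
        while (true) {
            long nextSlow = slowPeriodMs * (slowIndex + 1);
            long nextFast = fastPeriodMs * (fastIndex + 1);
            if (Math.min(nextSlow, nextFast) > horizonMs) {
                break;
            }
            if (nextSlow <= nextFast) {
                latestSlow = "S" + slowIndex++;
            } else {
                latestFast = "F" + fastIndex++;
            }
            if (latestSlow != null && latestFast != null) {
                out.add(latestFast + ":" + latestSlow);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // First 500 ms of the slide's example: S every 170 ms, F every 100 ms.
        System.out.println(combine(170, 100, 500));
        // [F0:S0, F1:S0, F2:S0, F2:S1, F3:S1, F4:S1]
    }
}
```

Note that F0 on its own produces nothing: the first combined value appears only at 170 ms, when S0 arrives.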
![Page 65: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/65.jpg)
Ways of Combining Streams: concat(), merge()
MergeExampleActivity.java, ConcatExampleActivity.java
![Page 66: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/66.jpg)
Merge & Concat Example
final Observable<String> sundayObservable = Observable.interval(90, MILLISECONDS).take(5).map(i -> "Sun-" + i);
final Observable<String> mondayObservable = Observable.interval(65, MILLISECONDS).take(5).map(i -> "Mon-" + i);
Observable.merge(sundayObservable, mondayObservable);
Mon-0, Sun-0, Mon-1, Sun-1, Mon-2, Mon-3, Sun-2, Mon-4, Sun-3, Sun-4
Observable.concat(sundayObservable, mondayObservable);
Sun-0, Sun-1, Sun-2, Sun-3, Sun-4, Mon-0, Mon-1, Mon-2, Mon-3, Mon-4
![Page 67: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/67.jpg)
Conditional Operators
![Page 68: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/68.jpg)
all, exists, contains
Observable<Integer> numbers = Observable.range(1, 5);
numbers.all(x -> x != 4);    // [false]
numbers.exists(x -> x == 4); // [true]
numbers.contains(4);         // [true]
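For readers who know `java.util.stream`, the same three checks have close counterparts there. The sketch below is that plain-Java analogue, not RxJava; it assumes `range(1, 5)` means the five values 1 to 5, and `ConditionChecks` with its methods are hypothetical names.

```java
import java.util.stream.IntStream;

public class ConditionChecks {
    // Counterpart of all(x -> x != 4): true only if every element passes.
    static boolean allNotFour() {
        return IntStream.rangeClosed(1, 5).allMatch(x -> x != 4);
    }

    // Counterpart of exists(x -> x == 4): true if at least one element passes.
    static boolean existsFour() {
        return IntStream.rangeClosed(1, 5).anyMatch(x -> x == 4);
    }

    // Counterpart of contains(4), written as an equality check.
    static boolean containsFour() {
        return IntStream.rangeClosed(1, 5).anyMatch(x -> x == 4);
    }

    public static void main(String[] args) {
        System.out.println(allNotFour());   // false
        System.out.println(existsFour());   // true
        System.out.println(containsFour()); // true
    }
}
```

The difference is that a stream answers immediately, while the Observable versions deliver the boolean asynchronously once the answer is known.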
![Page 69: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/69.jpg)
Schedulers
- observeOn
- subscribeOn
![Page 70: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/70.jpg)
A simple Observable – A simple data source
private Observable<String> simple() {
    return Observable.create(emitter -> {
        log("emitting A");
        emitter.onNext("A");
        log("emitting B");
        emitter.onNext("B");
        log("Completing");
        emitter.onComplete();
    });
}
![Page 71: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/71.jpg)
subscribeOn()
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs.subscribe(
    x -> log("Got " + x),
    Throwable::printStackTrace,
    () -> log("Completed")
);
log("Exiting");
298 | main | Starting
298 | main | Created
298 | main | Emitting A
298 | main | Got A
298 | main | Emitting B
298 | main | Got B
298 | main | Completed
298 | main | Exiting
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs.subscribeOn(Schedulers.io())
    .subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");
298 | main | Starting
298 | main | Created
298 | main | Exiting
301 | RxCachedThreadScheduler | Emitting A
301 | RxCachedThreadScheduler | Got A
301 | RxCachedThreadScheduler | Emitting B
301 | RxCachedThreadScheduler | Got B
301 | RxCachedThreadScheduler | Completed
![Page 72: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/72.jpg)
observeOn()
Controls which Scheduler is used to invoke downstream operators and subscribers.
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs.observeOn(Schedulers.io())
    .subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");
298 | main | Starting
298 | main | Created
298 | main | Emitting A
298 | main | Emitting B
298 | main | Exiting
301 | RxCachedThreadScheduler | Got A
301 | RxCachedThreadScheduler | Got B
301 | RxCachedThreadScheduler | Completed
![Page 73: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/73.jpg)
Use subscribeOn and observeOn
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");
298 | main | Starting
298 | main | Created
298 | main | Exiting
301 | RxCachedThreadScheduler-1 | Emitting A
301 | RxCachedThreadScheduler-1 | Emitting B
302 | RxCachedThreadScheduler-2 | Got A
302 | RxCachedThreadScheduler-2 | Got B
302 | RxCachedThreadScheduler-2 | Completed
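The thread hand-offs in these logs can be imitated with plain executors. The sketch below is only a loose model, not how RxJava is implemented: one single-thread executor stands in for the Scheduler passed to subscribeOn (where emission runs) and another for the Scheduler passed to observeOn (where items are received). `ThreadHopModel`, `run`, and the thread names `producer` and `consumer` are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadHopModel {
    // Emits two items on one thread, receives them on another, and returns
    // the name of the thread that handled each step.
    static Map<String, String> run() {
        Map<String, String> threadOf = new ConcurrentHashMap<>();
        ExecutorService producer =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumer =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));

        // Role of subscribeOn: the emitting code leaves the caller's thread.
        producer.execute(() -> {
            for (String item : new String[] {"A", "B"}) {
                threadOf.put("Emitting " + item, Thread.currentThread().getName());
                // Role of observeOn: each item is handed over to another thread.
                consumer.execute(() ->
                    threadOf.put("Got " + item, Thread.currentThread().getName()));
            }
        });

        try {
            // Wait for the producer first so every hand-off has been queued.
            producer.shutdown();
            producer.awaitTermination(5, TimeUnit.SECONDS);
            consumer.shutdown();
            consumer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return threadOf;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Dropping the consumer executor and calling the receiver directly inside the loop gives the subscribeOn-only picture, where emitting and receiving share one thread.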
![Page 74: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/74.jpg)
Schedulers
- Schedulers.newThread()
- Schedulers.io()
- Schedulers.computation()
- Schedulers.trampoline()
- Schedulers.single()
- AndroidSchedulers.mainThread()
![Page 75: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/75.jpg)
Error Handling
//...
.subscribe(
    System.out::println,
    throwable -> showError());
![Page 76: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/76.jpg)
Replacing errors with a fixed result using onErrorReturn()
![Page 77: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/77.jpg)
onErrorReturn example
Observable<Income> income = person
    .flatMap(this::determineIncome)
    .onErrorReturn(error -> Income.no());
//...
private Observable<Income> determineIncome(Person person) {
    return Observable.error(new RuntimeException("Foo"));
}

class Income {
    static Income no() {
        return new Income(0);
    }
}
![Page 78: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/78.jpg)
Timing Out When Events Do Not Occur
![Page 79: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/79.jpg)
timeout example
Observable<String> confirmation() {
    Observable<String> delayBeforeCompletion =
        Observable.<String>empty().delay(200, MILLISECONDS);
    return Observable
        .just("a")
        .delay(100, MILLISECONDS)
        .concatWith(delayBeforeCompletion);
}

confirmation()
    .timeout(110, MILLISECONDS)
    .test()
    .await()
    .assertValue("a")
    .assertError(TimeoutException.class);
![Page 80: Landi Migration Workshop Traini… · One more example to control thread - Traditionally Landi APOS A8 training –June 2017 private void login() {// find user return findUser(name,](https://reader035.vdocuments.us/reader035/viewer/2022062303/5f1ce38755bd33073927130a/html5/thumbnails/80.jpg)
Retrying After Failures
Observable<String> risky() {
    return Observable.fromCallable(() -> {
        if (Math.random() < 0.1) {
            Thread.sleep((long) (Math.random() * 2000));
            return "OK";
        } else {
            throw new RuntimeException("Transient");
        }
    });
}

risky()
    .timeout(1, SECONDS)
    .doOnError(th -> log.warn("Will retry", th))
    .retry()
    .subscribe(log::info);
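The idea behind retry() is to run the source again after every failure. A minimal plain-Java sketch of that idea follows; it is not RxJava code, and unlike the unbounded retry() above it stops after a fixed number of attempts. `RetryModel`, `retry`, and `demo` are hypothetical names, and the failing task is made deterministic so the outcome is predictable.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryModel {
    // Runs the task again after each failure, up to maxAttempts runs in total,
    // and rethrows the last failure if every attempt fails.
    static <T> T retry(Supplier<T> task, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    // A task that fails twice with a "Transient" error and succeeds on the third call.
    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        return retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new RuntimeException("Transient");
            }
            return "OK after " + calls.get() + " calls";
        }, 5);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // OK after 3 calls
    }
}
```

A bound like `maxAttempts` is often worth having in real code too, since an unbounded retry against a source that never recovers will loop forever.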