Reactive programming on Android
Tomáš Kypta
reactive programming or functional reactive programming (FRP)
What’s reactive programming?
functional
– lambdas
– closures
– pure
– composable
What’s reactive programming?
reactive
– data flow
– asynchronous
– values
– events
– push
• observer pattern
Reactive?
Typical App
[Diagram: event sources (Views, Network, DB, Other) each feed their own Listener and logic, which all update shared State]
Reactive
[Diagram: event sources (Views, Network, DB, Other) are exposed as Observables, pass through a Transformation, and are consumed by Observers]
• abstract away from concerns about
– low-level threading
– side effects
– synchronization
– encapsulation
– resource management
Reactive
Java 8
Observable.just("a", "b", "c")
    .take(2)
    .subscribe((arg) -> {
        System.out.println(arg);
    });
Reactive Code
• evading callback hell
• How to execute heavy tasks on background threads?
• And deliver results on the main (UI) thread?
Reactive on Android
Async on Android
Handler handler = new Handler();
new Thread() {
    @Override
    public void run() {
        final String result = somethingDemanding();
        handler.post(new Runnable() {
            @Override
            public void run() {
                showResult(result);
            }
        });
    }
}.start();
Thread + Handler
• simple
• difficult to deliver on the main thread
• broken data flow
Thread + Handler
new AsyncTask<Void, Integer, String>() {
    @Override
    protected String doInBackground(Void... params) {
        return somethingDemanding();
    }
    @Override
    protected void onPostExecute(String s) {
        showResult(s);
    }
}.execute();
AsyncTask
• deals with the main thread
• error-prone
• difficult error propagation
• difficult to bind to activity/fragment lifecycle
• difficult composition
AsyncTask
class MyFragment extends Fragment implements LoaderManager.LoaderCallbacks<String> {
    @Override public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        getLoaderManager().initLoader(42, null, this);
    }
    @Override public Loader<String> onCreateLoader(int id, Bundle args) {
        return new AsyncTaskLoader<String>(getActivity()) {
            // Without forceLoad(), loadInBackground() is never triggered.
            @Override protected void onStartLoading() {
                forceLoad();
            }
            @Override public String loadInBackground() {
                return doHeavyTask();
            }
        };
    }
    @Override public void onLoadFinished(Loader<String> loader, String data) {
        showResult(data);
    }
    @Override public void onLoaderReset(Loader<String> loader) {}
}
Loaders
• boilerplate
• good for cursors
• deals with activity/fragment lifecycle
• difficult composition
• difficult to adjust logic
Loaders
• many other async libraries
– some are better
– some are worse
Async on Android
RxJava
• Java VM implementation of Reactive Extensions
• a library for composing flows and sequences of asynchronous data
• open-source
• https://github.com/ReactiveX/RxJava
• by Netflix
RxJava
• DSL for creating computation flows from async sources
• flows called Observables
• push semantics of Observables
RxJava
proxy or wrapper around an object that is not yet there
future.get()
future.isDone()
• non-trivial complexity when nested
• difficult to compose conditional asynchronous execution flows
Future
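The composition problem can be seen in a small plain-JDK sketch. The two lookups (`fetchUserId`, `fetchPhotoUrl`) are hypothetical stand-ins for dependent remote calls, not part of any library. The point is only that the second call cannot start until the caller has blocked on the first `get()`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FutureNestingSketch {
    // Hypothetical lookups standing in for two dependent remote calls.
    static String fetchUserId() { return "42"; }
    static String fetchPhotoUrl(String userId) { return "/user/" + userId + "/photo"; }

    static String loadPhotoUrl() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> userId = pool.submit(() -> fetchUserId());
            // The second call needs the first result, so the caller blocks here.
            String id = userId.get();
            Future<String> photoUrl = pool.submit(() -> fetchPhotoUrl(id));
            return photoUrl.get(); // and blocks again
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Failures surface only at get(), away from the code that failed.
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadPhotoUrl());
    }
}
```

Every additional dependent step adds another blocking `get()` and another place to handle errors, which is the nesting cost the slide refers to.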
Iterable vs. Observable
// Iterable<String>
getData()
    .skip(10)
    .take(5)
    .map({ s -> return s + "1" })
    .forEach({ println it });
Iterable vs. Observable
// Observable<String>
getData()
    .skip(10)
    .take(5)
    .map({ s -> return s + "1" })
    .subscribe({ println it });
              single items          multiple items
synchronous   T getData()           Iterable<T> getData()
asynchronous  Future<T> getData()   Observable<T> getData()
Iterable vs. Observable
event           Iterable (pull)     Observable (push)
retrieve data   T next()            onNext(T)
discover error  throws Exception    onError(Exception)
complete        !hasNext()          onCompleted()
Iterable vs. Observable
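The pull/push duality in the table can be sketched in plain Java. `SimpleObserver` below is a hypothetical interface that only mirrors the three callbacks from the table; it is not RxJava's `Observer`, and the sketch is synchronous for simplicity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class PullVsPushSketch {
    /** Hypothetical stand-in mirroring the three callbacks in the table. */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Pull: the consumer drives the loop and asks for each item. */
    static List<String> pull(Iterable<String> source) {
        List<String> log = new ArrayList<>();
        try {
            Iterator<String> it = source.iterator();
            while (it.hasNext()) {
                log.add("next:" + it.next());
            }
            log.add("completed");
        } catch (RuntimeException e) {
            log.add("error");
        }
        return log;
    }

    /** Push: the producer drives the loop and calls the consumer back. */
    static void push(Iterable<String> source, SimpleObserver<String> observer) {
        try {
            for (String item : source) {
                observer.onNext(item);
            }
            observer.onCompleted();
        } catch (RuntimeException e) {
            observer.onError(e);
        }
    }

    /** Records what a pushed-to observer sees, for comparison with pull(). */
    static List<String> pushToLog(Iterable<String> source) {
        List<String> log = new ArrayList<>();
        push(source, new SimpleObserver<String>() {
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onError(Throwable error) { log.add("error"); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(pull(Arrays.asList("a", "b")));
        System.out.println(pushToLog(Arrays.asList("a", "b")));
    }
}
```

Both paths observe the same sequence of events; what changes is which side owns the loop.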
public Observable<String> getData();
• synchronous on calling thread
  [Diagram: Calling Thread, Callback Thread]
• asynchronous on a separate thread
  [Diagram: Calling Thread, Thread Pool, Callback Thread]
• asynchronous on multiple threads
  [Diagram: Calling Thread, Thread Pool, Callback Threads]
Observable
• not biased toward some particular source of asynchronicity
• the implementation chooses if the code will be blocking or non-blocking
Observable
public interface Observer<T> {
    void onCompleted();
    void onError(Throwable throwable);
    void onNext(T t);
}
Observable
public Observable<String> getStrings() {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                subscriber.onNext("Hello");
                subscriber.onNext("World");
                subscriber.onCompleted();
            } catch (Exception ex) {
                subscriber.onError(ex);
            }
        }
    });
}
Observable
Observable<String> strings = getStrings();
strings.subscribe(
    new Observer<String>() {
        @Override
        public void onCompleted() {
            Log.d("rx", "no more data");
        }
        @Override
        public void onError(Throwable throwable) {
            Log.e("rx", "error", throwable);
        }
        @Override
        public void onNext(String s) {
            showResult(s);
        }
    });
Observer
Observable<String> strings = getStrings();
Subscription subscription = strings.subscribe(
    new Observer<String>() {
        @Override
        public void onCompleted() {
            Log.d("rx", "no more data");
        }
        @Override
        public void onError(Throwable throwable) {
            Log.e("rx", "error", throwable);
        }
        @Override
        public void onNext(String s) {
            showResult(s);
        }
    });
Observer
Subscription subscription;
subscription.unsubscribe();
subscription.isUnsubscribed();
Subscription
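How a producer can honor `unsubscribe()` is easier to see in a small plain-Java sketch. `SimpleSubscription` and `emitRange` are hypothetical names used for illustration; this is not RxJava's `Subscription` implementation, only the same two-method contract backed by an atomic flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class SubscriptionSketch {
    /** Hypothetical handle with the two methods listed on the slide. */
    static final class SimpleSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Emits 1..count, but stops as soon as the consumer has unsubscribed. */
    static void emitRange(int count, SimpleSubscription subscription, Consumer<Integer> onNext) {
        for (int i = 1; i <= count; i++) {
            if (subscription.isUnsubscribed()) {
                return;
            }
            onNext.accept(i);
        }
    }

    /** A consumer that unsubscribes itself once it has received `limit` items. */
    static List<Integer> takeThenUnsubscribe(int count, int limit) {
        SimpleSubscription subscription = new SimpleSubscription();
        List<Integer> received = new ArrayList<>();
        emitRange(count, subscription, item -> {
            received.add(item);
            if (received.size() >= limit) {
                subscription.unsubscribe();
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeThenUnsubscribe(10, 3));
    }
}
```

The producer checks the flag before each emission, so no further items reach a consumer that has already let go.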
Android & RxJava
Android-specific things for Rx
• Scheduler
• reliable and thread-safe with regard to the Fragment/Activity lifecycle
• reactive components for Android use cases and UI
RxAndroid
• https://github.com/ReactiveX/RxAndroid
• 'io.reactivex:rxandroid:0.23.0'
• formerly rxjava-android
RxAndroid
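Observable<Bitmap> imageObservable =
    Observable.create(observer -> {
        // OnSubscribe returns nothing; the result is pushed to the observer.
        observer.onNext(downloadBitmap());
        observer.onCompleted();
    });
imageObservable
    .subscribe(image -> loadToImageView(image));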
Image Loading
imageObservable
    .subscribeOn(Schedulers.newThread())
    .subscribe(image -> loadToImageView(image));
Image Loading
imageObservable
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(image -> loadToImageView(image));
Image Loading
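The thread hop that `subscribeOn` and `observeOn` express can be approximated on a plain JVM with two executors. This is only an analogy built on `CompletableFuture`, not how RxJava schedulers work internally; the `"main-like"` executor is an assumed stand-in for the Android main thread, and the string `"bitmap"` stands in for the downloaded image.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHoppingSketch {
    /** Returns {name of the thread that did the work, name of the thread that got the result}. */
    static String[] run() {
        ExecutorService background =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService mainLike =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            String[] threads = new String[2];
            CompletableFuture
                // Roughly subscribeOn: where the heavy work runs.
                .supplyAsync(() -> {
                    threads[0] = Thread.currentThread().getName();
                    return "bitmap";
                }, background)
                // Roughly observeOn: where the result is delivered.
                .thenAcceptAsync(image -> threads[1] = Thread.currentThread().getName(), mainLike)
                .join();
            return threads;
        } finally {
            background.shutdown();
            mainLike.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("work on " + threads[0] + ", delivery on " + threads[1]);
    }
}
```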
public void onDestroy() {
    super.onDestroy();
    subscription.unsubscribe();
}
Lifecycle & Memory Leaks
ContentObservable
.bindFragment(fragment, observable);
ContentObservable
.bindActivity(activity, observable);
Fragment/Activity Lifecycle
Observable<Intent> observable =
    ContentObservable.fromBroadcast(context, intentFilter);
Observable<Intent> observable =
    ContentObservable.fromLocalBroadcast(context, intentFilter);
Broadcast Receiver
Observable<OnClickEvent> observable = ViewObservable.clicks(view);
ViewObservable.bindView(view, observable);
Views
Creating Observables
• manually from scratch
create()
Creating Observables
• convert existing data structure into an Observable
from()
– Iterable, Future, Array
repeat()
– emit source Observable repeatedly
Creating Observables
• convert existing data structure into an Observable
timer()
– emits a single item after a given time
Creating Observables
• convert existing data structure into an Observable
empty()
– emits nothing and completes
error()
– emits nothing and signals an error
never()
– emits nothing at all
Creating Observables
transform - map, flatMap, reduce, scan
filter - take, skip, sample, filter
combine - concat, merge, zip, cache
concurrency - observeOn, subscribeOn
error handling - onErrorReturn
Observables are composable
Transforming Observables
map()
– apply a function to each item
Transforming Observables
flatMap()
– transform items into Observables and flatten
Transforming Observables
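The difference between map() and flatMap() can be illustrated with `java.util.stream`, which has operators of the same name and shape. This is an analogy, not RxJava: streams are pull-based and keep source order, whereas RxJava's `flatMap` merges the inner Observables and may interleave their items. The method names below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class MapVsFlatMapSketch {
    /** map(): exactly one output per input. */
    static List<Integer> lengths(List<String> words) {
        return words.stream()
            .map(String::length)
            .collect(Collectors.toList());
    }

    /** flatMap(): each input becomes a sequence, and the sequences are flattened. */
    static List<String> letters(List<String> words) {
        return words.stream()
            .flatMap(word -> word.chars().mapToObj(c -> String.valueOf((char) c)))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("ab", "c");
        System.out.println(lengths(words));
        System.out.println(letters(words));
    }
}
```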
scan()
– apply a function to each item sequentially and emit each successive value
Transforming Observables
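A plain-Java sketch of what scan() computes, assuming the seedless form in which the first item is emitted unchanged and every later item is combined with the previous result. The helper works on a `List` rather than an Observable and is purely illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ScanSketch {
    /** Emits every intermediate accumulation, not just the final one. */
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : items) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    /** Running sum of 1, 2, 3, 4. */
    static List<Integer> runningSum() {
        return scan(Arrays.asList(1, 2, 3, 4), Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(runningSum());
    }
}
```

A reduce over the same input would yield only the last of these values; scan exposes each step.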
groupBy()
– divide into a set of Observables
Transforming Observables
buffer()
– periodically gather items into bundles and emit these bundles
Transforming Observables
Filtering Observables
filter()
– filter emitted items
Filtering Observables
take()
– emit only first n items
Filtering Observables
timeout()
– emit items, but signal an error if nothing is emitted within a timespan
Filtering Observables
distinct()
– suppress duplicate items
Filtering Observables
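The semantics of filter(), take() and distinct() above can be tried out with their `java.util.stream` counterparts (`filter`, `limit`, `distinct`). This is an analogy on in-memory lists, not RxJava code, and the method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FilteringSketch {
    /** filter(): keep only items matching a predicate. */
    static List<Integer> evens(List<Integer> items) {
        return items.stream().filter(n -> n % 2 == 0).collect(Collectors.toList());
    }

    /** take(n): keep only the first n items. */
    static List<Integer> firstN(List<Integer> items, int n) {
        return items.stream().limit(n).collect(Collectors.toList());
    }

    /** distinct(): suppress items that were already seen. */
    static List<Integer> distinct(List<Integer> items) {
        return items.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 2, 3, 4, 1);
        System.out.println(evens(items));
        System.out.println(firstN(items, 2));
        System.out.println(distinct(items));
    }
}
```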
takeLastBuffer()
first()
elementAt()
– emit only n-th element
ignoreElements()
– pass only error or completed
Filtering Observables
Combining Observables
merge()
– combine multiple Observables
Combining Observables
zip()
– combine multiple Observables via a function, emit results of the function
Combining Observables
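A minimal sketch of zip() on two in-memory lists, assuming the pairing stops when the shorter input runs out. `ZipSketch.zip` is a hypothetical helper written for illustration, not an RxJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class ZipSketch {
    /** Pairs items by position and emits the result of the combining function. */
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combine) {
        int pairs = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            out.add(combine.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    /** Three numbers zipped with two letters: the unmatched third number is dropped. */
    static List<String> demo() {
        return zip(Arrays.asList(1, 2, 3), Arrays.asList("a", "b"), (n, s) -> n + s);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

merge(), by contrast, would forward all five items individually without pairing them.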
Error Handling
recovering from onError()
onErrorResumeNext()
– emit a sequence if an error happens
Error Handling
onErrorReturn()
– emit an item if an error happens
Error Handling
retry()
– resubscribe to a source
Error Handling
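The ideas behind retry() and onErrorReturn() can be sketched for a single synchronous task in plain Java. The helpers below are hypothetical and operate on a `Supplier` rather than an Observable; they show the control flow only, not RxJava's implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ErrorHandlingSketch {
    /** retry(): run the task again, up to maxRetries extra times. */
    static <T> T retry(Supplier<T> task, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    /** onErrorReturn(): replace a failure with a single fallback item. */
    static <T> T onErrorReturn(Supplier<T> task, T fallback) {
        try {
            return task.get();
        } catch (RuntimeException e) {
            return fallback;
        }
    }

    /** A task that fails twice and then succeeds on the third call. */
    static String retryDemo() {
        AtomicInteger calls = new AtomicInteger();
        return retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok after " + calls.get() + " calls";
        }, 2);
    }

    /** A task that always fails, so the fallback is returned. */
    static String fallbackDemo() {
        return onErrorReturn(() -> { throw new IllegalStateException("boom"); }, "fallback");
    }

    public static void main(String[] args) {
        System.out.println(retryDemo());
        System.out.println(fallbackDemo());
    }
}
```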
Support for RxJava in other libraries
@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);
Retrofit
private Object simpleMethod() { ... }
public Observable<Object> newMethod() {
    return Observable.just(simpleMethod());
}
Plain old library
private Object simpleMethod() { ... }
public Observable<Object> newMethod() {
    return Observable.defer(() ->
        Observable.just(simpleMethod())
    );
}
Plain old library
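The reason for wrapping the call in defer() is evaluation time: with just(simpleMethod()) the method runs when the Observable is built, while with defer() it runs on each subscription. The plain-Java sketch below models that difference with `Supplier`; the class and its counter are hypothetical and stand in for an expensive library call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class DeferSketch {
    private final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for the expensive simpleMethod(); counts its invocations.
    private Object simpleMethod() {
        return calls.incrementAndGet();
    }

    /** Like just(simpleMethod()): the work runs now, while the source is being built. */
    Supplier<Object> eager() {
        Object value = simpleMethod();
        return () -> value;
    }

    /** Like defer(...): the work runs each time a consumer asks for the value. */
    Supplier<Object> deferred() {
        return this::simpleMethod;
    }

    int callCount() {
        return calls.get();
    }

    public static void main(String[] args) {
        DeferSketch sketch = new DeferSketch();
        sketch.eager();
        System.out.println("after eager(): " + sketch.callCount());
        Supplier<Object> lazy = sketch.deferred();
        System.out.println("after deferred(): " + sketch.callCount());
        lazy.get();
        System.out.println("after get(): " + sketch.callCount());
    }
}
```

In the eager form the heavy call happens on whichever thread builds the source, before any scheduler has a chance to move it elsewhere.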
No lambdas on Android
– use different language on Android
– or rather not
Java 8 & Android
transforms Java 8 compiled bytecode for older JVM
• gradle-retrolambda
– https://github.com/evant/gradle-retrolambda
– 'me.tatarka:gradle-retrolambda:2.4.1'
• retrolambda
– https://github.com/orfjackal/retrolambda
Retrolambda
strings.map(new Func1<String, Integer>() {
    @Override
    public Integer call(String s) {
        return s.length();
    }
});
Without Retrolambda
strings.map(
    (String s) -> { return s.length(); }
);
With Retrolambda
• RxAndroid APIs are not yet stabilized, even though RxJava is reaching version 1.0
– RxAndroid is at version 0.23
• backpressure
Problems
Questions?
THE END