javantura v3 - going reactive with rxjava – hrvoje crnjak

Going Reactive with RxJava JAVANTURA Zagreb – February 2016 Hrvoje Crnjak Java Developer @ Five @HrvojeCrnjak FIVE


Page 1: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Going Reactive with RxJava

JAVANTURA Zagreb – February 2016

Hrvoje Crnjak
Java Developer @ Five
@HrvojeCrnjak

Page 2: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Going Reactive with RxJava

Page 3: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

So what does it mean for the App to be Reactive?

His Majesty

Reactive Manifesto

Page 4: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Reactive App should be …

Message-Driven
• Components communicate via asynchronous messages (errors are messages also)

Resilient
• System stays responsive in the face of failure

Elastic (Scalable)
• System stays responsive under varying workload

Responsive
• System responds in a timely manner if at all possible

Page 5: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Or if we put it in a diagram …

Resilient

Message-Driven

Scalable

Responsive

Page 6: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

OOP, State of the Union

Resilient

Message-Driven

Scalable

Responsive

The Good: We’ll pack Events into Messages

The Bad and The Ugly:

State and Behavior are Joined

State is Mutated

Error Handling up to the Client

Page 7: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

So how does RxJava fit into all of this?

Page 8: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Event-Driven → Message-Driven

Everything is a message
• Including errors

Everyone (each component) can be Producer and Consumer of messages

Producer → Stream of Messages → Consumer

Producer: UI Component, Remote Service, Scheduled Job

Stream of Messages: Events, Computation Result, Query Result

Consumer: Whoever’s Subscribed

Page 9: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Making Streams of Messages with RxJava

Observable – Representation of the Message Producer

Observer – Representation of the Message Consumer
• onNext
• onCompleted
• onError

onNext

onNext

onNext

onCompleted

Observable Stream of Messages

Page 10: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Making Streams of Messages with RxJava

Observable – Representation of the Message Producer

Observer – Representation of the Message Consumer
• onNext
• onCompleted
• onError

onNext

onNext

onError

Observable Stream of Messages
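The two diagrams above show the contract: any number of onNext calls, followed by either onCompleted or onError. Below is a minimal plain-JDK sketch of that contract. It is not RxJava's implementation; MiniObserver, MiniObservable and the helper methods are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ObserverContractSketch {

    /** Hypothetical stand-in for the Observer on the slides: three callbacks. */
    interface MiniObserver<T> {
        void onNext(T message);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical stand-in for the Observable: pushes messages to one observer. */
    interface MiniObservable<T> {
        void subscribe(MiniObserver<T> observer);
    }

    /** Subscribes and records every signal as text, so the order of events is visible. */
    static List<String> record(MiniObservable<String> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String message) { log.add("onNext(" + message + ")"); }
            @Override public void onError(Throwable error) { log.add("onError(" + error.getMessage() + ")"); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    /** First diagram: three messages, then normal completion. */
    public static List<String> successfulStream() {
        return record(observer -> {
            observer.onNext("a");
            observer.onNext("b");
            observer.onNext("c");
            observer.onCompleted();
        });
    }

    /** Second diagram: two messages, then the error arrives as just another signal. */
    public static List<String> failingStream() {
        return record(observer -> {
            observer.onNext("a");
            observer.onNext("b");
            observer.onError(new IllegalStateException("boom"));
        });
    }

    public static void main(String[] args) {
        System.out.println(successfulStream());
        System.out.println(failingStream());
    }
}
```

Note that the failing stream never calls onCompleted: the error is the terminal message.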

Page 11: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Making an Observable

Predefined Observable templates
• Observable.from
• Observable.just
• Factory Methods
  • Observable.interval, Observable.range, Observable.empty

Observable<Long> intervalObservable = Observable.interval(500L, TimeUnit.MILLISECONDS);

Observable<Integer> rangeObservable = Observable.range(1, 10);

Observable<Character> justObservable = Observable.just('R', 'x', 'J', 'a', 'v', 'a');

List<String> list = Arrays.asList("blue", "red", "green", "yellow", "orange");
Observable<String> listObservable = Observable.from(list);

Page 12: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Making an Observable

The real Power lies in
• Observable.create

public static Observable<SomeDataType> getData(String someParameter) {
    return Observable.create(subscriber -> {
        try {
            SomeDataType result = SomeService.getData(someParameter);
            subscriber.onNext(result);
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    });
}

Page 13: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Consuming an Observable

At its Core very simple
• observableInstance.subscribe

observableInstance.subscribe(new Observer<SomeDataType>() {
    @Override
    public void onNext(SomeDataType message) {
        // Do something on each Message received
    }

    @Override
    public void onError(Throwable error) {
        // Do something on Error
    }

    @Override
    public void onCompleted() {
        // Do something when Observable completes
    }
});

Page 14: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Consuming an Observable

At its Core very simple
• observableInstance.subscribe

observableInstance.subscribe(
    (SomeDataType message) -> {/*onNext*/},
    (Throwable error) -> {/*onError*/},
    () -> {/*onCompleted*/});

Page 15: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

OOP + RxJava, State of the Union

Resilient

Message-Driven

Scalable

Responsive

Observer + Observable

Page 16: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Making the System Scalable

How to approach the problem
• Scale up – I don’t think so
• Scale out – That’s more like it
  • A lot of Cores and Memory!

Desired Characteristics of our System
• Program logic should execute in Parallel
• Data immutability is Allowed/Encouraged

The answer
• Functional programming

Page 17: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Making the System Scalable

Why FP Approach
• State Handled Transparently
• Highly composable

When we apply this to Rx world …
• Data manipulation
  • Composable FP style Observable methods
• State change
  • Each change of state will be a new message in the Stream

Page 18: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Composable methods with RxJava

There are methods for
• Content filtering
• Time filtering
• Data transformation
• Stream composition

observableInstance.filter(element -> element < 10)

observableInstance.timeout(100, TimeUnit.MILLISECONDS)

observableInstance.map(number -> number * number)

Observable<String> mergedObservable = Observable.merge(firstObservable, secondObservable, thirdObservable);

Page 19: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Manipulating Streams with RxJava

Observable.range(1, 9)                         // 1 2 3 4 5 6 7 8 9
    .skipWhile(element -> element < 5)         // 5 6 7 8 9
    .take(3)                                   // 5 6 7
    .reduce((elem1, elem2) -> elem1 + elem2)   // 18
    .subscribe(result -> { /* do something */ });
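To check the arithmetic of this pipeline without RxJava on the classpath, here is a pull-based analogue using java.util.stream. It is only a sketch: it assumes Java 9 or later (for dropWhile), uses dropWhile and limit in place of skipWhile and take, and the class and method names are made up for illustration. Unlike an Observable, a stream is pulled by the caller rather than pushed to a subscriber.

```java
import java.util.OptionalInt;
import java.util.stream.IntStream;

public class PipelineSketch {

    /** Mirrors range(1, 9) -> skipWhile(< 5) -> take(3) -> reduce(+) from the slide. */
    public static int sumOfThreeStartingAtFive() {
        OptionalInt result = IntStream.rangeClosed(1, 9)    // 1 2 3 4 5 6 7 8 9
                .dropWhile(element -> element < 5)          // 5 6 7 8 9 (role of skipWhile)
                .limit(3)                                   // 5 6 7     (role of take)
                .reduce((elem1, elem2) -> elem1 + elem2);   // 5 + 6 + 7
        // The range is non-empty after filtering, so a value is present here.
        return result.orElseThrow(IllegalStateException::new);
    }

    public static void main(String[] args) {
        System.out.println(sumOfThreeStartingAtFive()); // prints 18
    }
}
```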

Page 20: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

OOP + RxJava, State of the Union

Observer + Observable

Resilient

Message-Driven

Scalable

Responsive

Observable Methods (FP style)

+ Transparent State

Page 21: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

WHEN the System Fails

With classic OOP the Client has to
• try/catch
• Resource cleanup

With RxJava the Client has to
• onError
• Error is a First-class Citizen

onNext

onNext

onError

Observable Stream of Messages

Page 22: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Recovering from Errors

When the Error Occurs
• Observable finishes
• Observer’s recovery options
  • onErrorReturn, onErrorResumeNext, retry

mainObservable.onErrorReturn(throwable -> {
    System.out.println("The original feed failed with " + throwable);
    return oneMoreMessage;
}).subscribe(data -> {/* doSomething */});

mainObservable.onErrorResumeNext(throwable -> {
    System.out.println("The original feed failed with " + throwable);
    return backupObservable;
}).subscribe(data -> {/* doSomething */});
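What the two recovery options do to the stream can be sketched with plain JDK code. This is not RxJava's implementation, only a model of the idea: one fallback message versus a switch to a backup stream. MiniObserver, MiniObservable and all method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ErrorRecoverySketch {

    interface MiniObserver<T> {
        void onNext(T message);
        void onError(Throwable error);
        void onCompleted();
    }

    interface MiniObservable<T> {
        void subscribe(MiniObserver<T> observer);
    }

    /** On error: emit one fallback message, then complete. */
    static <T> MiniObservable<T> onErrorReturn(MiniObservable<T> source,
                                               Function<Throwable, T> fallback) {
        return downstream -> source.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T message) { downstream.onNext(message); }
            @Override public void onError(Throwable error) {
                downstream.onNext(fallback.apply(error));
                downstream.onCompleted();
            }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }

    /** On error: hand the same consumer over to a backup stream. */
    static <T> MiniObservable<T> onErrorResumeNext(MiniObservable<T> source,
                                                   Function<Throwable, MiniObservable<T>> backup) {
        return downstream -> source.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T message) { downstream.onNext(message); }
            @Override public void onError(Throwable error) { backup.apply(error).subscribe(downstream); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }

    /** A feed that delivers one message and then fails. */
    static MiniObservable<String> failingFeed() {
        return observer -> {
            observer.onNext("first");
            observer.onError(new RuntimeException("feed down"));
        };
    }

    /** Collects every signal the consumer sees. */
    static List<String> collect(MiniObservable<String> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String message) { log.add(message); }
            @Override public void onError(Throwable error) { log.add("error: " + error.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static List<String> withFallbackMessage() {
        return collect(onErrorReturn(failingFeed(), error -> "default"));
    }

    public static List<String> withBackupStream() {
        MiniObservable<String> backup = observer -> {
            observer.onNext("backup-1");
            observer.onNext("backup-2");
            observer.onCompleted();
        };
        return collect(onErrorResumeNext(failingFeed(), error -> backup));
    }

    public static void main(String[] args) {
        System.out.println(withFallbackMessage());
        System.out.println(withBackupStream());
    }
}
```

In both cases the consumer never sees the error; it sees a stream that ends normally.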

Page 23: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

OOP + RxJava, State of the Union

Observer + Observable

Error is a First-Class Citizen

Resilient

Message-Driven

Scalable

Responsive

Observable Methods (FP style)

+ Transparent State

Page 24: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Let’s Get Responsive and Responsible

Responsive
• To Our Client
  • Already improved Scalability and Resilience
  • Asynchronous execution

Responsible
• To Our System (to our Resources)

Page 25: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

Page 26: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

observable.subscribe(onNext, onCompleted)

Page 27: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

observable.subscribe(onNext, onCompleted)

observable.subscribe

remoteService.getData()

Page 28: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

observable.subscribe(onNext, onCompleted)

observable.subscribe

remoteService.getData()

remoteService.getData

Page 29: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

observable.subscribe(onNext, onCompleted)

observable.subscribe

remoteService.getData()

subscriber.onNext(data)

Page 30: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

observable.subscribe(onNext, onCompleted)

observable.subscribe

remoteService.getData()

subscriber.onNext(data)

subscriber.onNext

Page 31: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

observable.subscribe(onNext, onCompleted)

observable.subscribe

remoteService.getData()

subscriber.onNext(data)

subscriber.onCompleted()

Page 32: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

observable.subscribe(onNext, onCompleted)

observable.subscribe

remoteService.getData()

subscriber.onNext(data)

subscriber.onCompleted()

subscriber.onCompleted

Page 33: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

observable.subscribe(onNext, onCompleted)

observable.subscribe

remoteService.getData()

subscriber.onNext(data)

subscriber.onCompleted()

Page 34: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

main

Main thread

Stack

observable.subscribe(onNext, onCompleted)

remoteService.getData()

subscriber.onNext(data)

subscriber.onCompleted()

Page 35: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

“Out of the Box”

Main thread

Stack empty

observable.subscribe(onNext, onCompleted)

remoteService.getData()

subscriber.onNext(data)

subscriber.onCompleted()
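The stack walk above makes one point: out of the box, subscribing, producing and consuming all happen on the caller's thread. A plain-JDK sketch of that behaviour follows. The "observable" here is just a function that pushes data to a consumer; it is a hypothetical stand-in, not RxJava's type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SynchronousByDefaultSketch {

    /** Returns true if producer and consumer both ran on the thread that subscribed. */
    public static boolean everythingRunsOnCallerThread() {
        Thread caller = Thread.currentThread();
        List<Thread> seen = new ArrayList<>();

        // Stand-in for an observable: when subscribed, it produces and pushes one message.
        Consumer<Consumer<String>> observable = subscriber -> {
            seen.add(Thread.currentThread());   // thread doing the producing
            subscriber.accept("data");          // plays the role of subscriber.onNext(data)
        };

        // Subscribing is an ordinary method call, so it blocks until the producer is done.
        observable.accept(data -> seen.add(Thread.currentThread()));

        return seen.size() == 2 && seen.stream().allMatch(thread -> thread == caller);
    }

    public static void main(String[] args) {
        System.out.println(everythingRunsOnCallerThread()); // prints true
    }
}
```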

Page 36: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

Let’s get Asynchronous
• Thread handling with Observable
  • subscribeOn(Scheduler) - Thread Observable will run on
  • observeOn(Scheduler) - Thread Observer will run on
• Available Schedulers
  • immediate – use Caller Thread
  • newThread – do work on new Thread
  • trampoline – enqueue work on Caller Thread
  • io – Thread pool used for IO tasks
  • computation – Thread pool used for Computation tasks

Page 37: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

Asynchronous in practice

Main thread

Page 38: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

Asynchronous in practice

Main thread

observable.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(onNext, onCompleted)

Page 39: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

Asynchronous in practice

Main thread

observable.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(onNext, onCompleted)

IO thread

remoteService.getData()

subscriber.onNext(data)

Page 40: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

Asynchronous in practice

Main thread

observable.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(onNext, onCompleted)

IO thread

remoteService.getData()

subscriber.onNext(data)

Computation thread

Page 41: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Asynchronous Streams

Asynchronous in practice

Main thread

observable.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(onNext, onCompleted)

IO thread

remoteService.getData()

subscriber.onNext(data)

subscriber.onCompleted()

Computation thread
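The hand-off shown in these frames (produce on one thread, consume on another, main thread left free) can be imitated with plain JDK executors. This is only an analogy for subscribeOn and observeOn, not how RxJava schedulers are implemented; the thread names and the class are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHoppingSketch {

    /** Produces a value on an "io" thread and consumes it on a "computation" thread. */
    public static String produceAndConsume() {
        // Single named threads keep the output predictable; real pools would hold many threads.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService computation =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "computation-thread"));
        try {
            return CompletableFuture
                    // Role of subscribeOn: where the producing work runs.
                    .supplyAsync(() -> "produced on " + Thread.currentThread().getName(), io)
                    // Role of observeOn: where the consumer's callback runs.
                    .thenApplyAsync(
                            data -> data + ", consumed on " + Thread.currentThread().getName(),
                            computation)
                    // The sketch blocks here only to return the result to the caller.
                    .join();
        } finally {
            io.shutdown();
            computation.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints: produced on io-thread, consumed on computation-thread
        System.out.println(produceAndConsume());
    }
}
```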

Page 42: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Responsible Client

Being Reactive isn’t just about doing something fast, it’s about not doing it at all.

Or to be more precise, to do only what’s necessary.

Page 43: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Responsible Client

Being Responsible
• Observable works only when someone’s listening
  • subscribe triggers Observable Stream
• Client (Consumer of Stream) tells us when he’s done listening
  • unsubscribe

Page 44: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Responsible Client

Two flavors of Unsubscribing
• Client (Consumer) is unsubscribed from “outside”

Subscription subscription = observableInstance.subscribe(
    (Long message) -> {/*onNext*/},
    (Throwable error) -> {/*onError*/},
    () -> {/*onCompleted*/});

// Do some logic
subscription.unsubscribe();

Page 45: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Responsible Client

Two flavors of Unsubscribing
• Client (Consumer) is unsubscribed from “inside”

observableInstance.subscribe(new Subscriber<Long>() {
    @Override
    public void onNext(Long message) {
        // Do something on each Message received
        unsubscribe();
    }

    @Override
    public void onError(Throwable e) {
        // Do something on Error
    }

    @Override
    public void onCompleted() {
        // Do something when Observable completes
    }
});
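Why unsubscribing saves resources can be sketched in plain JDK code: a cooperative producer checks the subscription before each message and stops as soon as the consumer is done. MiniSubscription and the methods below are hypothetical names, loosely modelled on the Subscription shown on the slides; this is not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class UnsubscribeSketch {

    /** Hypothetical handle the consumer uses to say "I am done listening". */
    static final class MiniSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Producer that could emit 1..limit, but checks the subscription before each message. */
    static int emitWhileSubscribed(int limit, MiniSubscription subscription,
                                   Consumer<Integer> onNext) {
        int emitted = 0;
        for (int i = 1; i <= limit && !subscription.isUnsubscribed(); i++) {
            onNext.accept(i);
            emitted++;
        }
        return emitted;
    }

    /** Consumer that unsubscribes from "inside" after the third message. */
    public static List<Integer> takeThreeThenUnsubscribe() {
        MiniSubscription subscription = new MiniSubscription();
        List<Integer> received = new ArrayList<>();
        emitWhileSubscribed(1_000, subscription, message -> {
            received.add(message);
            if (received.size() == 3) {
                subscription.unsubscribe();
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeThreeThenUnsubscribe()); // prints [1, 2, 3]
    }
}
```

The producer was willing to emit 1,000 messages but did the work for only three, which is the "do only what’s necessary" point from the earlier slide.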

Page 46: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

OOP + RxJava, State of the Union

Observer + Observable

Error is a First-Class Citizen

Asynchronous + Resources on Demand

Message-Driven

Scalable

Responsive

Resilient

Observable Methods (FP style)

+ Transparent State

Page 47: Javantura v3 - Going Reactive with RxJava – Hrvoje Crnjak

Thanks for your time!

Q & hopefully A