reactive programming with rxjava
TRANSCRIPT
Christophe Marchal
Reactive programming with RxJava
http://github.com/toff63
@toff63
http://francesbagual.net
About me
MOTIVATION
Scaling
Moore’s law is delivering more cores but not faster cores
Amdahl’s law
Speedup limited by the sequential portion of the code
In other words, parallelize your
code to scale
I am not Netflix!
http://techblog.netflix.com/2016/09/zuul-2-netflix-journey-to-asynchronous.html
Multithreaded System Architecture
Efficient use of resources
● CPU efficient
● Memory efficient
● Hard Drive efficient
● Network efficient
Efficient use of threads
Non-Blocking architecture
http://techblog.netflix.com/2016/09/zuul-2-netflix-journey-to-asynchronous.html
Asynchronous and Non-Blocking System Architecture
Challenges
● listeners / callback● force functional code● exception handling● Everything becomes a
Stream
RxJava for the win!
Challenges
Erik Meijer
Origins
Reactive eXtension
Collection Future
Current vision
Observable: Stream of event
Reactive vision
Everything is an event
Observable: Stream of event
Observer
Reactive vision
Reactive vision
T getData()
One Item
synchronous
Reactive vision
Iterable<T> getData()T getData()
One Item Several Items
synchronous
Reactive vision
Iterable<T> getData()T getData()
Future<T> getData()
One Item Several Items
synchronous
Asynchronous
Reactive vision
Iterable<T> getData()
Observable<T> getData()
T getData()
Future<T> getData()
One Item Several Items
synchronous
Asynchronous
Pull vs Push
Iterable<T> getData() Observable<T> getData()
Pull
T next()throw Exception()
returns;
Push
onNext()onError()onComplete()
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/basic/Application.java
Simple collection
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/restclient/Application.java
Calling remote API
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/restclient/Application.java
Calling remote API
Observable from Future
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/restclient/Application.java
Calling remote API
Exception handling
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/restclient/Application.java
Calling remote API
Fallback
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/composition/Service.java
Composing Observable
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/composition/Application.java#L17
Composing Observable
https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png
FlatMap
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/composition/Application.java#L34
Composing Observables
https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png
Zip
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/composition/Application.java#L24
Composing Observables
https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png
Merge
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/composition/SeriesHttpHandler.java#L22
RxNetty
Demo
Concurrency
Observable is sequential
Scheduling and Combining Observables enable Concurrency
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/composition/SeriesHttpHandler.java
Adding concurrency
Scheduler
Demo
Cold Stream vs Hot Stream
Hot Stream Cold Stream
no control on emission rate emits when requested
UI events, Metric events,
System events
DB query, Service request,
Downloading file
Cold Stream vs Hot Stream
Hot Stream Cold Stream
no control on emission rate emits when requested
UI events, Metric events,
System events
DB query, Service request,
Downloading file
Flow control Flow control & Back pressure
Flow Control
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/flowcontrol/Backpressure.java
Backpressure needed
Synchronous on same thread
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/flowcontrol/Backpressure.java
No backpressure needed
Asynchronous (queuing)
Block Operator
Hot Stream Cold Stream
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/flowcontrol/Backpressure.java
toBlocking
Temporal Operators
Hot Stream
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/flowcontrol/Backpressure.java
Sample
9891040771910879031787798
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/flowcontrol/Backpressure.java
Throttle First
0118117118630717530662584575
Debounce
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/flowcontrol/Backpressure.java
Debounce
52025
Buffer
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/flowcontrol/Backpressure.java
Buffer
[0, 1, 2, 3, 4, 5][6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20][21, 22, 23, 24, 25]
Window time
https://github.com/toff63/Sandbox/blob/master/java/rsjug-rx/rsjug-rx/src/main/java/rs/jug/rx/flowcontrol/Backpressure.java
28545333541675424446235
Window time
Reactive Stream
● Push when Consumer keeps up
● Pull when Consumer is slow
● Bound all queues
Maximize throughput
Reactive Stream: Consumer keeps up
Publisher Subscriber
As many as you can
Push
Reactive Stream: Consumer Starts buffering
Publisher Subscriber
As many as you can
Reactive Stream: Consumer Starts buffering
Publisher Subscriber
Give me 0
Give me 2
Pull
On backpressure buffer
On backpressure buffer Hot Stream
Hot Stream
On backpressure buffer
Scheduler
Hot Stream
On backpressure Drop
On backpressure buffer Hot Stream
And lots and lots of other operators
Rx Ports
● Observable API is complex
● Takes time to become fluent with Observable
● Hard to test !
● Debugging is harder as everything is asynchronous
● Stacktraces can be truncated due to scheduler
Drawbacks
Stacktrace example
18:42:59.487 [rx-request-processor-5-67] ERROR n.k.t.util.HttpContentInputStream - Error on serverio.netty.util.IllegalReferenceCountException: refCnt: 0 at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1178) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final] at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:848) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final] at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final] at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final] at netflix.karyon.transport.util.HttpContentInputStream$1.onNext(HttpContentInputStream.java:67) [karyon2-governator-2.3.0.jar:2.3.0] at netflix.karyon.transport.util.HttpContentInputStream$1.onNext(HttpContentInputStream.java:33) [karyon2-governator-2.3.0.jar:2.3.0] at rx.Observable$33.onNext(Observable.java:7480) [rxjava-1.0.10.jar:1.0.10] at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130) [rxjava-1.0.10.jar:1.0.10] at io.reactivex.netty.protocol.http.UnicastContentSubject$AutoReleaseByteBufOperator$1.onNext(UnicastContentSubject.java:262) [rxnetty-0.4.9.jar:0.4.9] at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150) [rxjava-1.0.10.jar:1.0.10] at rx.internal.operators.BufferUntilSubscriber.emit(BufferUntilSubscriber.java:151) [rxjava-1.0.10.jar:1.0.10] at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:184) [rxjava-1.0.10.jar:1.0.10] at io.reactivex.netty.protocol.http.UnicastContentSubject.onNext(UnicastContentSubject.java:286) [rxnetty-0.4.9.jar:0.4.9] at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.invokeContentOnNext(ServerRequestResponseConverter.java:193) [rxnetty-0.4.9.jar:0.4.9] at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.channelRead(ServerRequestResponseConverter.java:129) [rxnetty-0.4.9.jar:0.4.9] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) [netty-transport-4.0.27.Final.jar:4.0.27.Final] at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:32) [netty-transport-4.0.27.Final.jar:4.0.27.Final] at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:329) [netty-transport-4.0.27.Final.jar:4.0.27.Final] at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:36) [netty-common-4.0.27.Final.jar:4.0.27.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [netty-common-4.0.27.Final.jar:4.0.27.Final] at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) [netty-common-4.0.27.Final.jar:4.0.27.Final] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Netflix experience migrating zuul
http://techblog.netflix.com/2016/09/zuul-2-netflix-journey-to-asynchronous.html
References
● https://www.infoq.com/presentations/rx-service-architecture
● https://www.infoq.com/presentations/rxjava-reactor● http://reactivex.io/tutorials.html● http://reactivex.io/documentation/operators.html
Christophe Marchal
Thank you !
Questions?