java programming in reactivebangalorejug.org/wp-content/uploads/2017/03/reactive-programmin… ·...

Post on 22-May-2020

1 Views

Category:

Documents

0 Downloads

Preview:

Click to see full reader

TRANSCRIPT

Reactive Programming in JavaCopyright - Syncogni Consulting Pvt Ltd. All rights reserved.

Prerequisites:

● Core Java ● Lambda Expressions ● Method references ● Functional Programming ● Web - application development using

JEE/Spring

● You are free to interrupt me at any time with your questions

● I may○ answer it right away, if it is simple○ answer in the end if

■ it takes a lot of time to question and answer■ it has many buts and ifs■ it is out of the scope of our learning plan

Rules of the game

● We may not discuss any particular project related issues here, because :

○ It needs more time to study and understand the project specific environment

○ The resolving may take a lot of iterations / testing / time

○ It may not benefit everyone○ It may waste others’ time

Rules of the game

Expectations from the participants?

● Some slides are a bit descriptive, because:○ anybody can use it even after the session without

trainer○ it serves as PPT + study material○ participants read it silently/aloud during session

to maximize participation

Note:

Teaser begins….

● Responsive● Resilient● Elastic● Message Driven

Official Reactive Manifesto

Get the official ribbon!!!

Teaser Ends….

Trailer begins….

● Reactive = showing response to a stimulus● Reactive / Responsive● Scalable● Resilient / fault tolerant● Event driven● Observable sequences● Backpressure● Represented using marble diagrams

Reactive Programming

Reactive Programming

● Async facade - design pattern● Asynchronous

○ Ex 1: Do x, y, z after p is done○ Ex 2: Do p, q after r is started○ Ex 3:

Reactive Programming

● Polyglot● Implemented (Library is available) in

○ .NET○ Scala○ Clojure○ JavaScript○ Ruby○ Python○ C++, Groovy, Objective-C/Cocoa etc.

Reactive Programming

● Java Implementations:○ RxJava○ Akka-Streams○ Project Reactor(http://projectreactor.io/)○ Spring 5○ JSR 166 (headed by Doug Lee) to be implemented

in Java 9 as Flow API○ http://www.reactive-streams.org/○ Vert.x 3.0

RxJava

● Originally from Netflix● Currently open source● Works with Java 6+● Official Page:(http://reactivex.io/)● First implementation of reactive style of

programming● Has very powerful APIs

What is RxJava?

Note : RxJava = Reactive Extensions for Java

“A library for composing asynchronous and event based programs using observable sequences for the Java VM”

- Netflix

● We are not blocked on some operation● We will respond to an event as and when we

receive the notification of that event● When / if the stock price changes, we will be

notified. Then only we will take appropriate action

● If the cricket score changes, we will hear a loud noise, then only look at the screen :)

What is Reactive Programming?

● We don’t wait for the entire web page to be downloaded to display it

● Keep pushing the election results as and when the change happens

● Divide and conquer approach● Keep pushing the information like blackberry● Users will react to the pushed data

What is Reactive Programming?

● Get inspired by:○ Gang of Four’s Observer Pattern○ Iterable type’s these two extra semantics:

■ The ability to signal that there is no more data available

■ The ability to signal that an error has occurred● Mix them well with seasoning :)

Recipe for RxJava

Functional Reactive

Asynchronous

Values

Events

Push

Functional Reactive

Lambdas

Closures

Pure

Composable

Imperative vs Reactive

Iterable Observable

pull push

T next() onNext(T)

throws Exception onError(Exception)

returns; onCompleted()

Imperative vs Reactive

// Iterable<String> contains 379 strings

myIterable.skip( 16 ) //header.map( s -> s+”_transformed”)//blocking.forEach( e -> System.out.println(e) );

// Observable<String> contains 379 strings

myObservable.skip(16) //header.map( s -> s+" transformed" )//non-blocking.subscribe( e -> System.out.println(e) );

Scenarios:

Single Multiple

Sync T getData() Iterable<T> getData()

Async Future<T> getData() Observable<T> getData()

Trailer Ends!!!

Movie Begins...

Hello World RxJava

SubscriberSubscriber<String> mySubscriber = new Subscriber<String>() {

@Override

public void onNext(String s) {

System.out.println(s);

}

@Override

public void onCompleted() {

System.out.println("Done");

}

@Override

public void onError(Throwable e) {

e.printStackTrace();

}

};

ObservableObservable<String> myObservable = Observable.create(

new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> sub) {

sub.onNext("Hello, world!");

sub.onCompleted();

}

});

Connecting codemyObservable.subscribe(mySubscriber);

HelloWorldRxJava - Demo

Communication Protocol

● One item will be pushed to client during each ‘onNext’ call at the server side

● For every ‘onNext’ call on server, client can execute ‘onNext’ and receive an item

● Once server executes, ‘onCompleted’ or ‘onError’, it will not execute ‘onNext’ again

Communication Protocol

● Client will not initiate ‘onNext’ or ‘onCompleted’ or ‘onError’, but, it is the server

● ‘onError’ signifies that there is some error at the observable’s side

● The observable will call neither ‘onCompleted’ nor ‘onNext’ after calling ‘onError’

Communication Protocol

● onNext* (onCompleted | onError)? * = zero or more; ? = zero or one● Client may call ‘unsubscribe’ method at any

time, to signal the same to server● Server must check this using

‘subscriber.isUnsubscribed()’ method and must stop sending further push notifications

Life Cycle of an Observable

● lazily starts emitting items only after subscription

● keeps emitting items till an error is encountered or till the end of stream is reached

What is marble diagram?

Ex 2 : Client tells the server to stop//client side code

Subscriber<String> mySubscriber = new Subscriber<String>() {

@Override

public void onNext(String s) {

System.out.println(s);

if (s.toUpperCase().contains("BAHU")) {

System.out.println("I got my movie!!!!");

unsubscribe();

}

}

};

Ex 2 : Client tells the server to stop//Server side

Observable<String> myObservable = Observable.create(new

Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> sub) {

List<String> list = Arrays.asList("Dhoom 3", "Bahubali",

"Star wars", "Batman");

for (String e : list) {

if (sub.isUnsubscribed()) {

break;

}

sub.onNext(e);

}

sub.onCompleted();

}

});

Ex 3 : Error Handling - Server codetry {

for (String e : list) {

if (sub.isUnsubscribed()) {

break;

}

if(e.equalsIgnoreCase("Bahubali 10")){

throw new Exception("Something is wrong!!!");

}

sub.onNext(e);

}

sub.onCompleted();

} catch (Exception e1) {

sub.onError(e1); break;

}

Ex 3 : Error Handling - Client code@Override

public void onError(Throwable e) {

e.printStackTrace();

}

ManualErrorHandler - Demo

Ex 5: Take only some values - demo//I can process only next 2 orders

Observable<String> myCustomObservable = myObservable.take(2);

myCustomObservable.subscribe(mySubscriber);

Ex 6: Take orders only for some time//I can take orders for another 3 seconds

Observable<String> myCustomObservable = myObservable.take(3, TimeUnit.SECONDS);

myCustomObservable.subscribe(mySubscriber);

Ex 7: Skip some initial values//Skip initial 3 bytes which is a useless header

Observable<String> myCustomObservable =

myObservable.skip(3);

myCustomObservable.subscribe(mySubscriber);

Ex 8: Skip values for some time//Skip the values for initial 3 seconds

//while they are testing microphone

Observable<String> myCustomObservable =

myObservable.skip(3, TimeUnit.SECONDS);

myCustomObservable.subscribe(mySubscriber);

Ex 9: Skip and Take - Composition

myObservable.skip(1)

.take(2)

.subscribe(mySubscriber);

Ex 10: Filter the streammyObservable.skip(1)

.take(2)

.filter(e -> e.toUpperCase().contains("STAR"))

.subscribe(mySubscriber);

SkipTakeFilterDemo

Ex 11: Two Subscribers - DemomyObservable.skip(1)

.take(2)

.filter(e -> e.toUpperCase().contains("STAR"))

.subscribe(mySubscriber1);

myObservable.skip(2)

.take(3)

.filter(e -> e.toUpperCase().contains("GOLD"))

.subscribe(mySubscriber2);

Ex 12: Combinators//Combinators - Combine / Merge 2 observables

Observable.merge(myObservable1, myObservable2)

.subscribe(mySubscriber);

Merge2Observables - Demo

Error Handling

Ex 13: onErrorResumeNext//on error, resume with the next observable

myObservable1.onErrorResumeNext(myObservable2)

.subscribe(mySubscriber);

Ex 13: onErrorResumeNext

Map vs FlatMap

● Both do transformation● Map:

○ maps one item to another item○ returns only one item○ synchronous

● FlatMap:○ maps one item to zero or more items○ returns an Observable stream○ asynchronous

FlatMapmyObservable1.flatMap( s ->

{

String movie1 = s + " 1";

String movie2 = s + " 2";

String movie3 = s + " 3";

return Observable.just(movie1, movie2, movie3);

}

).subscribe(mySubscriber);

Transformations

Group By Transformation

● groupBy○ similar to ‘group by’ clause in SQL○ groups items according to the condition given○ Ex : can group a sequence of numbers based on

odd or even

Climax

Case studies:

Case study 1: DB Access

Case study 2 : FlatMap

Service Orchestration

Service Orchestration

Case study 3 : Car assembly line

Clients

REST Controller REST Controller REST Controller

Service Service Service Service

DB External API External API

Clients

REST Controller REST Controller REST Controller

Service Service Service Service

External API External API

REST Controller REST Controller REST Controller

Service Service Service Service

External API External API

Orchestrated Service Orchestrated Service

Clients

Controllers

Service Orchestration Layer

Standard Services Layer

External APIs / Data Access Layer / Databases

Case study : Spring - DeferredResult

Case study 5 : Spring - SseEmitter

THE END

Now revisit the teaser & the trailer :)

References

● Google● Wikipedia● SO● RxJava API Docs● http://reactivex.io/documentation/operators.h

tml

?

Thank you!!!

Let us stay in touch :Dinesh BhattDB@syncogni.comwww.syncogni.comIndia : +91-888-415-9093USA : 001-281-303-5266https://in.linkedin.com/in/dineshbhatt

top related