Java 8 CompletableFutures ImageStreamGang Example (Part 3) Douglas C. Schmidt [email protected] www.dre.vanderbilt.edu/~schmidt Professor of Computer Science Institute for Software Integrated Systems Vanderbilt University Nashville, Tennessee, USA


Page 1: Java 8 CompletableFutures ImageStreamGang Example (Part 3)schmidt/cs891f/2018-PDFs/16-Java... · 2018-11-17 · set of futures before continuing with the program collect() also triggers



Learning Objectives in this Part of the Lesson

• Understand the design of the Java 8 completable future version of ImageStreamGang
• Know how to apply completable futures to ImageStreamGang, e.g.
  • Factory methods & completion stage methods
  • Arbitrary-arity methods


Applying Arbitrary-Arity Methods in ImageStreamGang

• collect() returns a completable future to a stream of images that are asynchronously downloaded, filtered, & stored
  • This future provides a single means to await completion of a set of futures before continuing with the program
  • As a terminal operation, collect() also triggers processing of all the intermediate operations
• These images are displayed after processing completes
• StreamOfFuturesCollector wraps the “arbitrary-arity” allOf() method

    void processStream() {
        List<URL> urls = getInput();

        CompletableFuture<Stream<Image>> resultsFuture = urls
            .stream()
            .map(this::checkUrlCachedAsync)
            .map(this::downloadImageAsync)
            .flatMap(this::applyFiltersAsync)
            .collect(toFuture())
            .thenApply(stream ->
                       log(stream.flatMap(Optional::stream),
                           urls.size()))
            .join();

• collect(toFuture()) returns a future that completes when all futures in the stream complete
• thenApply() logs the results after the final future completes
• flatMap(Optional::stream) removes empty optional values from the stream in Java 9

See AndroidGUI/app/src/main/java/livelessons/utils/StreamOfFuturesCollector.java

See docs.oracle.com/javase/9/docs/api/java/util/Optional.html#flatMap
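The point that collect() triggers the intermediate operations can be checked with a small experiment. The following is only a sketch under stated assumptions: strings stand in for images, and LazyPipelineSketch and startedBeforeAndAfterCollect() are hypothetical names that do not appear in ImageStreamGang. It counts how many async tasks have been started before and after the terminal operation runs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the image pipeline: strings play the role of images.
final class LazyPipelineSketch {
    // Returns {tasks started before collect(), tasks started after collect()}.
    static int[] startedBeforeAndAfterCollect(List<String> urls) {
        AtomicInteger started = new AtomicInteger();

        // Building the pipeline runs nothing: map() is an intermediate operation.
        Stream<CompletableFuture<String>> pipeline = urls.stream()
            .map(url -> {
                started.incrementAndGet();
                return CompletableFuture.supplyAsync(() -> "image@" + url);
            });
        int before = started.get();

        // The terminal operation pulls every element through map().
        List<CompletableFuture<String>> futures =
            pipeline.collect(Collectors.toList());
        int after = started.get();

        // Wait for the async work so nothing is left running on return.
        futures.forEach(CompletableFuture::join);
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = startedBeforeAndAfterCollect(Arrays.asList("a", "b", "c"));
        System.out.println(counts[0] + " -> " + counts[1]); // prints "0 -> 3"
    }
}
```

No task starts until collect() runs, which is why the terminal operation is the place where the whole set of futures first becomes available to wait on.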

• In Java 8, which lacks Optional::stream, empty optional values are removed from the stream via filter() & map()

    void processStream() {
        List<URL> urls = getInput();

        CompletableFuture<Stream<Image>> resultsFuture = urls
            .stream()
            .map(this::checkUrlCachedAsync)
            .map(this::downloadImageAsync)
            .flatMap(this::applyFiltersAsync)
            .collect(toFuture())
            .thenApply(stream -> log(stream
                                     .filter(Optional::isPresent)
                                     .map(Optional::get),
                                     urls.size()))
            .join();
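The Java 8 idiom can be tried in isolation. This is a minimal sketch, assuming a hypothetical helper presentValues() in a hypothetical class OptionalFilterSketch; neither is part of ImageStreamGang.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper that isolates the Java 8 "drop the empties" idiom.
final class OptionalFilterSketch {
    // Keep only the optionals that hold a value, then unwrap them.
    // On Java 9 or later, stream.flatMap(Optional::stream) does the same job.
    static <T> List<T> presentValues(Stream<Optional<T>> stream) {
        return stream
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> values = presentValues(
            Stream.of(Optional.of("a"),
                      Optional.<String>empty(),
                      Optional.of("b")));
        System.out.println(values); // prints "[a, b]"
    }
}
```

Calling get() is safe here only because filter(Optional::isPresent) runs first.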

• join() waits until all asynchronous processing is completed

See docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#join
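As a rough illustration of how join() behaves, the sketch below waits for a combined result and then shows what happens when a future fails. JoinSketch and its two methods are hypothetical names chosen for this example. Unlike get(), join() throws the unchecked CompletionException, so it needs no checked-exception handling and fits inside lambdas and stream pipelines.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; not part of ImageStreamGang.
final class JoinSketch {
    // Block the caller until both async computations finish, then add them.
    static int awaitSum() {
        CompletableFuture<Integer> sum = CompletableFuture
            .supplyAsync(() -> 20)
            .thenCombine(CompletableFuture.supplyAsync(() -> 22), Integer::sum);
        return sum.join();
    }

    // join() reports failure via an unchecked CompletionException whose
    // cause is the exception thrown by the async computation.
    static String awaitFailure() {
        CompletableFuture<String> failed = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            return failed.join();
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitSum());     // prints "42"
        System.out.println(awaitFailure()); // prints "boom"
    }
}
```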


Implementing the Class StreamOfFuturesCollector

• StreamOfFuturesCollector wraps allOf()
  • Converts a stream of completable futures into a single completable future that’s triggered when all futures in the stream complete
  • Implements the Collector interface, which accumulates input elements into a mutable result container
• StreamOfFuturesCollector is a non-concurrent collector (supports parallel & sequential streams)
• StreamOfFuturesCollector provides a powerful wrapper for some complex code!

See AndroidGUI/app/src/main/java/livelessons/utils/StreamOfFuturesCollector.java

See docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html
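To make "accumulates input elements into a mutable result container" concrete, here is a minimal sketch of a much simpler custom collector built with the standard Collector.of() factory. The class CollectorOfSketch and the method concatenating() are hypothetical names; the collector concatenates strings through a StringBuilder and is unrelated to ImageStreamGang.

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical example: a tiny collector with the same four moving parts
// (supplier, accumulator, combiner, finisher) as any Collector.
final class CollectorOfSketch {
    static Collector<String, StringBuilder, String> concatenating() {
        return Collector.of(
            StringBuilder::new,                 // supplier: new mutable container
            (sb, s) -> sb.append(s),            // accumulator: add one element
            (left, right) -> left.append(right),// combiner: merge partial results
            StringBuilder::toString);           // finisher: container -> result
    }

    public static void main(String[] args) {
        String joined = Stream.of("a", "b", "c").collect(concatenating());
        System.out.println(joined); // prints "abc"
    }
}
```

Implementing the Collector interface directly, as StreamOfFuturesCollector does, supplies the same four functions as methods instead of as arguments.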

• StreamOfFuturesCollector implements a custom collector

    public class StreamOfFuturesCollector<T>
        implements Collector<CompletableFuture<T>,
                             List<CompletableFuture<T>>,
                             CompletableFuture<Stream<T>>> {
        ...

• CompletableFuture<T> is the type of input elements to the accumulator() method
• List<CompletableFuture<T>> is the mutable accumulation type of the accumulator() method
• CompletableFuture<Stream<T>> is the result type of the finisher() method, i.e., the final output of the collector
  • The Stream<T> parameter differs from the List<T> parameter used by the earlier FuturesCollector

See docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html

See Part 4 of lesson on “Overview of Advanced Java 8 CompletableFuture Features”

    public class StreamOfFuturesCollector<T>
        implements Collector<CompletableFuture<T>,
                             List<CompletableFuture<T>>,
                             CompletableFuture<Stream<T>>> {
        public Supplier<List<CompletableFuture<T>>> supplier() {
            return ArrayList::new;
        }

        public BiConsumer<List<CompletableFuture<T>>,
                          CompletableFuture<T>> accumulator() {
            return List::add;
        }
        ...

• supplier() is a factory method that returns a supplier used by the Java 8 streams collector framework to create a new mutable array list container
• accumulator() is a factory method that returns a bi-consumer used by the Java 8 streams collector framework to add a new completable future into the mutable array list container
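One way to see how the framework uses these factory methods is to drive a collector by hand. The sketch below is a simplified model of a sequential collect(), not the actual streams implementation; ManualCollectSketch and manualCollect() are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical model of what a sequential collect() does with a collector.
final class ManualCollectSketch {
    static <T, A, R> R manualCollect(Iterable<T> items,
                                     Collector<T, A, R> collector) {
        // 1. Ask the supplier for a fresh mutable container.
        A container = collector.supplier().get();

        // 2. Feed every element to the accumulator.
        BiConsumer<A, T> accumulator = collector.accumulator();
        for (T item : items) {
            accumulator.accept(container, item);
        }

        // 3. Let the finisher turn the container into the final result.
        return collector.finisher().apply(container);
    }

    public static void main(String[] args) {
        List<String> result = manualCollect(Arrays.asList("a", "b"),
                                            Collectors.<String>toList());
        System.out.println(result); // prints "[a, b]"
    }
}
```

The combiner is absent from this model because a single container holds all the elements in the sequential case.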

    public class StreamOfFuturesCollector<T>
        ...
        public BinaryOperator<List<CompletableFuture<T>>> combiner() {
            return (List<CompletableFuture<T>> one,
                    List<CompletableFuture<T>> another) -> {
                one.addAll(another);
                return one;
            };
        }
        ...

• combiner() is a factory method that returns a binary operator that merges two partial array list results into a single array list (only relevant for parallel streams)
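The role of the combiner can be observed with an instrumented collector. In this sketch, CombinerSketch, toListCounting(), and the combinerCalls counter are hypothetical names; the merge logic mirrors the addAll() approach above but operates on plain values rather than futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical collector that counts how often its combiner runs.
final class CombinerSketch {
    static final AtomicInteger combinerCalls = new AtomicInteger();

    static <T> Collector<T, List<T>, List<T>> toListCounting() {
        return Collector.<T, List<T>>of(
            ArrayList::new,
            List::add,
            (one, another) -> {
                combinerCalls.incrementAndGet();
                one.addAll(another);   // merge the right partial result
                return one;            // into the left one
            });
    }

    public static void main(String[] args) {
        List<Integer> input = IntStream.range(0, 1000).boxed()
                                       .collect(Collectors.toList());

        List<Integer> parallel = input.parallelStream()
                                      .collect(toListCounting());

        // The merged result keeps encounter order because this collector
        // is neither CONCURRENT nor UNORDERED.
        System.out.println(parallel.equals(input));
        System.out.println("combiner calls: " + combinerCalls.get());
    }
}
```

The number of combiner calls depends on how the runtime splits the parallel stream, so it varies between machines.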

    public class StreamOfFuturesCollector<T>
        ...
        public Function<List<CompletableFuture<T>>,
                        CompletableFuture<Stream<T>>> finisher() {
            return futures -> CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                                       .map(CompletableFuture::join));
        }
        ...

• finisher() is a factory method that returns a function used by the Java 8 streams collector framework to transform the array list accumulation type to the completable future result type
• The list of futures is converted to an array of futures & passed to allOf() to obtain a future that will complete when all futures complete
• When all futures have completed, thenApply() yields a single future to a stream of joined elements of type T
  • futures.stream() converts the array list of futures into a stream of futures
  • This call to join() will never block, since every future has already completed by the time thenApply() runs
• The function returns a future to a stream of elements of T
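The allOf() idiom in finisher() can be exercised on its own. The following sketch makes two assumptions that differ from the code above: it returns a List<T> rather than a Stream<T> so the result is easy to inspect, and it uses the hypothetical names AllOfSketch and allAsList(). A manually completed future shows that the combined future stays pending until the last input completes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper showing the allOf() + thenApply() + join() idiom.
final class AllOfSketch {
    static <T> CompletableFuture<List<T>> allAsList(
            List<CompletableFuture<T>> futures) {
        return CompletableFuture
            // allOf() yields a CompletableFuture<Void> that completes
            // only after every future in the array has completed.
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            // By now each join() returns immediately with a ready value.
            .thenApply(v -> futures.stream()
                                   .map(CompletableFuture::join)
                                   .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> ready =
            CompletableFuture.completedFuture("b");

        CompletableFuture<List<String>> all =
            allAsList(Arrays.asList(pending, ready));
        System.out.println(all.isDone()); // prints "false"

        pending.complete("a");            // the last input completes
        System.out.println(all.join());   // prints "[a, b]"
    }
}
```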

    public class StreamOfFuturesCollector<T>
        ...
        public Set<Characteristics> characteristics() {
            return Collections.singleton(Characteristics.UNORDERED);
        }

        public static <T> Collector<CompletableFuture<T>, ?,
                                    CompletableFuture<Stream<T>>>
        toFuture() {
            return new StreamOfFuturesCollector<>();
        }
    }

• characteristics() returns a set indicating the characteristics of the StreamOfFuturesCollector class
  • Because the set omits CONCURRENT, StreamOfFuturesCollector is thus a non-concurrent collector
• toFuture() is a static factory method that creates a new StreamOfFuturesCollector
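Putting the pieces together, the sketch below assembles the methods shown above into one compilable class and exercises it on integers instead of images. The class name StreamOfFuturesCollectorSketch and the squaresOf() demo method are hypothetical; the actual class lives in the StreamOfFuturesCollector.java file referenced earlier and may differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch assembled from the slide excerpts; not the original source file.
final class StreamOfFuturesCollectorSketch<T>
    implements Collector<CompletableFuture<T>,
                         List<CompletableFuture<T>>,
                         CompletableFuture<Stream<T>>> {
    @Override
    public Supplier<List<CompletableFuture<T>>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<CompletableFuture<T>>,
                      CompletableFuture<T>> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<CompletableFuture<T>>> combiner() {
        return (one, another) -> {
            one.addAll(another);
            return one;
        };
    }

    @Override
    public Function<List<CompletableFuture<T>>,
                    CompletableFuture<Stream<T>>> finisher() {
        return futures -> CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> futures.stream()
                                   .map(CompletableFuture::join));
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.singleton(Characteristics.UNORDERED);
    }

    static <T> Collector<CompletableFuture<T>, ?,
                         CompletableFuture<Stream<T>>> toFuture() {
        return new StreamOfFuturesCollectorSketch<>();
    }

    // Hypothetical demo: square each input asynchronously, then await
    // all results through the single future produced by the collector.
    static List<Integer> squaresOf(List<Integer> inputs) {
        return inputs.stream()
            .map(n -> CompletableFuture.supplyAsync(() -> n * n))
            .collect(toFuture())
            // Sort because an UNORDERED collector promises no order.
            .thenApply(stream -> stream.sorted()
                                       .collect(Collectors.toList()))
            .join();
    }

    public static void main(String[] args) {
        System.out.println(squaresOf(Arrays.asList(3, 1, 2))); // prints "[1, 4, 9]"
    }
}
```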

• toFuture() returns a future to a stream of images that are being downloaded, filtered, & stored, as used by collect(toFuture()) in processStream()
  • It provides a single means to await completion of a set of futures before continuing with the program


End of Java 8 CompletableFutures ImageStreamGang Example (Part 3)