java 8 completablefutures imagestreamgang example (part 3)schmidt/cs891f/2018-pdfs/16-java... ·...
TRANSCRIPT
Java 8 CompletableFutures
ImageStreamGang Example (Part 3)
Douglas C. [email protected]
www.dre.vanderbilt.edu/~schmidt
Professor of Computer Science
Institute for Software
Integrated Systems
Vanderbilt University
Nashville, Tennessee, USA
2
Learning Objectives in this Part of the Lesson• Understand the design of the
Java 8 completable futureversion of ImageStreamGang
• Know how to apply completablefutures to ImageStreamGang, e.g.
• Factory methods & completionstage methods
• Arbitrary-arity methods
3
Applying Arbitrary-Arity Methods in
ImageStreamGang
4
Applying Arbitrary-Arity Methods in ImageStreamGang• collect() returns a completable
future to a stream of futures to images being asynchronouslydownloaded, filtered, & stored
void processStream() {
List<URL> urls = getInput();
CompletableFuture<Stream<Image>>
resultsFuture = urls
.stream()
.map(this::checkUrlCachedAsync)
.map(this::downloadImageAsync)
.flatMap(this::applyFiltersAsync)
.collect(toFuture())
.thenApply(stream ->
log(stream.flatMap
(Optional::stream),
urls.size()))
.join();
Provides a single means to await completion of a set of futures before continuing with the program
collect() also triggers processing of all the intermediate operations
5
Applying Arbitrary-Arity Methods in ImageStreamGang• collect() returns a completable
future to a stream of futures to images being asynchronouslydownloaded, filtered, & stored
• These images are displayed after processing completes
void processStream() {
List<URL> urls = getInput();
CompletableFuture<Stream<Image>>
resultsFuture = urls
.stream()
.map(this::checkUrlCachedAsync)
.map(this::downloadImageAsync)
.flatMap(this::applyFiltersAsync)
.collect(toFuture())
.thenApply(stream ->
log(stream.flatMap
(Optional::stream),
urls.size()))
.join();
6See AndroidGUI/app/src/main/java/livelessons/utils/StreamOfFuturesCollector.java
Applying Arbitrary-Arity Methods in ImageStreamGang• collect() returns a completable
future to a stream of futures to images being asynchronouslydownloaded, filtered, & stored
• These images are displayed after processing completes
• StreamOfFuturesCollector wraps “arbitrary-arity” allOf() method
void processStream() {
List<URL> urls = getInput();
CompletableFuture<Stream<Image>>
resultsFuture = urls
.stream()
.map(this::checkUrlCachedAsync)
.map(this::downloadImageAsync)
.flatMap(this::applyFiltersAsync)
.collect(toFuture())
.thenApply(stream ->
log(stream.flatMap
(Optional::stream),
urls.size()))
.join();
Return a future that completes when all futures
in the stream complete
7
Applying Arbitrary-Arity Methods in ImageStreamGang• collect() returns a completable
future to a stream of futures to images being asynchronouslydownloaded, filtered, & stored
• These images are displayed after processing completes
• StreamOfFuturesCollector wraps “arbitrary-arity” allOf() method
void processStream() {
List<URL> urls = getInput();
CompletableFuture<Stream<Image>>
resultsFuture = urls
.stream()
.map(this::checkUrlCachedAsync)
.map(this::downloadImageAsync)
.flatMap(this::applyFiltersAsync)
.collect(toFuture())
.thenApply(stream ->
log(stream.flatMap
(Optional::stream),
urls.size()))
.join();
Log the results after the final future completes
8
Applying Arbitrary-Arity Methods in ImageStreamGang• collect() returns a completable
future to a stream of futures to images being asynchronouslydownloaded, filtered, & stored
• These images are displayed after processing completes
• StreamOfFuturesCollector wraps “arbitrary-arity” allOf() method
void processStream() {
List<URL> urls = getInput();
CompletableFuture<Stream<Image>>
resultsFuture = urls
.stream()
.map(this::checkUrlCachedAsync)
.map(this::downloadImageAsync)
.flatMap(this::applyFiltersAsync)
.collect(toFuture())
.thenApply(stream ->
log(stream.flatMap
(Optional::stream),
urls.size()))
.join();
Remove empty optional values from the stream in Java 9
See docs.oracle.com/javase/9/docs/api/java/util/Optional.html#flatMap
9
Applying Arbitrary-Arity Methods in ImageStreamGang• collect() returns a completable
future to a stream of futures to images being asynchronouslydownloaded, filtered, & stored
• These images are displayed after processing completes
• StreamOfFuturesCollector wraps “arbitrary-arity” allOf() method
void processStream() {
List<URL> urls = getInput();
CompletableFuture<Stream<Image>>
resultsFuture = urls
.stream()
.map(this::checkUrlCachedAsync)
.map(this::downloadImageAsync)
.flatMap(this::applyFiltersAsync)
.collect(toFuture())
.thenApply(stream -> log(stream
.filter(Optional::isPresent)
.map(Optional::get),
urls.size()))
.join();
Remove empty optional values from the stream in Java 8
10
Applying Arbitrary-Arity Methods in ImageStreamGang• collect() returns a completable
future to a stream of futures to images being asynchronouslydownloaded, filtered, & stored
• These images are displayed after processing completes
• StreamOfFuturesCollector wraps “arbitrary-arity” allOf() method
void processStream() {
List<URL> urls = getInput();
CompletableFuture<Stream<Image>>
resultsFuture = urls
.stream()
.map(this::checkUrlCachedAsync)
.map(this::downloadImageAsync)
.flatMap(this::applyFiltersAsync)
.collect(toFuture())
.thenApply(stream ->
log(stream.flatMap
(Optional::stream),
urls.size()))
.join();
Wait until all asynchronous processing is completed
See docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#join
11
Implementing the Class StreamOfFuturesCollector
12See AndroidGUI/app/src/main/java/livelessons/utils/StreamOfFuturesCollector.java
Implementing the Class StreamOfFuturesCollector• StreamOfFuturesCollector wraps allOf()
13StreamOfFuturesCollector is a non-concurrent collector (supports parallel & sequential streams)
Implementing the Class StreamOfFuturesCollector• StreamOfFuturesCollector wraps allOf()
• Converts a stream of completable futures into a single completable future that’s triggered when all futures in the stream complete
14
Implementing the Class StreamOfFuturesCollector• StreamOfFuturesCollector wraps allOf()
• Converts a stream of completable futures into a single completable future that’s triggered when all futures in the stream complete
• Implements the Collector interface thataccumulates input elements into a mutable result container
See docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html
15
Implementing the Class StreamOfFuturesCollector• StreamOfFuturesCollector wraps allOf()
StreamOfFuturesCollector provides a powerful wrapper for some complex code!
16
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
implements Collector<CompletableFuture<T>,
List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> {
...
• StreamOfFuturesCollector wraps allOf()
Implements a custom collector
See docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html
17
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
implements Collector<CompletableFuture<T>,
List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> {
...
• StreamOfFuturesCollector wraps allOf()
The type of input elements to the accumulator() method
18
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
implements Collector<CompletableFuture<T>,
List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> {
...
• StreamOfFuturesCollector wraps allOf()
The mutable accumulation type of the accumulator() method
19
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
implements Collector<CompletableFuture<T>,
List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> {
...
• StreamOfFuturesCollector wraps allOf()
The result type of the finisher() method, i.e., the final output of the collector
20
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
implements Collector<CompletableFuture<T>,
List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> {
...
• StreamOfFuturesCollector wraps allOf()
The Stream<T> parameter differs from the List<T> parameter used by the earlier FuturesCollector
See Part 4 of lesson on “Overview of Advanced Java 8 CompletableFuture Features ”
21
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
implements Collector<CompletableFuture<T>,
List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> {
public Supplier<List<CompletableFuture<T>>> supplier() {
return ArrayList::new;
}
public BiConsumer<List<CompletableFuture<T>>,
CompletableFuture<T>> accumulator()
{ return List::add; }
...
• StreamOfFuturesCollector wraps allOf()
This factory method returns a supplier used by the Java 8 streams collector framework to create a new mutable array list container
22
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
implements Collector<CompletableFuture<T>,
List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> {
public Supplier<List<CompletableFuture<T>>> supplier() {
return ArrayList::new;
}
public BiConsumer<List<CompletableFuture<T>>,
CompletableFuture<T>> accumulator()
{ return List::add; }
...
• StreamOfFuturesCollector wraps allOf()
This factory method returns a bi-consumer used by the Java 8 streams collector framework to add a new completable future into the mutable array list container
23
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
...
public BinaryOperator<List<CompletableFuture<T>>> combiner() {
return (List<CompletableFuture<T>> one,
List<CompletableFuture<T>> another) -> {
one.addAll(another);
return one;
};
}
...
• StreamOfFuturesCollector wraps allOf()
This factory method returns a binary operator that merges two partial array list results into a single array list (only relevant for parallel streams)
24
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
...
public Function<List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> finisher(){
return futures -> CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join));
}
...
• StreamOfFuturesCollector wraps allOf()
This factory method returns a function used by the Java 8 streams collector framework to transform the array list accumulation type to the completable future result type
25
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
...
public Function<List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> finisher(){
return futures -> CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join));
}
...
• StreamOfFuturesCollector wraps allOf()
Convert list of futures to array of futures & pass to allOf() to obtain a future that will complete when all futures complete
26
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
...
public Function<List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> finisher(){
return futures -> CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join));
}
...
• StreamOfFuturesCollector wraps allOf()
When all futures have completed get a single future to a stream of joined elements of type T
27
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
...
public Function<List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> finisher(){
return futures -> CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join));
}
...
• StreamOfFuturesCollector wraps allOf()
Convert the array list of futures into a stream of futures
28
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
...
public Function<List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> finisher(){
return futures -> CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join));
}
...
• StreamOfFuturesCollector wraps allOf()
This call to join() will never block!
29
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
...
public Function<List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> finisher(){
return futures -> CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join));
}
...
• StreamOfFuturesCollector wraps allOf()
Return a future to a stream of elements of T
30
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
...
public Set characteristics() {
return Collections.singleton(Characteristics.UNORDERED);
}
public static <T> Collector<CompletableFuture<T>, ?,
CompletableFuture<Stream<T>>>
toFuture() {
return new StreamOfFuturesCollector<>();
}
}
• StreamOfFuturesCollector wraps allOf()
Returns a set indicating the characteristics of the StreamOfFutureCollector class
StreamOfFuturesCollector is thus a non-concurrent collector
31
Implementing the Class StreamOfFuturesCollector
public class StreamOfFuturesCollector<T>
...
public Set characteristics() {
return Collections.singleton(Characteristics.UNORDERED);
}
public static <T> Collector<CompletableFuture<T>, ?,
CompletableFuture<Stream<T>>>
toFuture() {
return new StreamOfFuturesCollector<>();
}
}
• StreamOfFuturesCollector wraps allOf()
This static factory method creates a new StreamOfFuturesCollector
32
Implementing the Class StreamOfFuturesCollector• toFuture() returns a future to
a stream of futures to images that are being downloaded, filtered, & stored
void processStream() {
List<URL> urls = getInput();
CompletableFuture<Stream<Image>>
resultsFuture = urls
.stream()
.map(this::checkUrlCachedAsync)
.map(this::downloadImageAsync)
.flatMap(this::applyFiltersAsync)
.collect(toFuture())
.thenApply(stream ->
log(stream.flatMap
(Optional::stream),
urls.size()))
.join();
Provides a single means to await completion of a set of futures before continuing with the program
33
End of Java 8 CompletableFutures ImageStreamGang
Example (Part 3)