Java 8 CompletableFutures
ImageStreamGang Example (Part 2)
Douglas C. Schmidt
www.dre.vanderbilt.edu/~schmidt
Professor of Computer Science
Institute for Software
Integrated Systems
Vanderbilt University
Nashville, Tennessee, USA
Learning Objectives in this Part of the Lesson
• Understand the design of the Java 8 completable future version of ImageStreamGang
• Know how to apply completable futures to ImageStreamGang, e.g.
  • Factory methods & completion stage methods
[Figure: sockets, list of URLs to download, list of filters to apply, persistent data store]
Applying CompletableFutures to ImageStreamGang
• Focus on processStream()

void processStream() {
  List<URL> urls = getInput();

  CompletableFuture<Stream<Image>> resultsFuture = urls
    .stream()
    .map(this::checkUrlCachedAsync)
    .map(this::downloadImageAsync)
    .flatMap(this::applyFiltersAsync)
    .collect(toFuture())
    // Optional::stream requires Java 9 or later.
    .thenApply(stream ->
               log(stream.flatMap(Optional::stream),
                   urls.size()))
    .join();
  ...
}

See imagestreamgang/streams/ImageStreamCompletableFuture1.java
getInput() gets the list of URLs input by the user.
processStream() combines a Java 8 sequential stream with completable futures.
stream() is a factory method that creates a stream of URLs.
• This implementation is very different from parallel streams

map(this::checkUrlCachedAsync) asynchronously checks if images are already cached locally. map() converts a stream of URLs to a stream of futures to optional URLs.
map(this::downloadImageAsync) asynchronously downloads an image at each given URL. map() converts URL futures (completed) to image futures (downloading).
flatMap(this::applyFiltersAsync) asynchronously filters & stores downloaded images on the local file system. flatMap() converts image futures (completed) to filtered image futures (transforming/storing).
collect(toFuture()) creates a future used to wait for all async operations associated with the stream of futures to complete.
See next part on “arbitrary-arity” methods in CompletableFuture
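The idea of collapsing many futures into one can be sketched with CompletableFuture.allOf(). This is a minimal illustration, not the course's toFuture() collector: the class name AllOfSketch and the helper joinAll() are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AllOfSketch {
    // Hypothetical helper: returns one future that completes when every
    // future in the list has completed, yielding a stream of their results.
    static <T> CompletableFuture<Stream<T>> joinAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            // All futures are complete at this point, so join() does not block.
            .thenApply(v -> futures.stream().map(CompletableFuture::join));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Stream.of(1, 2, 3)
            .map(i -> CompletableFuture.supplyAsync(() -> i * 10))
            .collect(Collectors.toList());

        // The single join() here waits for all three async computations.
        List<Integer> results = joinAll(futures).join().collect(Collectors.toList());
        System.out.println(results);
    }
}
```

Results come back in list order, not completion order, because the helper walks the original list after allOf() fires.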
The thenApply() lambda logs the results when all the futures in the stream complete their async processing.
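The logging lambda flattens optionals with Optional::stream, which was added in Java 9. On a strict Java 8 runtime the same flattening can be sketched with filter() and map(). The class and method names below are illustrative and not part of the course code.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PresentValuesSketch {
    // Java 8 equivalent of stream.flatMap(Optional::stream):
    // drop empty optionals and unwrap the rest.
    static <T> Stream<T> presentValues(Stream<Optional<T>> optionals) {
        return optionals
            .filter(Optional::isPresent)
            .map(Optional::get);
    }

    public static void main(String[] args) {
        Stream<Optional<String>> results = Stream.of(
            Optional.of("a.png"),
            Optional.<String>empty(),   // e.g., an image that was already cached
            Optional.of("b.png"));

        List<String> images = presentValues(results).collect(Collectors.toList());
        System.out.println(images.size() + " image(s): " + images);
    }
}
```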
join() waits until all the images have been downloaded, processed, & stored.
This is the one & only call to join() in this implementation strategy.
Applying Factory Methods in ImageStreamGang
• Initiate an async check to see if images are cached locally

map() calls the behavior checkUrlCachedAsync().
checkUrlCachedAsync() asynchronously checks if a URL is already downloaded.
map() returns a stream of completable futures to optional URLs, which have a value if the URL is not cached or are empty if it is cached.
Later behaviors simply ignore “empty” optional URL values.
• checkUrlCachedAsync() uses the supplyAsync() factory method internally

CompletableFuture<Optional<URL>> checkUrlCachedAsync(URL url) {
  return CompletableFuture
    .supplyAsync(() ->
                 Optional.ofNullable(urlCached(url)
                                     ? null
                                     : url),
                 getExecutor());
}

See imagestreamgang/streams/ImageStreamCompletableFutureBase.java
supplyAsync() is a factory method that registers an action that runs asynchronously.
See docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#supplyAsync
void initiateStream() {
  // Set the executor to the common fork-join pool.
  setExecutor(ForkJoinPool.commonPool());
  ...
}

supplyAsync() runs the action in a worker thread from the common fork-join pool.
See dzone.com/articles/common-fork-join-pool-and-streams
ofNullable() is a factory method that returns an optional URL, which has a value if the URL is not cached or is empty if it is cached. supplyAsync() in turn returns a completable future to that optional.
See docs.oracle.com/javase/8/docs/api/java/util/Optional.html#ofNullable
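The combination of supplyAsync() and Optional.ofNullable() can be tried in isolation. The sketch below assumes a caller-supplied predicate in place of the course's urlCached() method and uses strings instead of URL objects. The class and method names are hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

public class CacheCheckSketch {
    // Returns a future to an optional URL: present if the URL still needs
    // to be downloaded, empty if the predicate reports it as cached.
    static CompletableFuture<Optional<String>> checkCachedAsync(
            String url, Predicate<String> isCached) {
        // With no executor argument, supplyAsync() uses the common fork-join pool.
        return CompletableFuture.supplyAsync(() ->
            Optional.ofNullable(isCached.test(url) ? null : url));
    }

    public static void main(String[] args) {
        // Hypothetical cache rule for the demo: anything ending in ".png" is cached.
        Predicate<String> isCached = url -> url.endsWith(".png");

        System.out.println(checkCachedAsync("http://example.com/a.png", isCached).join());
        System.out.println(checkCachedAsync("http://example.com/b.jpg", isCached).join());
    }
}
```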
boolean urlCached(URL url) {
  return mFilters.stream()
    .filter(filter -> urlCached(url, filter.getName()))
    .count() > 0;
}

urlCached(url) returns true if the image has already been filtered before.
See imagestreamgang/streams/ImageStreamGang.java
boolean urlCached(URL url, String filterName) {
  File file = new File(getPath(), filterName);
  File imageFile = new File(file, getNameForUrl(url));
  // createNewFile() returns false if the file already exists. It also
  // throws IOException, which must be handled or declared.
  return !imageFile.createNewFile();
}

urlCached(url, filterName) returns true if the image file already exists.
See imagestreamgang/streams/ImageStreamGang.java
There are clearly better ways of implementing an image cache!
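One possible alternative, sketched here purely as an assumption and not taken from the course code, keeps the "check and mark in one atomic step" behavior of createNewFile() but records keys in a concurrent in-memory set. The class name and key format are hypothetical, and unlike the file-based version this cache does not survive a restart.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryCacheSketch {
    // Thread-safe set of "filterName/url" keys that have been seen so far.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true if this (url, filter) pair was already recorded.
    // Otherwise records it and returns false, mirroring the semantics of
    // !imageFile.createNewFile() without touching the file system.
    boolean urlCached(String url, String filterName) {
        return !seen.add(filterName + "/" + url);
    }

    public static void main(String[] args) {
        InMemoryCacheSketch cache = new InMemoryCacheSketch();
        System.out.println(cache.urlCached("http://example.com/a.png", "gray"));
        System.out.println(cache.urlCached("http://example.com/a.png", "gray"));
    }
}
```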
Applying Completion Stage Methods in ImageStreamGang
• Asynchronously download an image at each given URL

map() calls the behavior downloadImageAsync().
downloadImageAsync() asynchronously downloads an image & stores it in memory.
Later behaviors simply ignore “empty” optional images.
map() returns a stream of futures to optional images, which have a value if the image is being downloaded or are empty if it is already cached.
• downloadImageAsync() uses the thenApplyAsync() method internally

CompletableFuture<Optional<Image>> downloadImageAsync
    (CompletableFuture<Optional<URL>> urlFuture) {
  return urlFuture
    .thenApplyAsync(urlOpt ->
                    urlOpt
                    .map(this::blockingDownload),
                    getExecutor());
}

downloadImageAsync() asynchronously downloads the image when the future completes.
See imagestreamgang/streams/ImageStreamCompletableFuture1.java
thenApplyAsync() is a completion stage method that registers an action that’s not executed immediately, but only after the future completes.
See docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#thenApplyAsync
Optional.map(): if a URL is present when the future completes, download it & return an optional describing the result; otherwise return an empty optional.
See docs.oracle.com/javase/8/docs/api/java/util/Optional.html#map
map(this::blockingDownload) asynchronously runs blockingDownload() if the optional URL is non-empty.
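The interplay of thenApplyAsync() and Optional.map() can be exercised without any network access. In this sketch a hypothetical fakeDownload() stands in for blockingDownload() and returns the URL's length instead of an Image. The class name is also illustrative.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class DownloadSketch {
    // Hypothetical stand-in for blockingDownload(): "downloads" a URL by
    // returning its length, so the example runs without a network.
    static Integer fakeDownload(String url) {
        return url.length();
    }

    // Registers the download to run asynchronously once urlFuture completes.
    // An empty optional skips the download and stays empty.
    static CompletableFuture<Optional<Integer>> downloadAsync(
            CompletableFuture<Optional<String>> urlFuture) {
        return urlFuture.thenApplyAsync(urlOpt ->
            urlOpt.map(DownloadSketch::fakeDownload));
    }

    public static void main(String[] args) {
        CompletableFuture<Optional<String>> notCached =
            CompletableFuture.completedFuture(Optional.of("http://example.com/a.png"));
        CompletableFuture<Optional<String>> cached =
            CompletableFuture.completedFuture(Optional.<String>empty());

        System.out.println(downloadAsync(notCached).join());
        System.out.println(downloadAsync(cached).join());
    }
}
```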
The download uses the common fork-join pool & its ManagedBlocker mechanism.
You could also use a ThreadPoolExecutor (fixed-size or cached).
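As a rough illustration of the ThreadPoolExecutor alternative, the sketch below passes a fixed-size pool from Executors.newFixedThreadPool() as the executor argument of supplyAsync(). The pool size, thread name, and class name are assumptions chosen for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorSketch {
    // Runs a trivial task on the given executor and reports which
    // thread executed it.
    static String threadNameOf(Executor executor) {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName(), executor)
            .join();
    }

    public static void main(String[] args) {
        // Fixed-size pool; each worker thread gets a recognizable name.
        ExecutorService pool = Executors.newFixedThreadPool(4, runnable -> {
            Thread thread = new Thread(runnable, "download-worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            System.out.println(threadNameOf(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

A dedicated pool sized for blocking I/O avoids relying on ManagedBlocker, at the cost of managing the pool's lifecycle yourself.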
Image blockingDownload(URL url) {
  return BlockingTask
    .callInManagedBlock(() ->
                        downloadImage(url));
}

blockingDownload() transforms a URL into an Image by downloading the image via the URL.
See imagestreamgang/streams/ImageStreamGang.java
BlockingTask.callInManagedBlock() wraps the ManagedBlocker interface, which expands the common fork/join thread pool to handle the blocking image download.
See imagestreamgang/utils/BlockingTask.java
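The course's BlockingTask source is not shown here, so the following is only a guess at how such a wrapper might look, built on the standard ForkJoinPool.managedBlock() and ForkJoinPool.ManagedBlocker APIs. The class name ManagedBlockSketch is hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

public class ManagedBlockSketch {
    // Runs a potentially blocking supplier via ForkJoinPool.managedBlock(),
    // which lets a fork-join pool add a compensating worker thread while
    // the current worker is blocked.
    static <T> T callInManagedBlock(Supplier<T> supplier) {
        // Adapter that runs the supplier once and remembers its result.
        class Blocker implements ForkJoinPool.ManagedBlocker {
            T result;
            boolean done;

            @Override
            public boolean block() {
                result = supplier.get();
                done = true;
                return true;   // no further blocking is necessary
            }

            @Override
            public boolean isReleasable() {
                return done;
            }
        }

        Blocker blocker = new Blocker();
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return blocker.result;
    }

    public static void main(String[] args) {
        // Simulate a blocking call with a short sleep.
        String value = callInManagedBlock(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "downloaded";
        });
        System.out.println(value);
    }
}
```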
downloadImageAsync() returns a future to an image that completes when the image is finished downloading.
• Asynchronously filter & store downloaded images on the local file system

flatMap() calls the behavior applyFiltersAsync().
applyFiltersAsync() asynchronously filters images & stores them into files.
Later operations ignore “empty” optional images.
flatMap() “flattens” all filtered/stored images into a single output stream.
See docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#flatMap
flatMap() returns a stream of futures to optional images, which have a value if the image is being filtered or are empty if it is already cached.
• applyFiltersAsync() uses the thenApplyAsync() method internally

Stream<CompletableFuture<Optional<Image>>> applyFiltersAsync
    (CompletableFuture<Optional<Image>> imageFuture) {
  return mFilters
    .stream()
    .map(filter -> imageFuture
         .thenApplyAsync(imageOpt ->
                         imageOpt
                         .map(image ->
                              makeFilterDecoratorWithImage
                                (filter, image).run()),
                         getExecutor()));
}

applyFiltersAsync() asynchronously filters images & then stores them into files.
See imagestreamgang/streams/ImageStreamCompletableFuture1.java
Applying Completion Stage Methods in ImageStreamGang
• applyFiltersAsync() uses the thenApplyAsync() method internally
Stream<CompletableFuture<Optional<Image>>> applyFiltersAsync
    (CompletableFuture<Optional<Image>> imageFuture) {
  return mFilters
    .stream()
    .map(filter -> imageFuture
         .thenApplyAsync(imageOpt ->
                         imageOpt
                         .map(image ->
                              makeFilterDecoratorWithImage
                              (filter, image).run()),
                         getExecutor()));
}
Convert the list of filters into a stream
Applying Completion Stage Methods in ImageStreamGang
• applyFiltersAsync() uses the thenApplyAsync() method internally
Stream<CompletableFuture<Optional<Image>>> applyFiltersAsync
    (CompletableFuture<Optional<Image>> imageFuture) {
  return mFilters
    .stream()
    .map(filter -> imageFuture
         .thenApplyAsync(imageOpt ->
                         imageOpt
                         .map(image ->
                              makeFilterDecoratorWithImage
                              (filter, image).run()),
                         getExecutor()));
}
Asynchronously apply a filter action after the previous stage completes
See docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#thenApplyAsync
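A minimal sketch of chaining an action with thenApplyAsync() is shown here. It is not the lesson's code: strings stand in for images, the "grayscale-" prefix stands in for a filter, and ThenApplyAsyncSketch, filterLater(), and the two-thread pool are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThenApplyAsyncSketch {
    // Hypothetical helper: "downloads" a name, then "filters" it in a
    // dependent stage that starts only after the download stage completes.
    static String filterLater(String imageName) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> download =
                CompletableFuture.supplyAsync(() -> imageName, executor);

            // The function receives the result of the previous stage.
            CompletableFuture<String> filtered =
                download.thenApplyAsync(name -> "grayscale-" + name, executor);

            return filtered.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(filterLater("cat.png"));
    }
}
```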
Applying Completion Stage Methods in ImageStreamGang
• applyFiltersAsync() uses the thenApplyAsync() method internally
Stream<CompletableFuture<Optional<Image>>> applyFiltersAsync
    (CompletableFuture<Optional<Image>> imageFuture) {
  return mFilters
    .stream()
    .map(filter -> imageFuture
         .thenApplyAsync(imageOpt ->
                         imageOpt
                         .map(image ->
                              makeFilterDecoratorWithImage
                              (filter, image).run()),
                         getExecutor()));
}
This completion stage method registers an action that’s not executed immediately, but runs only after the future completes
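The deferred execution can be observed with a future that is completed by hand. This is a sketch under stated assumptions: DeferredActionSketch and observe() are hypothetical names, and a string stands in for the image.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class DeferredActionSketch {
    // Returns { ranBeforeCompletion, ranAfterCompletion }.
    static boolean[] observe() {
        AtomicBoolean ran = new AtomicBoolean(false);

        // A future that nobody has completed yet.
        CompletableFuture<String> imageFuture = new CompletableFuture<>();

        // Registering the action does not run it.
        CompletableFuture<String> filtered =
            imageFuture.thenApplyAsync(name -> {
                ran.set(true);
                return "filtered-" + name;
            });

        boolean ranBefore = ran.get();   // the source future is still incomplete

        imageFuture.complete("cat.png"); // now the dependent action may run
        filtered.join();                 // wait until it has finished

        boolean ranAfter = ran.get();
        return new boolean[] { ranBefore, ranAfter };
    }

    public static void main(String[] args) {
        boolean[] result = observe();
        System.out.println(result[0] + " " + result[1]);
    }
}
```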
Applying Completion Stage Methods in ImageStreamGang
• applyFiltersAsync() uses the thenApplyAsync() method internally
Stream<CompletableFuture<Optional<Image>>> applyFiltersAsync
    (CompletableFuture<Optional<Image>> imageFuture) {
  return mFilters
    .stream()
    .map(filter -> imageFuture
         .thenApplyAsync(imageOpt ->
                         imageOpt
                         .map(image ->
                              makeFilterDecoratorWithImage
                              (filter, image).run()),
                         getExecutor()));
}
If an image is present then perform the action & return an optional containing the result; otherwise return an empty optional
See docs.oracle.com/javase/8/docs/api/java/util/Optional.html#map
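The behavior of Optional.map() in both cases can be sketched as follows. OptionalMapSketch and filter() are hypothetical names, and a string prefix stands in for running a filter on an image.

```java
import java.util.Optional;

final class OptionalMapSketch {
    // Applies the stand-in "filter" only when a value is present;
    // an empty optional is passed through as an empty optional.
    static Optional<String> filter(Optional<String> imageOpt) {
        return imageOpt.map(name -> "filtered-" + name);
    }

    public static void main(String[] args) {
        System.out.println(filter(Optional.of("cat.png")));
        System.out.println(filter(Optional.empty()));
    }
}
```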
Applying Completion Stage Methods in ImageStreamGang
• applyFiltersAsync() uses the thenApplyAsync() method internally
Stream<CompletableFuture<Optional<Image>>> applyFiltersAsync
    (CompletableFuture<Optional<Image>> imageFuture) {
  return mFilters
    .stream()
    .map(filter -> imageFuture
         .thenApplyAsync(imageOpt ->
                         imageOpt
                         .map(image ->
                              makeFilterDecoratorWithImage
                              (filter, image).run()),
                         getExecutor()));
}
If an image is present in the optional then asynchronously filter the image & store it in an output file
Applying Completion Stage Methods in ImageStreamGang
• applyFiltersAsync() uses the thenApplyAsync() method internally
Stream<CompletableFuture<Optional<Image>>> applyFiltersAsync
    (CompletableFuture<Optional<Image>> imageFuture) {
  return mFilters
    .stream()
    .map(filter -> imageFuture
         .thenApplyAsync(imageOpt ->
                         imageOpt
                         .map(image ->
                              makeFilterDecoratorWithImage
                              (filter, image).run()),
                         getExecutor()));
}
thenApplyAsync() runs the action in a thread from the executor passed as its second argument, getExecutor(), which here is the common fork-join pool
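The two-argument form of thenApplyAsync() runs the action on the supplied executor, while the one-argument form uses the default asynchronous facility (normally the common fork-join pool). The sketch below makes the thread visible by naming it; ExecutorChoiceSketch, threadUsed(), and the "filter-worker" thread name are assumptions for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ExecutorChoiceSketch {
    // Returns the name of the thread that ran the dependent action.
    static String threadUsed() {
        // Hypothetical single-thread executor with a recognizable thread name.
        ExecutorService executor = Executors.newSingleThreadExecutor(
            runnable -> new Thread(runnable, "filter-worker"));
        try {
            return CompletableFuture
                .completedFuture("cat.png")
                // The Async variant always submits the action to the executor,
                // even though the source future is already complete.
                .thenApplyAsync(name -> Thread.currentThread().getName(),
                                executor)
                .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadUsed());
    }
}
```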
Applying Completion Stage Methods in ImageStreamGang
• applyFiltersAsync() uses the thenApplyAsync() method internally
Stream<CompletableFuture<Optional<Image>>> applyFiltersAsync
    (CompletableFuture<Optional<Image>> imageFuture) {
  return mFilters
    .stream()
    .map(filter -> imageFuture
         .thenApplyAsync(imageOpt ->
                         imageOpt
                         .map(image ->
                              makeFilterDecoratorWithImage
                              (filter, image).run()),
                         getExecutor()));
}
It also returns a new completable future that completes when the image has been filtered/stored
Applying Completion Stage Methods in ImageStreamGang
• applyFiltersAsync() uses the thenApplyAsync() method internally
Stream<CompletableFuture<Optional<Image>>> applyFiltersAsync
    (CompletableFuture<Optional<Image>> imageFuture) {
  return mFilters
    .stream()
    .map(filter -> imageFuture
         .thenApplyAsync(imageOpt ->
                         imageOpt
                         .map(image ->
                              makeFilterDecoratorWithImage
                              (filter, image).run()),
                         getExecutor()));
}
applyFiltersAsync() returns a stream of completable futures to optional images
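The fan-out from one image future to one future per filter can be sketched in a self-contained form. This is not the lesson's implementation: strings replace images, two UnaryOperator lambdas replace the filter decorators, the default executor is used instead of getExecutor(), and FanOutSketch, FILTERS, applyAll(), and run() are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FanOutSketch {
    // Hypothetical stand-ins for the list of filters.
    static final List<UnaryOperator<String>> FILTERS = Arrays.asList(
        name -> "gray-" + name,
        name -> "sepia-" + name);

    // One dependent future per filter, all hanging off the same source future.
    static Stream<CompletableFuture<Optional<String>>> applyAll(
            CompletableFuture<Optional<String>> imageFuture) {
        return FILTERS
            .stream()
            .map(filter -> imageFuture
                 .thenApplyAsync(imageOpt -> imageOpt.map(filter)));
    }

    static List<Optional<String>> run(Optional<String> image) {
        return applyAll(CompletableFuture.completedFuture(image))
            // Materialize first so every stage is registered before any join.
            .collect(Collectors.toList())
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(Optional.of("cat.png")));
        System.out.println(run(Optional.empty()));
    }
}
```

Collecting the futures into a list before joining matters because streams are lazy: joining inside the same pipeline would register and wait for one filter at a time.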
Applying Completion Stage Methods in ImageStreamGang
• Asynchronously filter & store downloaded images on the local file system
void processStream() {
  List<URL> urls = getInput();
  CompletableFuture<Stream<Image>>
    resultsFuture = urls
    .stream()
    .map(this::checkUrlCachedAsync)
    .map(this::downloadImageAsync)
    .flatMap(this::applyFiltersAsync)
    .collect(toFuture())
    .thenApply(stream ->
               log(stream.flatMap
                          (Optional::stream),
                   urls.size()))
    .join();
flatMap() merges the streams of futures returned by applyFiltersAsync() into a single stream
This stream is processed by collect(), as discussed in the next part of the lesson
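How flatMap() merges the per-image streams can be shown with a reduced sketch. It leaves out caching, downloading, and the toFuture() collector, and simply joins each future; FlatMapMergeSketch, applyFilters(), run(), and the "gray"/"sepia" filter names are assumptions, with strings standing in for images.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapMergeSketch {
    // Hypothetical counterpart of applyFiltersAsync(): one future per filter.
    static Stream<CompletableFuture<String>> applyFilters(
            CompletableFuture<String> imageFuture) {
        return Stream.of("gray", "sepia")
            .map(filter -> imageFuture
                 .thenApply(name -> filter + "-" + name));
    }

    static List<String> run(List<String> names) {
        return names
            .stream()
            .map(name -> CompletableFuture.completedFuture(name))
            // Each image yields a stream of two futures; flatMap() splices
            // those streams into one stream of futures.
            .flatMap(FlatMapMergeSketch::applyFilters)
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a.png", "b.png")));
    }
}
```

With two images and two filters the merged stream holds four futures, where map() alone would have produced a stream of two nested streams.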
End of Java 8 CompletableFutures ImageStreamGang Example (Part 2)