Java 8 Parallel Stream Internals
(Part 5)
Douglas C. Schmidt
www.dre.vanderbilt.edu/~schmidt
Professor of Computer Science
Institute for Software
Integrated Systems
Vanderbilt University
Nashville, Tennessee, USA
2
• Understand parallel stream internals, e.g.
• Know what can change & what can’t
• Partition a data source into “chunks”
• Process chunks in parallel
• Configure the Java 8 parallel stream common fork-join pool
• Avoid pool starvation & improve performance w/ManagedBlocker
• Perform a reduction that combines partial results into a single result
Learning Objectives in this Part of the Lesson
[Diagram: trySplit() splits InputString into InputString1 & InputString2, which trySplit() splits again into InputString1.1, InputString1.2, InputString2.1 & InputString2.2; each chunk is processed sequentially & the results are joined pairwise]
See www.ibm.com/developerworks/library/j-java-streams-3-brian-goetz
3
Combining Results in a Parallel Stream
4
• After the common fork-join pool finishes processing chunks, their partial results are combined into a final result
Combining Results in a Parallel Stream
[Diagram: DataSource is split into DataSource1 & DataSource2, which are split again into DataSource1.1, DataSource1.2, DataSource2.1 & DataSource2.2; each chunk is processed sequentially, the first level of joins yields partial results & the last join yields the final result]
This discussion assumes a non-concurrent collector (more discussions follow)
5
• After the common fork-join pool finishes processing chunks, their partial results are combined into a final result
• join() occurs in a single thread at each level
• i.e., the “parent”
Combining Results in a Parallel Stream
[Diagram: the same split/process/join tree, where each task that performs a join is the “parent” of the two “children” whose results it combines]
6
As a result, there’s typically no need for synchronizers during the joining
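The parent/child join structure described above can be sketched directly with the fork-join framework. This is a minimal illustration under stated assumptions, not the parallel streams implementation itself: the class name ChunkSum, the threshold & the choice of summation are all hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: sums a range of longs by recursively
// splitting it into chunks, processing each chunk sequentially
// & joining the partial results in the "parent" task.
final class ChunkSum extends RecursiveTask<Long> {
    // Assumed cutoff: ranges spanning fewer than this many steps
    // are processed sequentially instead of being split again.
    private static final long THRESHOLD = 2;

    private final long lo; // Inclusive lower bound.
    private final long hi; // Inclusive upper bound.

    ChunkSum(long lo, long hi) {
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo < THRESHOLD) {
            // Leaf "child": process the chunk sequentially.
            long sum = 0;
            for (long i = lo; i <= hi; i++)
                sum += i;
            return sum;
        }

        long mid = lo + (hi - lo) / 2;
        ChunkSum left = new ChunkSum(lo, mid);
        ChunkSum right = new ChunkSum(mid + 1, hi);

        // Run the left child asynchronously & the right child
        // in the current thread.
        left.fork();
        long rightResult = right.compute();

        // Only this "parent" task reads the two partial results,
        // so combining them needs no extra synchronizer.
        return left.join() + rightResult;
    }

    static long sum(long lo, long hi) {
        return ForkJoinPool.commonPool().invoke(new ChunkSum(lo, hi));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 8));
    }
}
```

With the assumed threshold, sum(1, 8) splits into 1..4 & 5..8 & then into four two-element chunks, which mirrors the shape of the diagram.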
7
• Different terminal operations combine partial results in different ways
Combining Results in a Parallel Stream
Understanding these differences is particularly important for parallel streams
8
• Different terminal operations combine partial results in different ways, e.g.
• reduce() creates a new immutable value
Combining Results in a Parallel Stream
See docs.oracle.com/javase/tutorial/essential/concurrency/immutable.html
9
• Different terminal operations combine partial results in different ways, e.g.
• reduce() creates a new immutable value
Combining Results in a Parallel Stream
See github.com/douglascraigschmidt/LiveLessons/tree/master/Java8/ex16
[Diagram: the range of longs from 1..8 is split into longs 1..4 & longs 5..8, which are split again into longs 1..2, 3..4, 5..6 & 7..8; each chunk is processed sequentially, yielding the partial products 2, 12, 30 & 56]
long factorial(long n) {
  return LongStream
    .rangeClosed(1, n)
    .parallel()
    // LongStream.reduce() takes an identity & one operator,
    // which is also used to combine the partial results.
    .reduce(1, (a, b) -> a * b);
}
10
reduce() combines two immutable values (e.g., long or Long) & produces a new one
[Diagram: the range of longs from 1..8 is split into longs 1..4 & longs 5..8, then into longs 1..2, 3..4, 5..6 & 7..8; processing each chunk sequentially yields 2, 12, 30 & 56; reduce() combines 2 & 12 into 24 & combines 30 & 56 into 1,680; a final reduce() combines 24 & 1,680 into 40,320]
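The immutable reduction described above can be tried out with the sketch below. The class name ReduceFactorial & the BigInteger variant are assumptions added for illustration; the BigInteger variant is included only because the three-argument reduce() is available on object streams, where the accumulator & the combiner can differ.

```java
import java.math.BigInteger;
import java.util.stream.LongStream;

// Hypothetical example class, not taken from the lesson's repository.
final class ReduceFactorial {
    // Two-argument reduce(): the identity 1 & an associative
    // operator used both within a chunk & across chunks.
    static long factorial(long n) {
        return LongStream
            .rangeClosed(1, n)
            .parallel()
            .reduce(1, (a, b) -> a * b);
    }

    // Three-argument reduce() on a Stream<Long>: the accumulator
    // folds a Long into a BigInteger & the combiner multiplies two
    // partial products. Every step returns a new immutable
    // BigInteger instead of mutating an existing value.
    static BigInteger bigFactorial(long n) {
        return LongStream
            .rangeClosed(1, n)
            .boxed()
            .parallel()
            .reduce(BigInteger.ONE,
                    (product, x) -> product.multiply(BigInteger.valueOf(x)),
                    BigInteger::multiply);
    }

    public static void main(String[] args) {
        System.out.println(factorial(8));
        System.out.println(bigFactorial(25));
    }
}
```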
11
See greenteapress.com/thinkapjava/html/thinkjava011.html
• Different terminal operations combine partial results in different ways, e.g.
• reduce() creates a new immutable value
• collect() mutates an existing value
Combining Results in a Parallel Stream
12
Set<CharSequence>
  uniqueWords =
    getInput(sSHAKESPEARE,
             "\\s+")
    .parallelStream()
    ...
    .collect(toCollection(TreeSet::new));
• Different terminal operations combine partial results in different ways, e.g.
• reduce() creates a new immutable value
• collect() mutates an existing value
Combining Results in a Parallel Stream
[Diagram: all words in Shakespeare’s works are split into the 1st & 2nd half of words, which are split again into the 1st, 2nd, 3rd & 4th quarter of words; each quarter is processed sequentially]
See github.com/douglascraigschmidt/LiveLessons/tree/master/Java8/ex14
13
[Diagram: all words in Shakespeare’s works are split into halves & then quarters; each quarter is processed sequentially & collect() combines the results pairwise at each level]
collect() mutates a container to accumulate the result it’s producing
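A mutable reduction of this kind can be sketched as follows. The class name CollectUniqueWords & the tiny word list are hypothetical stand-ins for the lesson's Shakespeare input; the second method spells out the supplier, accumulator & combiner that a non-concurrent collector relies on.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical example class, not taken from the lesson's repository.
final class CollectUniqueWords {
    // Uses the predefined toCollection() collector.
    static Set<String> viaCollector(List<String> words) {
        return words
            .parallelStream()
            .map(String::toLowerCase)
            .collect(Collectors.toCollection(TreeSet::new));
    }

    // Spells out the same mutable reduction: each chunk gets its
    // own TreeSet from the supplier, the accumulator mutates that
    // set & the combiner merges one chunk's set into another.
    static Set<String> viaThreeArgCollect(List<String> words) {
        return words
            .parallelStream()
            .map(String::toLowerCase)
            .<TreeSet<String>>collect(TreeSet::new,     // Supplier.
                                      TreeSet::add,     // Accumulator.
                                      TreeSet::addAll); // Combiner.
    }

    public static void main(String[] args) {
        List<String> words =
            Arrays.asList("To", "be", "or", "not", "to", "be");
        System.out.println(viaCollector(words));
        System.out.println(viaThreeArgCollect(words));
    }
}
```

Because every chunk mutates only its own container, no container is shared between threads until the combiner runs in the parent.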
14
Set<CharSequence>
  uniqueWords =
    getInput(sSHAKESPEARE,
             "\\s+")
    .parallelStream()
    ...
    .collect(ConcurrentHashSetCollector.toSet());
• Different terminal operations combine partial results in different ways, e.g.
• reduce() creates a new immutable value
• collect() mutates an existing value
Combining Results in a Parallel Stream
[Diagram: all words in Shakespeare’s works are split into halves & then quarters; each quarter is processed sequentially & accumulate() adds its results directly to one concurrent result container]
Concurrent collectors (covered later) are different than non-concurrent collectors
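The idea of a concurrent collector can be sketched with Collector.of(). This is not the lesson's ConcurrentHashSetCollector, whose implementation is not shown here; the class name ConcurrentSetSketch, the method toConcurrentSet() & the use of ConcurrentHashMap.newKeySet() are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;

// Hypothetical sketch of a concurrent collector.
final class ConcurrentSetSketch {
    // The CONCURRENT & UNORDERED characteristics let a parallel
    // stream accumulate from many threads into one shared,
    // thread-safe container instead of merging per-chunk sets.
    static <T> Collector<T, ?, Set<T>> toConcurrentSet() {
        return Collector.<T, Set<T>>of(
            () -> ConcurrentHashMap.<T>newKeySet(),
            (set, element) -> set.add(element),
            // Still required by the Collector contract, e.g.,
            // for use with a sequential stream.
            (left, right) -> { left.addAll(right); return left; },
            Collector.Characteristics.CONCURRENT,
            Collector.Characteristics.UNORDERED);
    }

    static Set<String> uniqueWords(List<String> words) {
        return words
            .parallelStream()
            .map(String::toLowerCase)
            .collect(toConcurrentSet());
    }

    public static void main(String[] args) {
        System.out.println(uniqueWords(
            Arrays.asList("To", "be", "or", "not", "to", "be")));
    }
}
```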
15
• More discussion about reduce() vs. collect() appears online
Combining Results in a Parallel Stream
See www.youtube.com/watch?v=oWlWEKNM5Aw
16
• More discussion about reduce() vs. collect() appears online, e.g.
• Always test w/a parallel stream to detect mistakes wrt mutable vs. immutable reductions
Combining Results in a Parallel Stream
void buggyStreamReduce3
    (boolean parallel) {
  ...
  Stream<String> wordStream =
    allWords.stream();
  if (parallel)
    wordStream.parallel();
  String words = wordStream
    .reduce(new StringBuilder(),
            StringBuilder::append,
            StringBuilder::append)
    .toString();
See github.com/douglascraigschmidt/LiveLessons/tree/master/Java8/ex17
17
allWords.stream() converts a list of words into a stream of words
Naturally, this call doesn’t do any processing by itself since streams are “lazy”
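The laziness noted above can be observed with a counter. This is a minimal sketch; the class name LazyStreamDemo, the counter & the sample words are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical example class showing that a pipeline does no
// processing until a terminal operation runs.
final class LazyStreamDemo {
    // Counts the elements that have flowed through the pipeline.
    static final AtomicInteger processed = new AtomicInteger();

    static Stream<String> buildPipeline(List<String> words) {
        // Creating the stream & adding intermediate operations
        // only describes the pipeline.
        return words
            .stream()
            .peek(word -> processed.incrementAndGet())
            .map(String::toUpperCase);
    }

    public static void main(String[] args) {
        Stream<String> stream =
            buildPipeline(Arrays.asList("a", "b", "c"));
        System.out.println("before terminal operation: "
                           + processed.get());

        // The terminal operation triggers the processing.
        List<String> result = stream.collect(Collectors.toList());
        System.out.println("after terminal operation: "
                           + processed.get() + " " + result);
    }
}
```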
18
A stream can be dynamically switched to “parallel” mode!
19
See mail.openjdk.java.net/pipermail/lambda-libs-spec-experts/2013-March/001504.html
The “last” call to .parallel() or .sequential() in a stream “wins”
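The “last call wins” rule can be checked with isParallel(). This is a small sketch; the class name LastCallWins is hypothetical.

```java
import java.util.stream.IntStream;

// Hypothetical example class: the execution mode is a property
// of the whole pipeline, so the last mode call decides it.
final class LastCallWins {
    static boolean parallelThenSequential() {
        return IntStream
            .range(0, 10)
            .parallel()
            .map(i -> i * 2)
            .sequential()
            .isParallel();
    }

    static boolean sequentialThenParallel() {
        return IntStream
            .range(0, 10)
            .sequential()
            .map(i -> i * 2)
            .parallel()
            .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential());
        System.out.println(sequentialThenParallel());
    }
}
```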
20
buggyStreamReduce3() works when parallel is false since the StringBuilder is only accessed by a single thread
See docs.oracle.com/javase/8/docs/api/java/lang/StringBuilder.html
21
buggyStreamReduce3() fails when parallel is true since reduce() expects to do an “immutable” reduction, but StringBuilder::append mutates its receiver
22
There are race conditions here since all threads append to the one shared StringBuilder passed as the identity, & StringBuilder is not thread-safe
See www.baeldung.com/java-string-builder-string-buffer
23
• More discussion about reduce() vs. collect() appears online, e.g.
• Always test w/a parallel stream to detect mistakes wrt mutable vs. immutable reductions
Combining Results in a Parallel Stream
void buggyStreamReduce3
    (boolean parallel) {
  ...
  Stream<String> wordStream =
    allWords.stream();
  if (parallel)
    wordStream.parallel();
  String words = wordStream
    .reduce(new String(),
            (x, y) -> x + y);
This simple fix is inefficient due to string concatenation overhead
See javarevisited.blogspot.com/2015/01/3-examples-to-concatenate-string-in-java.html
24
• More discussion about reduce() vs. collect() appears online, e.g.
• Always test w/a parallel stream to detect mistakes wrt mutable vs. immutable reductions
Combining Results in a Parallel Stream
void streamCollectJoining
    (boolean parallel) {
  ...
  Stream<String> wordStream =
    allWords.stream();
  if (parallel)
    wordStream.parallel();
  String words = wordStream
    .collect(joining());
This is a much better solution!!
See www.mkyong.com/java8/java-8-stringjoiner-example
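The safe alternatives discussed above can be compared side by side. This is a minimal sketch; the class name JoinWords & its method names are hypothetical. The third variant shows why collect() is safe with a StringBuilder where reduce() is not: collect() takes a supplier, so each chunk gets its own builder.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical example class comparing parallel-safe ways of
// concatenating words.
final class JoinWords {
    // Immutable reduction: correct in parallel, but each step
    // creates a new String.
    static String viaReduce(List<String> words) {
        return words
            .parallelStream()
            .reduce("", (x, y) -> x + y);
    }

    // Mutable reduction via the predefined joining() collector.
    static String viaJoining(List<String> words) {
        return words
            .parallelStream()
            .collect(Collectors.joining());
    }

    // Mutable reduction spelled out: the supplier creates one
    // StringBuilder per chunk, so no builder is shared by threads.
    static String viaStringBuilder(List<String> words) {
        return words
            .parallelStream()
            .collect(StringBuilder::new,
                     StringBuilder::append,
                     StringBuilder::append)
            .toString();
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(viaReduce(words));
        System.out.println(viaJoining(words));
        System.out.println(viaStringBuilder(words));
    }
}
```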
25
• More discussion about reduce() vs. collect() appears online, e.g.
• Always test w/a parallel stream to detect mistakes wrt mutable vs. immutable reductions
• Beware of issues related to associativity & identity
Combining Results in a Parallel Stream
void testDifferenceReduce(...) {
  long difference = LongStream
    .rangeClosed(1, 100)
    .parallel()
    .reduce(0L,
            (x, y) -> x - y);
}
void testSum(long identity, ...) {
  long sum = LongStream
    .rangeClosed(1, 100)
    .reduce(identity,
            // Could use (x, y) -> x + y
            Math::addExact);
See github.com/douglascraigschmidt/LiveLessons/tree/master/Java8/ex17
26
testDifferenceReduce() fails for a parallel stream since subtraction is not associative, e.g., (1 - 2) - 3 != 1 - (2 - 3)
See www.ibm.com/developerworks/library/j-java-streams-2-brian-goetz
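The associativity problem can be reproduced with the sketch below. The class name SubtractionReduce is hypothetical. Only the sequential result is predictable; the parallel result depends on how the range happens to be split, so no particular value should be expected from it.

```java
import java.util.stream.LongStream;

// Hypothetical example class showing why a non-associative
// operator is unsafe in a parallel reduce().
final class SubtractionReduce {
    // Sequential: evaluates ((0 - 1) - 2) - ... - 100 left to right.
    static long sequentialDifference() {
        return LongStream
            .rangeClosed(1, 100)
            .reduce(0L, (x, y) -> x - y);
    }

    // Parallel: chunks are reduced separately & their partial
    // results are then subtracted from each other, which regroups
    // the operations.
    static long parallelDifference() {
        return LongStream
            .rangeClosed(1, 100)
            .parallel()
            .reduce(0L, (x, y) -> x - y);
    }

    public static void main(String[] args) {
        System.out.println("(1 - 2) - 3 = " + ((1L - 2L) - 3L));
        System.out.println("1 - (2 - 3) = " + (1L - (2L - 3L)));
        System.out.println("sequential: " + sequentialDifference());
        System.out.println("parallel:   " + parallelDifference());
    }
}
```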
27
testSum() fails if identity is not 0L
The “identity” of an OP is defined as “identity OP value == value” (& vice versa, i.e., “value OP identity == value”)
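The effect of a wrong identity can be sketched as follows. The class name IdentitySum & the boolean parameter are hypothetical additions. In a parallel stream the identity seeds every chunk, so a non-zero value is added once per chunk rather than once overall.

```java
import java.util.stream.LongStream;

// Hypothetical example class showing how the identity value
// affects a sum over 1..100.
final class IdentitySum {
    static long sum(long identity, boolean parallel) {
        LongStream stream = LongStream.rangeClosed(1, 100);
        if (parallel)
            stream = stream.parallel();
        return stream.reduce(identity, Math::addExact);
    }

    public static void main(String[] args) {
        // 0 is the identity for addition, so both modes agree.
        System.out.println(sum(0L, false));
        System.out.println(sum(0L, true));

        // 1 is not an identity: the sequential result is off by
        // one & the parallel result is off by one per chunk.
        System.out.println(sum(1L, false));
        System.out.println(sum(1L, true));
    }
}
```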
28
End of Java 8 Parallel Stream Internals (Part 5)