Concurrent Programming in Java
corsi.dei.polimi.it/distsys/pub/concurrent_java.pdf
Concurrency in Java
• Java supports concurrency at the language level
  – Not through external libraries as in the case of C/C++
    • E.g., pthread
• Classes to instantiate and run new threads
• Methods to synchronize threads
The Thread class
public class MyThread extends Thread {
    private String message;

    public MyThread(String m) {
        message = m;
    }

    public void run() {
        for (int r = 0; r < 20; r++) {
            System.out.println(message);
        }
    }
}

public class ProvaThread {
    public static void main(String[] args) {
        MyThread t1, t2;
        t1 = new MyThread("Thread1");
        t2 = new MyThread("Thread2");
        t1.start();
        t2.start();
    }
}
The Runnable interface
public class MyThread implements Runnable {
    private String message;

    public MyThread(String m) {
        message = m;
    }

    @Override
    public void run() {
        for (int r = 0; r < 20; r++) {
            System.out.println(message);
        }
    }
}

public class ProvaThread {
    public static void main(String[] args) {
        MyThread r1, r2;
        r1 = new MyThread("Thread1");
        r2 = new MyThread("Thread2");
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
    }
}
Exercise
• Implement a program to compute in parallel the sum of two matrices
• Each thread is responsible for one line of the result matrix
• You can use the join primitive to wait for a thread to finish processing
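One possible solution to the exercise is sketched below. The class name ParallelMatrixSum and the method sum are hypothetical names chosen for illustration, and the sketch assumes both matrices have the same shape. Each row is computed by a dedicated thread, and join() makes the caller wait for all rows.

```java
import java.util.Arrays;

class ParallelMatrixSum {

    // Adds a and b; each row of the result is computed by its own thread.
    static int[][] sum(int[][] a, int[][] b) {
        int rows = a.length;
        int[][] result = new int[rows][];
        Thread[] workers = new Thread[rows];
        for (int i = 0; i < rows; i++) {
            final int row = i;
            workers[i] = new Thread(() -> {
                int[] line = new int[a[row].length];
                for (int j = 0; j < line.length; j++) {
                    line[j] = a[row][j] + b[row][j];
                }
                result[row] = line; // each thread writes a distinct row
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join(); // wait for this row before the result is read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] a = {{1, 2}, {3, 4}};
        int[][] b = {{10, 20}, {30, 40}};
        System.out.println(Arrays.deepToString(sum(a, b))); // [[11, 22], [33, 44]]
    }
}
```

No further synchronization is needed here because the threads never write to the same row.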
Non-determinism
• The order of instructions in multiple threads is not deterministic
• Multiple executions of the same code on the same or different computers might produce different results
  – It depends on the runtime scheduling, on the concurrent execution of other processes, on the operating system, …
• Non-determinism is a key aspect in concurrent programming …
• … which makes concurrent programming particularly difficult
Java concurrency model
• Java adopts a preemptive model
• Java ensures that threads with equal priority execute in round-robin fashion
• Time-slicing and scheduling depend on the JVM internals
• Threads may explicitly relinquish control to the JVM scheduler through the yield() method
Synchronization
• Synchronizing means introducing constraints on the order of execution of instructions from different threads
• Example: the invocation of join() in the previous exercise prevents the main thread from printing the result matrix before all execution threads complete
• Another example: mutual exclusion
  – At most one thread at a time can access a given code region
Synchronization
• Synchronization can be detrimental to performance
• Indeed, synchronization prevents threads from running in parallel
  – Might also lead to performance that is worse than a single-thread implementation …
  – … due to the overhead to start and schedule new threads at runtime
• We will discuss some patterns to minimize synchronization
Mutability

public class ImmutableClass {
    final int var;

    public ImmutableClass(int var) {
        this.var = var;
    }

    public int getVar() {
        return var;
    }
}

public class MutableClass {
    int var;

    public MutableClass(int var) {
        this.var = var;
    }

    public int getVar() {
        return var;
    }

    public void incVar() {
        var++;
    }
}
Immutability
Benefits
• Multiple threads can access an immutable object in parallel
• The object will not be modified
  – No write/write or read/write conflicts
  – The results of the computation will not be influenced by the order of accesses to the object
Limitations
• Whenever we want to “modify” an immutable object we need to instantiate a new object
  – Can be expensive in terms of memory occupancy and execution time
Synchronization
• Java offers the synchronized keyword to discipline concurrent accesses to mutable objects
  – Write/write conflicts
  – Read/write conflicts
Synchronization
• Invocations of synchronized methods on the same object from different threads are executed sequentially, one after the other

public class MutableClass {
    int var;

    public MutableClass(int var) {
        this.var = var;
    }

    public int getVar() {
        return var;
    }

    public synchronized void incVar() {
        var++;
    }
}
Synchronization
• Invocations of synchronized methods on the same object from different threads are executed sequentially, one after the other
• Also in the case of multiple synchronized methods

public class MutableClass {
    int var;

    public MutableClass(int var) {
        this.var = var;
    }

    public int getVar() {
        return var;
    }

    public synchronized void incVar() {
        var++;
    }

    public synchronized void decVar() {
        var--;
    }
}
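The effect of synchronized methods can be observed with a small experiment. The following is a sketch, assuming a hypothetical Counter class modeled on the mutable class above (here the getter is synchronized too). Several threads increment the same counter, and the final value is exact only because the increments are executed one after the other.

```java
class SynchronizedCounterDemo {

    static class Counter {
        private int var;
        synchronized void incVar() { var++; }
        synchronized void decVar() { var--; }
        synchronized int getVar() { return var; }
    }

    // Starts `threads` threads, each calling incVar() `perThread` times.
    static int runIncrements(int threads, int perThread) {
        Counter counter = new Counter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) {
                    counter.incVar();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.getVar();
    }

    public static void main(String[] args) {
        System.out.println(runIncrements(4, 100_000)); // 400000
    }
}
```

Removing the synchronized keyword from incVar() typically makes some increments get lost, since var++ is a read followed by a write.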
Objects and locks
• In practice, before accessing a synchronized method on object o, a thread needs to acquire an exclusive lock on o
• Each object owns a lock
  – Inherited from the Object class
• Primitive data types do not have associated locks
  – To control the access to primitive instances we need to synchronize on other objects
• In the case of arrays and collections, acquiring the lock on the “container” does not imply any acquisition of locks on the contained objects
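The point about primitive types can be illustrated as follows. This is a minimal sketch with hypothetical names (GuardedTotal, lock, total): since a long field has no lock of its own, every access to it synchronizes on a separate object.

```java
class GuardedTotal {
    private final Object lock = new Object(); // dedicated lock object
    private long total;                       // primitive field: no lock of its own

    void add(long amount) {
        synchronized (lock) {
            total += amount;
        }
    }

    long get() {
        synchronized (lock) {
            return total;
        }
    }

    // Two threads add 1 to the same total 50,000 times each.
    static long demo() {
        GuardedTotal g = new GuardedTotal();
        Runnable work = () -> {
            for (int i = 0; i < 50_000; i++) {
                g.add(1);
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return g.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 100000
    }
}
```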
Synchronization
• Java offers synchronized blocks for finer-grained synchronization

synchronized (obj) { // Acquire lock on obj
    …
} // Release lock on obj
Synchronization

synchronized void f() {
    …
}

• Equivalent to

void f() {
    synchronized (this) {
        …
    }
}
Synchronization and atomicity
• A synchronized method/block is executed in mutual exclusion with respect to other synchronized methods/blocks
  – There can still be interleaved execution of other non-synchronized methods/blocks
  – There can still be interleaved execution of methods/blocks synchronized on other objects/locks
• Synchronized is not equivalent to atomic
  – Although synchronization can be used to achieve atomicity
Typical rules
1. Always lock the updates to fields

synchronized (point) {
    point.x = …;
    point.y = …;
}

2. Always lock the access to mutable fields

synchronized (point) {
    if (point.x > 0) { … }
}
Typical rules
3. No need to synchronize stateless parts of methods

NO!
public synchronized void f() {
    state = …;
    operation();
}

OK
public void f() {
    synchronized (this) {
        state = …;
    }
    operation();
}
Typical rules
4. Avoid locking when invoking methods on other objects

NO!
public synchronized void f() {
    …;
    h.foo();
}

OK
public void f() {
    synchronized (this) {
        …;
    }
    h.foo();
}
Deadlock
• What’s the reason for rule 4?
• Deadlock!
There is a deadlock when a group of threads is blocked because each thread is waiting to acquire a lock currently held by another thread in the group
Deadlock: example

class Cell {
    private long value;

    synchronized long getValue() {
        return value;
    }

    synchronized void setValue(long value) {
        this.value = value;
    }

    synchronized void swap(Cell other) {
        long t = getValue();
        long v = other.getValue();
        setValue(v);
        other.setValue(t);
    }
}

• If two threads concurrently invoke a.swap(b) and b.swap(a) on two instances of Cell (a and b), the program might block indefinitely: the first thread holds the lock of a and wants the lock of b, and vice versa
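One common way to avoid the deadlock in the Cell example is to always acquire the two locks in the same global order. This technique is not taken from the slides. The sketch below assumes a hypothetical OrderedCell class with an id field used only as ordering key.

```java
import java.util.concurrent.atomic.AtomicLong;

class OrderedCell {
    private static final AtomicLong NEXT_ID = new AtomicLong();

    private final long id = NEXT_ID.getAndIncrement(); // hypothetical ordering key
    private long value;

    OrderedCell(long value) {
        this.value = value;
    }

    synchronized long getValue() {
        return value;
    }

    void swap(OrderedCell other) {
        if (other == this) {
            return;
        }
        // Both a.swap(b) and b.swap(a) lock the cell with the smaller id first.
        OrderedCell first = id < other.id ? this : other;
        OrderedCell second = first == this ? other : this;
        synchronized (first) {
            synchronized (second) {
                long t = value;
                value = other.value;
                other.value = t;
            }
        }
    }

    // Two threads swap the same pair in opposite directions, 1000 times each.
    static long[] demo() {
        OrderedCell a = new OrderedCell(1);
        OrderedCell b = new OrderedCell(2);
        Thread t1 = new Thread(() -> { for (int i = 0; i < 1000; i++) a.swap(b); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 1000; i++) b.swap(a); });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {a.getValue(), b.getValue()};
    }

    public static void main(String[] args) {
        long[] v = demo();
        System.out.println(v[0] + " " + v[1]); // 1 2 (an even number of swaps)
    }
}
```

Since no thread ever holds the second lock while waiting for the first, the circular wait described above cannot occur.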
Assignments and locks
• Assignments are atomic operations
  – Except in the case of assignments of long and double
• In general, it is not necessary to synchronize read and write accesses to a single variable
• However, threads might hold the value of such a variable in a local memory (cache)
  – If a thread changes the value of a variable, another thread might still see the previous value (violation of sequential consistency)
  – A possible solution consists in making the variable volatile, which ensures that each access to the variable goes through the main memory
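The use of volatile can be sketched as follows, with hypothetical field names ready and payload. One thread publishes a value and then sets a volatile flag, while another thread spins on the flag and then reads the value.

```java
class VolatileFlagDemo {
    private static volatile boolean ready; // volatile: updates become visible to other threads
    private static int payload;            // plain field, published through `ready`

    static int demo() {
        ready = false;
        payload = 0;
        int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            while (!ready) {
                // busy-wait; without volatile this loop might never observe the update
            }
            seen[0] = payload;
        });
        reader.start();
        payload = 42;
        ready = true; // writes made before this volatile write are visible after the read
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 42
    }
}
```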
Assignments and locks
Thread 1
    int x = 0;
    int y = 0;
    …
    x = 1;
    y = 1;

Thread 2
    …
    read y = 1;
    read x: which values are allowed?
Adapters
• A common programming pattern consists in delegating the synchronization of non-synchronized objects to synchronized adapters
Adapters
class SynchedPoint {
    protected final BarePoint delegate = new BarePoint();

    public synchronized double getX() { return delegate.x; }
    public synchronized double getY() { return delegate.y; }
    public synchronized void setX(double v) { delegate.x = v; }
    public synchronized void setY(double v) { delegate.y = v; }
}
Synchronization and Collections
• Java Collections adopt a synchronization strategy based on Adapters
• The main classes are not synchronized
• The Collections class offers static methods that return anonymous Adapters

List<String> unsyncList = new ArrayList<>();
List<String> syncList = Collections.synchronizedList(unsyncList);
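A short usage sketch of the synchronized adapter follows (the class name SyncListDemo is hypothetical). Individual calls such as add() are synchronized by the adapter, whereas iteration spans several calls and, as the Collections documentation requires, must be guarded manually by locking the list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SyncListDemo {

    // Four threads append 1000 elements each to the same synchronized list.
    static int demo() {
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int k = 0; k < 1000; k++) {
                    syncList.add(k); // each add() acquires the adapter's lock
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        int count = 0;
        synchronized (syncList) { // iteration is not atomic: lock the list explicitly
            for (Integer ignored : syncList) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 4000
    }
}
```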
Exercise
• Given an integer array of size N, create a new array with all and only the numbers that are multiples of 3
• Run the computation in parallel on 4 threads (assume that N is a multiple of 4)
  – How to select the size of the result array?
  – How to synchronize?
Exercise
• First solution
  – The result array has size N
    • Waste of memory
  – We keep a variable with the index of the first free position in the result array
    • Need to synchronize each and every access to such variable!
    • Expensive!!!
Exercise
• Second solution
  – Perform the computation twice
    • First, to count the number of results produced in each thread
    • Then, to write the actual results in the destination array
  – Writing results does not require synchronization
    • Each thread knows how many results will be produced …
    • … thanks to the first counting computation
Prefix sum
• The second solution represents a common pattern in parallel programming
• Useful when the number of results produced by each thread is not known upfront
• Often referred to as prefix sum pattern
  – A prefix sum computation is needed to determine where each node has to write
Prefix sum
Benefits
• No need to synchronize when producing the results
• Allocation of the minimum amount of memory needed to store the results
Limitations
• Perform the computation twice
• Only works with a static allocation of tasks to threads
  – It must be the same in the two phases
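The two-phase scheme can be sketched for the array filtering exercise as follows. PrefixSumFilter, multiplesOfThree and runAll are hypothetical names, and the sketch assumes that the input length is a multiple of the number of threads. Threads are started twice (once per phase) to keep the code short.

```java
import java.util.Arrays;
import java.util.function.IntConsumer;

class PrefixSumFilter {

    static int[] multiplesOfThree(int[] input, int nThreads) {
        int chunk = input.length / nThreads; // static allocation: same chunks in both phases
        int[] counts = new int[nThreads];

        // Phase 1: each thread counts the results in its own chunk.
        runAll(nThreads, id -> {
            int c = 0;
            for (int i = id * chunk; i < (id + 1) * chunk; i++) {
                if (input[i] % 3 == 0) c++;
            }
            counts[id] = c;
        });

        // Exclusive prefix sum: offsets[t] is where thread t starts writing.
        int[] offsets = new int[nThreads];
        int total = 0;
        for (int t = 0; t < nThreads; t++) {
            offsets[t] = total;
            total += counts[t];
        }
        int[] result = new int[total];

        // Phase 2: each thread writes into its own disjoint range, with no locking.
        runAll(nThreads, id -> {
            int pos = offsets[id];
            for (int i = id * chunk; i < (id + 1) * chunk; i++) {
                if (input[i] % 3 == 0) result[pos++] = input[i];
            }
        });
        return result;
    }

    // Runs body(0..n-1) on n threads and waits for all of them.
    private static void runAll(int n, IntConsumer body) {
        Thread[] workers = new Thread[n];
        for (int t = 0; t < n; t++) {
            final int id = t;
            workers[t] = new Thread(() -> body.accept(id));
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        int[] input = {3, 1, 6, 2, 9, 4, 5, 12};
        System.out.println(Arrays.toString(multiplesOfThree(input, 4))); // [3, 6, 9, 12]
    }
}
```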
Condition synchronization
• Assume we need to manage a buffer of limited size where different threads add and remove elements:
  – It is not possible to remove elements when the buffer is empty
  – It is not possible to add elements when the buffer is full
  – If a thread wants to perform an operation, it needs to wait until the above conditions are met
Condition synchronization
• For these scenarios, Java offers condition synchronization
  – In practice, a waiting queue for each object, with the following methods
• public final void notify()
  – Wakes up a single thread among those that are waiting in the queue of the object
• public final void notifyAll()
  – Wakes up all the threads that are waiting in the queue of the object
• public final void wait() throws InterruptedException
  – The thread waits to be notified by some other thread
  – The thread releases the synchronization lock associated with the object on which wait() is invoked
  – When the thread is woken up, it needs to acquire the lock again before resuming execution
Condition synchronization
• A thread can wake up from a wait() even if the condition is not met
  – Spurious wake up
• Thus, it is necessary to check again the condition after waking up

public synchronized void act() throws InterruptedException {
    while (!cond) {
        wait();
    }
    // Operations on the state of the object …
    notifyAll();
}
Exercise
• Implement the limited-size buffer discussed in the previous slides
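One possible solution to the exercise is sketched below, under the assumption of a hypothetical generic class BoundedBuffer with put() and take() methods. Both methods re-check their condition in a loop after every wake-up and notify all waiting threads after changing the state.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // buffer full: release the lock and wait
        }
        items.addLast(item);
        notifyAll(); // wake up threads waiting for an element
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // buffer empty: release the lock and wait
        }
        T item = items.removeFirst();
        notifyAll(); // wake up threads waiting for free space
        return item;
    }

    // One producer sends 1..100 through a buffer of size 2; one consumer sums them.
    static int demo() {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        int[] sum = new int[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 100; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) sum[0] += buffer.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 5050
    }
}
```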
Condition synchronization
• notify() or notifyAll()?
  – notify() reduces the context-switch overhead by waking up a single thread
  – Can be used when at most one thread needs to be woken up, which happens when:
    • All the threads are waiting on conditions that refer to the same event (often, the same condition)
      – E.g., single producer, multiple consumers
    • Each notification allows at most one thread to continue (no need to wake up more than one thread)
      – E.g., single producer or single consumer
High-level constructs
Executors
• In the previous examples, we have seen the cost of manually building new threads, in terms of lines of code and program complexity
• Executors are a higher-level alternative to manual management of threads
• Executors run asynchronous tasks by managing a thread “pool”
  – No need to manually create new threads
  – Enable reuse of threads from the pool
    • Better performance
Executors

ExecutorService executor = Executors.newFixedThreadPool(4);

executor.submit(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Thread: " + threadName);
});
Executors
• The Executors class offers different static methods to build various types of executors
  – newFixedThreadPool(int nThreads)
  – newCachedThreadPool()
    • Reuses existing threads to execute new tasks
  – newScheduledThreadPool(int poolSize)
    • For periodic executions
Executors
• The ExecutorService interface exposes methods to verify/wait for task completion
– isTerminated()
– awaitTermination(long timeout, TimeUnit unit)
Executors
• Executors need to be explicitly terminated
  – Otherwise they wait for new tasks indefinitely
• The ExecutorService interface offers two methods for termination
  – shutdown() stops accepting new tasks and lets the tasks already submitted run to completion, without blocking the caller
  – shutdownNow() attempts to stop all executing tasks (typically by interrupting them) and returns the tasks that never started
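A typical termination sequence can be sketched as follows. The class name ExecutorShutdownDemo and the 5-second timeout are arbitrary choices made for illustration. shutdown() is combined with awaitTermination() to block until the submitted tasks are done, falling back to shutdownNow() on timeout.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorShutdownDemo {

    // Submits 10 small tasks, then shuts the pool down and waits for them.
    static int demo() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                completed.incrementAndGet();
            });
        }
        executor.shutdown(); // no new tasks accepted; submitted tasks still run
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // give up: interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 10
    }
}
```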
Exercise
• Change the array filtering code to use an Executor
Callable and Future

• Besides Runnables, executors also support a different kind of task: Callable
• Callable<T> is a functional interface like Runnable, but it defines a method that returns a value of type T (instead of void)
Callable and Future
• A Callable<T> can be submitted to an executor service like a Runnable
• The executor service returns a Future<T>
  – Future<T> will contain the return value of the Callable, when the callable is executed
  – Future<T> offers the following methods
    • isDone() to check if the computation is complete
    • get() to access the return value at the end of the computation
      – The method blocks the caller until the computation is complete
Callable and Future

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> f = executor.submit(task); // task: a Callable<Integer> defined elsewhere
System.out.println("Done? " + f.isDone()); // Probably not …

Integer res = f.get(); // Waits until the end of the computation
System.out.println("Done? " + f.isDone()); // Certainly yes
System.out.print("Result: " + res);
Callable and Future
• An executor service also provides methods to invoke multiple Callable<T>
  – invokeAll(List<Callable<T>>)
    • Returns a list of Future<T> associated with the input Callable<T> list
  – invokeAny(List<Callable<T>>)
    • Returns a single result, as computed by the first Callable<T> that terminates successfully among those in the input list
Callable and Future

List<Callable<String>> callables = Arrays.asList(
    () -> "task1",
    () -> "task2",
    () -> "task3"
);

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);
Callable and Future

List<Callable<String>> callables = Arrays.asList(
    () -> "task1",
    () -> "task2",
    () -> "task3"
);

String res = executor.invokeAny(callables);
System.out.println(res);
Exercise
• Define an execution pipeline
• Each step can be performed using 4 different algorithms
  – Start the algorithms in parallel and wait until one of them terminates
  – Use the result to compute the next step
Locks
• Locks offer an alternative to the implicit locking of synchronized methods/blocks
• Benefits
  – Better control of the locking policies
  – Example: in the case of multiple concurrent reads and rare writes, we can use a ReadWriteLock that enables two types of locking
    • Non-exclusive: read lock
      – Several threads can obtain a non-exclusive lock simultaneously
    • Exclusive: write lock
      – A single thread can obtain an exclusive lock when no other thread holds a lock (either exclusive or non-exclusive)
Locks

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> contacts = new HashMap<>(); // phone numbers stored as strings
ReadWriteLock lock = new ReentrantReadWriteLock();

executor.submit(() -> {
    lock.writeLock().lock();
    try {
        contacts.put("Ale", "349 1234567");
    } finally {
        lock.writeLock().unlock(); // always release, even on exceptions
    }
});
Locks
executor.submit(() -> {
    lock.readLock().lock();
    try {
        System.out.println(contacts.get("Ale"));
    } finally {
        lock.readLock().unlock(); // always release, even on exceptions
    }
});
Exercise
• Define a class with a list field
  – 1 thread adds elements to the list
  – 3 threads read elements from the list
• Two alternative implementations
  – Synchronized methods/blocks
  – Locks
• Compare the overall time needed for the 3 reading threads to perform 1 million reads each
![Page 57: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/57.jpg)
Barrier synchronization
• Consider again the prefix sum example
• Problem: the main thread needs to wait for all the threads to finish the first counting phase (join) before starting the second phase
– This can be expensive: overhead to start new threads for the second phase
• Possible solution: threads synchronize among themselves
– Using a barrier
![Page 58: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/58.jpg)
Barrier
• A barrier is a place in the code where all the threads must arrive before any is allowed to proceed
• In Java, implemented by CyclicBarrier
– Each thread invokes await
– Only when all the threads have successfully invoked await are they allowed to proceed
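A minimal sketch of a two-phase computation with CyclicBarrier, under stated assumptions: the class name BarrierDemo, the method runTwoPhases and the toy workload are hypothetical and are not the prefix sum exercise itself. Each worker writes its own slot in phase 1, waits at the barrier, and reads everyone's slots in phase 2, so no thread has to be restarted between the phases.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BarrierDemo {
    // Runs `workers` threads through two phases separated by a barrier.
    static int[] runTwoPhases(int workers) {
        int[] phase1 = new int[workers];
        int[] phase2 = new int[workers];
        CyclicBarrier barrier = new CyclicBarrier(workers);
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                phase1[id] = id + 1;              // phase 1: local work
                try {
                    barrier.await();              // wait for all the workers
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
                int sum = 0;                      // phase 2: read all phase-1 results
                for (int v : phase1) {
                    sum += v;
                }
                phase2[id] = sum;
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return phase2;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runTwoPhases(4))); // prints [10, 10, 10, 10]
    }
}
```

The barrier is sized with the number of worker threads only; the main thread still uses join, but just once, at the very end.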
![Page 59: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/59.jpg)
Atomic variables
• Atomic variables offer methods that are guaranteed to be atomic, even without using other synchronization primitives
• They typically exploit atomic operations offered in the instruction set of the CPU
– They can be more efficient than synchronized blocks or other synchronization primitives built on top of them
![Page 60: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/60.jpg)
Atomic variables
• AtomicInteger offers the following (and many other) methods
• getAndIncrement()
• getAndDecrement()
• getAndSet(int newValue)
• getAndAccumulate(int x, IntBinaryOperator accumulatorFunction)
• …
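A short sketch of two of these methods in use; the class AtomicDemo and its helper methods are hypothetical names chosen for illustration. Several threads increment a shared AtomicInteger without any synchronized block, and getAndAccumulate keeps a running maximum.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicDemo {
    // Several threads increment the same counter with no explicit lock.
    static int countWithThreads(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(() -> {
                for (int k = 0; k < incrementsPerThread; k++) {
                    counter.getAndIncrement();    // atomic read-modify-write
                }
            });
            pool[i].start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    // Returns {previous value, new value} after accumulating with max.
    static int[] accumulateMax(int initial, int candidate) {
        AtomicInteger max = new AtomicInteger(initial);
        int previous = max.getAndAccumulate(candidate, Math::max);
        return new int[] { previous, max.get() };
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000)); // prints 40000
        int[] r = accumulateMax(3, 7);
        System.out.println(r[0] + " -> " + r[1]);        // prints 3 -> 7
    }
}
```

With a plain int field and counter++ instead, some increments could be lost, because the read, the addition and the write are separate steps.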
![Page 61: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/61.jpg)
Exercise
• Modify the first version of the array filtering exercise using an AtomicInteger to hold the value of the first free position in the result array
• Compare the performance with respect to the original implementation
![Page 62: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/62.jpg)
Atomic variables
• Many other examples
– AtomicBoolean, AtomicLong, AtomicReference<V>, …
– LongAdder, DoubleAdder, …
– LongAccumulator, DoubleAccumulator, …
![Page 63: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/63.jpg)
Semaphores
• Semaphores enable a limited number of concurrent accesses
• The constructor takes as input the maximum capacity
– Number of available resources
• Two methods
– acquire() to acquire a resource
– release() to free a resource
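A minimal sketch of acquire() and release(), assuming a hypothetical class SemaphoreDemo: several threads compete for a semaphore with a fixed number of permits, and the method reports the highest number of threads observed inside the protected region at the same time, which should never exceed the number of permits.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreDemo {
    // Returns the maximum number of threads seen inside the guarded region.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(() -> {
                try {
                    semaphore.acquire();          // blocks when no permit is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = inside.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.sleep(10);             // simulate use of the resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();          // give the permit back
                }
            });
            pool[i].start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrent(2, 6));  // never more than 2
    }
}
```

Releasing in a finally block means that a permit is returned even when the guarded code fails.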
![Page 64: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/64.jpg)
Exercise
• Implement a data structure that holds at most k different keys
• A thread trying to enter a new key when k keys are already in use is blocked
– Until one key is freed
![Page 65: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/65.jpg)
CompletableFuture
• Like Future<T> …
• … but enables composition!
[Figure: a chain in which supplyAsync turns a Supplier<U> into a CompletableFuture<U>, and successive thenApplyAsync calls produce a CompletableFuture<T>, a CompletableFuture<V>, and a CompletableFuture<Z>]
![Page 66: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/66.jpg)
CompletableFuture
• How to terminate a chain of computations?
– By invoking the thenAcceptAsync() method
– Takes as input a Consumer<T>
– Runs the consumer function on the value produced by the last CompletableFuture in the chain
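A small sketch of a complete chain, from supplyAsync through thenApplyAsync to a terminating thenAcceptAsync. The class ChainDemo, the method run and the string transformations are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ChainDemo {
    // Builds a chain of asynchronous steps and returns what the consumer received.
    static String run(String input) {
        StringBuilder sink = new StringBuilder();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> input.trim())        // Supplier<String>
                .thenApplyAsync(String::toUpperCase)    // String -> String
                .thenApplyAsync(s -> s + "!")           // String -> String
                .thenAcceptAsync(s -> sink.append(s));  // Consumer<String>: ends the chain
        done.join();                                    // wait for the whole chain
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.println(run("  hello ")); // prints HELLO!
    }
}
```

thenAcceptAsync returns a CompletableFuture<Void>: nothing more can be computed from it, but it can still be joined to wait for the end of the chain.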
![Page 67: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/67.jpg)
CompletableFuture
• If a computation itself returns a CompletableFuture …
• … then we can use the thenCompose() method
• When the first CompletableFuture terminates, it invokes the function passed to thenCompose()
• Returns a new CompletableFuture
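A minimal sketch of thenCompose(), assuming two hypothetical asynchronous lookups (findUserId and findEmail) that each return a CompletableFuture. With thenApply the result would be a nested CompletableFuture<CompletableFuture<String>>; thenCompose flattens it.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {
    // Hypothetical asynchronous lookup: here the id is just the name length.
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical asynchronous lookup that depends on the previous result.
    static CompletableFuture<String> findEmail(int id) {
        return CompletableFuture.supplyAsync(() -> "user" + id + "@example.com");
    }

    static String emailOf(String name) {
        CompletableFuture<String> email =
                findUserId(name).thenCompose(ComposeDemo::findEmail);
        return email.join();
    }

    public static void main(String[] args) {
        System.out.println(emailOf("Ale")); // prints user3@example.com
    }
}
```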
![Page 68: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/68.jpg)
CompletableFuture
• It is called CompletableFuture because we can also complete it
– By invoking the complete() method
• For instance, if we are waiting for a result from a server and we realize that the server is not available anymore …
• … we can force the value of the CompletableFuture to be the last value stored in the local cache
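The server scenario can be sketched as follows, under stated assumptions: the class CompleteDemo, the method fetch and the 100 ms timeout are hypothetical, and the "server" is simply a future that nobody completes. After the timeout, complete() forces the cached value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CompleteDemo {
    // Waits briefly for the server; on timeout, completes the future with the cache.
    static String fetch(CompletableFuture<String> fromServer, String cachedValue) {
        try {
            return fromServer.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // complete() has no effect if the future was completed in the meantime
            fromServer.complete(cachedValue);
            return fromServer.join();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> unavailable = new CompletableFuture<>();
        System.out.println(fetch(unavailable, "cached"));   // prints cached
        CompletableFuture<String> ready = CompletableFuture.completedFuture("fresh");
        System.out.println(fetch(ready, "cached"));         // prints fresh
    }
}
```

Any stage chained on the same future (for example with thenApplyAsync) would then run with the forced value.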
![Page 69: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/69.jpg)
CompletableFuture
• We can also combine the values of two CompletableFutures
• f.thenCombineAsync()
– Takes as input
• Another CompletableFuture (other)
• A function that takes as input the result of this and the result of other and uses them to compute a new result of type U
– Returns a CompletableFuture<U>
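A short sketch of thenCombineAsync(); the class CombineDemo and the method sumAsync are hypothetical. Two independent asynchronous computations run concurrently and a function merges their results once both are available.

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {
    // Computes a and b asynchronously, then combines them with a sum.
    static int sumAsync(int a, int b) {
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> a);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> b);
        CompletableFuture<Integer> total =
                first.thenCombineAsync(second, Integer::sum);
        return total.join();
    }

    public static void main(String[] args) {
        System.out.println(sumAsync(40, 2)); // prints 42
    }
}
```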
![Page 70: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/70.jpg)
CompletableFuture
• CompletableFuture also offers methods to create conjunctions or disjunctions of CompletableFutures
– allOf
– anyOf
• This probably represents the best way to encode a workflow that executes tasks asynchronously with respect to the main program
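A minimal sketch of allOf and anyOf, with a hypothetical class AllAnyDemo. Note that allOf returns a CompletableFuture<Void>, so the individual results are read back from the original futures, while anyOf returns a CompletableFuture<Object> holding the result of whichever future completes first.

```java
import java.util.concurrent.CompletableFuture;

class AllAnyDemo {
    // Conjunction: continue only when all three futures are done.
    static int sumWhenAllDone() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> 3);
        return CompletableFuture.allOf(a, b, c)
                // join() does not block here: all three are already completed
                .thenApply(ignored -> a.join() + b.join() + c.join())
                .join();
    }

    // Disjunction: continue as soon as one future is done.
    static Object firstResult() {
        CompletableFuture<String> already = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        return CompletableFuture.anyOf(already, never).join();
    }

    public static void main(String[] args) {
        System.out.println(sumWhenAllDone()); // prints 6
        System.out.println(firstResult());    // prints fast
    }
}
```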
![Page 71: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/71.jpg)
Example
• Sending messages in the background

CompletableFuture<?> checkConnection = CompletableFuture.supplyAsync(Connection::check);
CompletableFuture<?> checkBattery = CompletableFuture.supplyAsync(Battery::check);

// allOf completes with no value (Void), so sendMsg receives null;
// the results of the two checks must be read from the original futures
CompletableFuture.allOf(checkConnection, checkBattery)
    .thenApplyAsync(this::sendMsg)
    .thenApplyAsync(this::sendAnotherMsg)
    .thenAcceptAsync(this::notify);
![Page 72: ConcurrentProgramming in Javacorsi.dei.polimi.it/distsys/pub/concurrent_java.pdfJava concurrency model •Java adopts a preemptive model •Java ensures that threads with equal](https://reader030.vdocuments.us/reader030/viewer/2022040308/5ed914c76714ca7f47691d30/html5/thumbnails/72.jpg)
Exercise
• Write the pipeline example using CompletableFutures