multi threadingjava
DESCRIPTION
Developper des programmes performants en multithreading.TRANSCRIPT
![Page 1: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/1.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
Multithreading en JavaEléments conceptuels de programmation concurrente
Elvadas NONO
Softeam Cadextan , Startech Java
October 16, 2013
1 / 20
![Page 2: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/2.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
Outline1 Threads: the Origin2 Callable3 Future4 ThreadPool
ExecutorExecutorServiceExecutors
5 Synchronisation 1SynchronizedReentrantLockReadWriteLock
6 SynchronisationCountDownLatchCyclicBarrierPhaser
7 ForkJoin
2 / 20
![Page 3: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/3.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
Thread
Thread: Plus petite unité d'exécution dans un programme.
Application: Ensemble de Threads s'exécutant en parallèledans la JVM
Non Deamon Thread vs Deamon Thread, Main Thread
Notion de priorité d'un Thread
Deux façons de créer un Thread:
extends java.lang.Thread ClassImplement java.lang.Runnable Interface
3 / 20
![Page 4: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/4.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
/∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗ OPTION1∗∗∗∗∗∗∗∗∗∗∗∗∗/public class MonThread extends Thread {
public void run() {// Thread behavior goes here !! . . .
}}
// Start ThreadMonTread t1= new MonThread();t1. start () ;
/∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗∗ OPTION2∗∗∗∗∗∗∗∗∗∗∗∗∗/public class MyRunnable implements Runnable {
public void run() {// Thread behavior goes here !! . . .
}}
Thread t2= new Thread ( new MyRunnable());t2 . start () ;
4 / 20
![Page 5: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/5.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
Problèmes
Et le résultat ? retour de la methode run: void
gestion des exceptions
démarrage du thread
synchrone ou asynchrone?
4 / 20
![Page 6: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/6.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
L'interface Callable
public Interface Callable<V>
java.util.concurrent package, since JDK 1.5
Dé�nit une unique methode sans argument call.
Retourne un résultat Générique VPeut lever une exception
5 / 20
![Page 7: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/7.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
public class MyCallable implements Callable<String> {
public String call() throws exception {
// Thread behavior goes here!! . . .
}
}
6 / 20
![Page 8: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/8.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
Future
L'interface Future: Résultat d'une opération asynchrone qui setermine dans le futur
Méthodes fournies
boolean cancel(boolean mayInterruptIfRunning)V get() Attends la �n du traitement et récupère le resultatboolean isCancelled() indique si la tache a été annuléeboolean isDone() Retourne Vrai si la le traitement est terminé
6 / 20
![Page 9: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/9.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
ExecutorExecutorServiceExecutors
Executor
L'interface Executor: Découpler la soumission des tâchesconcurrentes des mécanismes d'exécution de chaque tâche
Méthodes fournies
void execute(Runnable command) Exécuter la commandepassée en paramètre dans le futureFuture<v> submit = executor.submit(worker);List<Future<T>�> invokeAll(Collection<? extendsCallable<T>�> tasks) Lien Future/Callable/Executor
Variantes
ExecutorServiceThreadPoolExeceutorSerial, Parallel, Cache, etcc
7 / 20
![Page 10: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/10.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
ExecutorExecutorServiceExecutors
ExecutorService
ExecutorService: Spécialisation de l'interface Executor a�n desoumettre des
RunnableCallableCollection de CallableCollection de RunnableSous ensemble de Runnable/Callable
8 / 20
![Page 11: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/11.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
ExecutorExecutorServiceExecutors
Executors
Factory de ThreadPool / Executor
static ExecutorService newSingleThreadExecutor() Créé unexécuteur o�ciant avec un seul worker thread et une queue detraitement.static ExecutorService newFixedThreadPool(int nThreads)pool de nThread Threads faisant usage d'une �le de tâches.static ExecutorService newCachedThreadPool() Poolfonctionnant suivant le sytème de cachestatic ScheduledExecutorServicenewSingleThreadScheduledExecutor() Executeur pouvantlancer des threads périodiques, avec des délais
9 / 20
![Page 12: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/12.jpg)
Threads for the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
ExecutorExecutorServiceExecutors
public class PoolThreadTest {
private static final int NB_TASKS = 100;
public static void main(String args[]){
// Execultor service Factory
ExecutorService cacheExecutor =
Executors.newCachedThreadPool();
ExecutorService singleExecutor =
Executors.newSingleThreadExecutor();
ExecutorService fixedExecutor =
Executors.newFixedThreadPool(3);
// Create a task list
Collection<DummyCallable> tasks= new
9 / 20
![Page 13: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/13.jpg)
Threads executor the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
ExecutorExecutorServiceExecutors
ArrayList<>(NB_TASKS);
for(int i=0;i<NB_TASKS;i++){ DummyCallable
t= new DummyCallable(i);
tasks.add(t);
}
//start executor
startExecutor(fixedExecutor, tasks);
}
private static void startExecutor(ExecutorService
fixedExecutor, Collection<DummyCallable> tasks) {
try {
List<Future<Long>>
9 / 20
![Page 14: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/14.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
ExecutorExecutorServiceExecutors
futures=executor.invokeAll(tasks);
executor.awaitTermination(1, TimeUnit.MINUTES); }
catch (InterruptedException |
IllegalStateException e) {
e.printStackTrace();
}finally{
executor.shutdown();
System.out.println("bye bye!");
}
}
10 / 20
![Page 15: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/15.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
ExecutorExecutorServiceExecutors
ScheduledExecutorService
Exécuter des tâches
Après un délai
schedule (Callable task, long delay, TimeUnit timeunit)schedule (Runnable task, long delay, TimeUnit timeunit)
A une fréquence �xe: example toutes les 5min
scheduleAtFixedRate (Runnable, long initialDelay, long period,TimeUnit timeunit)
Avec un délai �xe entre la �n d'un Run et le début du suivant
scheduleWithFixedDelay (Runnable, long initialDelay, longperiod, TimeUnit timeunit)
Comme tous les Executeurs vous devez les arrêterexplicitement après usage
shutdown() or shutdownNow()10 / 20
![Page 16: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/16.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
SynchronizedReentrantLockReadWriteLock
ScheduledExecutorService ses =
Executors.newScheduledThreadPool(5);
ScheduledFuture sf = ses.schedule(new Callable() {
public Object call() throws Exception {
System.out.println("Executed!");
return "StarTech Java!";
} }, 5, TimeUnit.SECONDS);
System.out.println("result = " + sf.get());
ses.shutdown();
11 / 20
![Page 17: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/17.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
SynchronizedReentrantLockReadWriteLock
Synchronized
Protection de l'accès aux sections critiques
Implémentation d'un lock structuré:synchronized methods andsynchronized statements
Interdit l'exécution concurrente de regions protégées sur unmême objet par deux Threads
Reentrant mechanism
11 / 20
![Page 18: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/18.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
SynchronizedReentrantLockReadWriteLock
//***********SOLUTION1
public static synchronized Singleton getInstance(){
if (instance==null) //1
instance=new Singleton(); //2
return instance; //3
}
//*************SOLUTION2
public static Singleton getInstance(){
if (instance==null) { //1
synchronized (Singleton.class){ //2
instance=new Singleton();
}
}
return instance;
}
11 / 20
![Page 19: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/19.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
SynchronizedReentrantLockReadWriteLock
// Q:which one is correct? A: None Why?
12 / 20
![Page 20: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/20.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
SynchronizedReentrantLockReadWriteLock
Synchronized issues
Problèmes
Thread automatiquemnt bloqué si le verrou est occupé
Pas de vue sur la �le des Threads bloqués et reprise après lock
Gestion des locks multiples
Lock Structuré
Unfair Lock
No way to check ability to trying for lock interruptibly
No delay/timeout waiting period
Di�erence Read/Write operations
12 / 20
![Page 21: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/21.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
SynchronizedReentrantLockReadWriteLock
ReetrantLock
ReetrantLock: Lock Reentrant plus �exible
Fairness: The most waiting Thread is always choosen ifactivatedSpecialisation: ReaderLock and WritterLockWaiting Thread QueuetryLock to check if lock is available : not blockant
13 / 20
![Page 22: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/22.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
SynchronizedReentrantLockReadWriteLock
ReadWriteLock
ReadWriteLock: Interface de gestion d'un lock enLecture/Ecriture
Plusieurs Lecteurs autorisés en parallèle
Les Rédacteurs sont prioritaires sur les lecteurs
En interne: Deux lock séparés
ReentrantReadWriteLock implements ReadWriteLock
14 / 20
![Page 23: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/23.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
CountDownLatchCyclicBarrierPhaser
ReadWriteLock readWriteLock = new
ReentrantReadWriteLock();
readWriteLock.readLock().lock();
// Plusieurs Lecteur peuvent entrer dans cette section
// si le verrou n'est pas occupé par un Redacteur, et
aucun rédacteur en Attente du verrou
readWriteLock.readLock().unlock();
readWriteLock.writeLock().lock();
// seuls les Redacteurs peuvent atteindre cette
section,
// si aucun lecteur actif.
readWriteLock.writeLock().unlock();
15 / 20
![Page 24: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/24.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
CountDownLatchCyclicBarrierPhaser
CountDownLatch
Permettre à un ou plusieurs Thread d'attendre la �n del'exécution d'un ensemble d'actions par un ou plusieurs autresThreads
un CountDownLatch est initialisé avec un compteur
la méthode countDown() décrémente le compteur
la méthode await() bloque le Thread appelant jusqu'a ce quele compteur atteigne la valeur zero
Tous les Thread bloqués sont libérés lorsque le compteurdevient nul
le compteur ne peut pas être réinitialisé.
Tout appel à await après que le compteur soit passé à zero setermine imédiatement
15 / 20
![Page 25: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/25.jpg)
Threads the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
CountDownLatchCyclicBarrierPhaser
final CountDownLatch countDownLatch = new
CountDownLatch(2);
Thread t = new Thread(new Runnable() {
public void run() {
System.out.println(countDownLatch.getCount());
//(2)
countDownLatch.countDown();
System.out.println(countDownLatch.getCount());
//(1)
try {
Thread.sleep(5000);
countDownLatch.countDown();
System.out.println(countDownLatch.getCount());
//(0)
} catch (InterruptedException e) {
e.printStackTrace();
15 / 20
![Page 26: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/26.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
CountDownLatchCyclicBarrierPhaser
}
}
});
t.start();
try {
countDownLatch.await(); //blocks the current Thread
System.out.println("MainThread
"+countDownLatch.getCount()); // run only when latch=0
} catch (InterruptedException e) {
e.printStackTrace();
}
16 / 20
![Page 27: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/27.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
CountDownLatchCyclicBarrierPhaser
CyclicBarrier
Permettre à un ou plusieurs Thread de s'attendre à un pointde rendez vous.
un CyclicBarrier est initialisé avec un compteur
Contrairement au CountDownLatch, le compteur peut êtreréinitialisé pour un nouveau cycle: reset
la méthode await() bloque le Thread appelant jusqu'a ce quel'une des conditions suivantes soit véri�ée
le compteur atteind la valeur zero naturellement ou par unappel à resetLe Thread appelant est interrompu par un autre Threadun Thread en attente du RDV atteind son timeout
Tous les Thread bloqués sont libérés lorsque le compteurdevient nul ou le compteur ressetté
Possibilité de dé�nir un traitement à executer lorsque tous lesThreads sont au point de RendezVous
16 / 20
![Page 28: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/28.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
CountDownLatchCyclicBarrierPhaser
//create a Cyclic Barrier
CyclicBarrier barrier = new CyclicBarrier(5);
//Make the current thread wait at barrier
barrier.await();
//after 15s the Thread is release even if the counter is
not 0
barrier.await(15, TimeUnit.SECONDS);
//define a barrier action
Runnable barrierAction = new Runnable( ... ;
CyclicBarrier barrier = new CyclicBarrier(3,
barrierAction);
17 / 20
![Page 29: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/29.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
CountDownLatchCyclicBarrierPhaser
Phaser
Mécanisme de synchornisation à un point de rendez voussimilaire au CyclicBarrier
Supporte plusieurs cycles, JDK 7
le nombre de parties n'est pas �xé à l'avance: dynamique au �ldu temps et des cycles
l'enregistrement se fait à tout moment: methode register
Atteindre le point de RDV int arrive() / et se désister duprochain cycle arriveAndDeregister()
boolean onAdvance(int phase, int registeredParties), réagir auxchangement de cycles
int arriveAndAwaitAdvance() attendre les autres Threads aupoint de rendez vous.
Attendre une phase particulière int awaitAdvance(int phase)17 / 20
![Page 30: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/30.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
Fork/Join
Programmation Multi-Core, DPR algorithm,JSR 166 libs,native in JDK7
ForkJoinPool
Fixed Pool Size Thread, if no size speci�ed=> Number ofavailable processorsExecutorService permettant d'exécuter des ForkJoinTasksUn Thread peut récupérer une tâche dans la �le d'attente d'unautre Thread
fork: subdiviser un traitement en plusieurs sous traitementsparallèles
RecursiveTask & RecursiveAction: sous classes deForkJoinTask
join: Récupérer le résultat d'une sous tâche 18 / 20
![Page 31: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/31.jpg)
Threads public the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
18 / 20
![Page 32: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/32.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
public class MaxSearchTask extends RecursiveTask{
protected Long compute() {
if(sup-inf<5){
return simpleMax(); }
else{
int mid=(inf+sup)/2;
MaxSearchTask search1 = new MaxSearchTask(inf,mid, t);
MaxSearchTask search2 = new MaxSearchTask(mid, sup, t);
invokeAll(search1,search2);
return Math.max((Long)search1.join(),(Long)
search2.join());
}
...
}
19 / 20
![Page 33: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/33.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
Multithreading is also
Types Atomiques, AtomicInteger, AtomicXXX
Volatile: The volatile force the thread to update the originalvariable.
Utiliser les collections appropriées en fonction des situations:
Thread safe collections vs non Thread safe collectionsBlockingQueue
Outils de pro�ling: JPro�ler, VisualVM,
Debugging, JDWP
...
19 / 20
![Page 34: Multi threadingJava](https://reader033.vdocuments.us/reader033/viewer/2022052523/5566280ad8b42ac0498b4997/html5/thumbnails/34.jpg)
Threads: the OriginCallableFuture
ThreadPoolSynchronisation 1Synchronisation
ForkJoin
Q&A
Now this is not the end.
It is not even the beginning of the end.
But it is, perhaps, the end of the beginning.
Winston Churchill
20 / 20