BlockingQueue<E>
.
Queue<E>
que nous avons déjà vue dans la partie sur l'API Collection.
Rappelons que l'interface
Queue<E>
permet de modéliser une file d'attente, de taille éventuellement fixée, et qui ne peut pas être modifiée.
D'une certaine façon, une file d'attente supporte trois opérations de base :
Queue<E>
définit deux comportement d'échec pour chacune de ces trois opérations. Ces deux comportements sont la génération d'une exception ou le renvoi d'un booléen à
false
.
L'interface
BlockingQueue
définit deux comportements supplémentaires.
put(E e)
, ou de retrait d'un élément par la méthode
take()
peut bloquer si la file d'attente est saturée, ou si elle est vide. Ces deux méthodes attendent en fait la fin de la saturation, ou l'arrivée d'un nouvel élément pour rendre la main.
offer(E e, long time, TimeUnit unit)
et
poll(long time, TimeUnit unit)
qui ont le même fonctionnement, sauf qu'elles ne peuvent attendre plus longtemps que le temps passé en paramètre. Quand ce temps est écoulé, elles retournent
false
ou
null
respectivement.
BlockingQueue
expose également les méthodes
drainTo(Collection coll)
et
drainTo(Collection coll, int maxElements)
, qui permettent de retirer des éléments par paquets, pour les stocker dans la collection passée en paramètre.
BlockingQueue
.
Les deux premières sont
ArrayBlockingQueue
et
LinkedBlockingQueue
. Ces deux files d'attente fonctionnent sur le principe FIFO. L'implémentation
ArrayBlockingQueue
a une capacité maximale, qui est fixée à la construction de l'objet. L'implémentation
LinkedBlockingQueue
peut être limitée en capacité lors de sa construction. Si aucune limite n'est fixée, alors elle utilise sa valeur par défaut :
Integer.MAX_VALUE
.
L'implémentation
PriorityBlockingQueue
conserve ses éléments ordonnés, soit en utilisant un comparateur (instance de
Comparator
) passé lors de sa construction, soit en utilisant l'ordre de ses éléments si ce sont des implémentations de
Comparable
. On peut lui donner une taille maximale, ou la laisser à sa valeur par défaut,
Integer.MAX_VALUE
.
Enfin, l'implémentation
SynchronousQueue
a un comportement un peu particulier. En fait elle n'a de file d'attente que le nom, puisque sa capacité est de 0. On ne peut donc pas l'interroger sur son contenu, ni itérer sur ses éléments, tout simplement parce qu'elle n'en possède pas.
En fait, chaque insertion d'une nouvelle donnée dans une telle file, doit attendre qu'une demande de retrait soit faite pour rendre la main. Réciproquement, chaque demande de retrait ne rend la main que lorsqu'une demande d'insertion est faite. Cette classe permet à deux
threads
de se synchroniser sur des points de rendez-vous particuliers.
Exemple 74. Producteur / consommateur : la boulangerie
public class Boulangerie { // plus prosaïquement, une boulangerie est une file d'attente de 20 cases private BlockingQueue<Pain> queue = new ArrayBlockingQueue<Pain>(20) ; // on peut y déposer du pain, mais le boulanger n'est pas patient // si le panier de vente est plein, il s'en va public boolean depose(Pain pain) throws InterruptedException { return queue.offer(pain, 200, TimeUnit.MILLISECONDS) ; } // on peut en acheter, et le client n'est pas plus patient // que le boulanger public Pain achete () throws InterruptedException { return queue.poll(200, TimeUnit.MILLISECONDS) ; } // on peut interroger le stock public int getStock() { return queue.size() ; } }
Pain
est juste une classe standard, sans propriété particulière. Nous avons ensuite besoin de deux classes :
Boulanger
et
Mangeur
, toutes deux implémentations de
Runnable
.
Exemple 75. Producteur / consommateur : la classe
Boulanger
public class Boulanger implements Runnable { public void run() { try { while (true) { // toutes les secondes un boulanger produit un pain Thread.sleep(1000) ; boolean added = boulangerie.depose(new Pain()) ; if (added) { System.out.println("[" + Thread.currentThread().getName() + "]" + "[" + boulangerie.getStock() + "] je livre.") ; } else { System.out.println("[" + Thread.currentThread().getName() + "]" + "[" + boulangerie.getStock() + "] la boulangerie est pleine.") ; } } } catch (InterruptedException e) { System.out.println("[" + Thread.currentThread().getName() + "] je m'arrête") ; } } }
Exemple 76. Producteur / consommateur : la classe
Mangeur
public class Mangeur implements Runnable { public void run() { try { while (true) { // nos mangeurs mangent de façon aléatoire... Thread.sleep(rand.nextInt(1000)) ; Pain pain = boulangerie.achete() ; if (pain != null) { System.out.println("[" + Thread.currentThread().getName() + "]" + "[" + boulangerie.getStock() + "] miam miam") ; } else { System.out.println("[" + Thread.currentThread().getName() + "]" + "[" + boulangerie.getStock() + "] j'ai faim") ; } } } catch (InterruptedException e) { System.out.println("[" + Thread.currentThread().getName() + "] je m'arrête") ; } } }
main()
suivante.
Exemple 77. Producteur / consommateur : le système
public static void main(String[] args) { // on initialise une boulangerie, et une variable aléatoire pour nos client final Boulangerie boulangerie = new Boulangerie() ; final Random rand = new Random() ; // notre boulanger est un runnable Boulanger boulanger = new Boulanger() ; // notre mangeur est aussi un runnable Mangeur mangeur = new Mangeur() ; Thread [] boulangers = new Thread[5] ; Thread [] mangeurs = new Thread[2] ; // préparation des boulangers for (int i = 0 ; i < boulangers.length ; i++) { boulangers[i] = new Thread(boulanger) ; } // préparation des mangeurs for (int i = 0 ; i < mangeurs.length ; i++) { mangeurs[i] = new Thread(mangeur) ; } // lancement des boulangers for (int i = 0 ; i < boulangers.length ; i++) { boulangers[i].start() ; } // lancement des mangeurs for (int i = 0 ; i < mangeurs.length ; i++) { mangeurs[i].start() ; } }
Pain
.
Exemple 78. Pilule empoisonnée : création de la pilule
public class Pain {
// pain empoisonné !
public static final PAIN_EMPOISONNE = new Pain() ;
}
Exemple 79. Pilule empoisonnée : modification du consommateur
// notre mangeur devient empoisonnable ! Runnable mangeur = new Runnable() { public void run() { try { while (true) { // nos mangeurs mangent de façon aléatoire... Thread.sleep(rand.nextInt(1000)) ; Pain pain = boulangerie.achete() ; if (pain != null) { if (pain == Pain.PAIN_EMPOISONNE) { System.out.println("Je meurs !") ; Thread.currentThread().interrupt() ; } else { System.out.println("[" + Thread.currentThread().getName() + "]" + "[" + boulangerie.getStock() + "] miam miam") ; } } else { System.out.println("[" + Thread.currentThread().getName() + "]" + "[" + boulangerie.getStock() + "] j'ai faim") ; } } } catch (InterruptedException e) { } } };
main()
.
Exemple 80. Pilule empoisonnée : code d'arrêt du système
// temporisation pour stopper observer le système 20 secondes try { Thread.sleep(20000) ; } catch (InterruptedException e) { } // arrêt de notre système for (int i = 0 ; i < boulangers.length ; i++) { // interruption des producteurs boulangers[i].interrupt() ; } // dépôt des pilules empoisonnées for (int i = 0 ; i < mangeurs.length ; i++) { boulangerie.deposePainEmpoisonne() ; }
deposePainEmpoisonne()
suivante à notre classe
Boulangerie
.
Exemple 81. Pilule empoisonnée : ajout de la méthode d'arrêt à la file d'attente
// on ajoute simplement le pain empoisonné à la file d'attente
public void deposePainEmpoisonne() {
queue.add(Pain.PAIN_EMPOISONNE) ;
}