Il est difficile de parler des files d'attente en programmation concurrente sans parler du pattern producteur / consommateur. Ce pattern est très simple à comprendre, et son utilisation est très répandue dans la pratique.

On définit trois éléments :

  • un producteur, qui produit des tâches à effectuer ;

  • un consommateur, qui exécute les tâches une par une ;

  • une file d'attente, qui permet au producteur d'enregistrer ses tâches, et au consommateur d'en retirer.

Si le producteur produit trop de tâches par rapport à la capacité de consommation du consommateur, alors la file d'attente risque de saturer, et le système ne fonctionnera pas correctement.

Au contraire, si le consommateur exécute ses tâches trop vite, alors la file d'attente finira par s'assécher. Ce comportement n'est pas problématique.

L'avantage de ce pattern est sa souplesse. Le producteur et le consommateur ne se connaissent pas, et ne communiquent pas entre eux directement. Il n'y a donc pas de problème de concurrence d'accès entre eux. On peut étendre facilement ce pattern, au cas où l'on aurait plusieurs consommateurs, et plusieurs producteurs. Le système garde sa souplesse, puisque s'il manque des consommateurs, on peut toujours en ajouter.

Le point délicat d'un tel système est la file d'attente qui permet aux tâches de passer d'un producteur à un consommateur. Cette file d'attente est utilisée en forte concurrence d'accès, et ne doit pas constituer un goulet d'étranglement dans notre système.

L'API Concurrent nous fournit une interface modélisant exactement cette file d'attente : BlockingQueue<E>.

Cette interface est une extension de l'interface Queue<E> que nous avons déjà vue dans la partie sur l'API Collection.

Rappelons que l'interface Queue<E> permet de modéliser une file d'attente, de taille éventuellement fixée, et qui ne peut pas être modifiée.

D'une certaine façon, une file d'attente supporte trois opérations de base :

  • l'ajout d'un élément ;

  • le retrait d'un élément disponible ;

  • l'examen du prochain élément disponible, sans le retirer.

Comme ces trois opérations peuvent échouer (cas d'une file d'attente vide, ou au contraire saturée), l'interface Queue<E> définit deux comportement d'échec pour chacune de ces trois opérations. Ces deux comportements sont la génération d'une exception ou le renvoi d'un booléen à false.

L'interface BlockingQueue définit deux comportements supplémentaires.

  • La demande d'ajout par la méthode put(E e), ou de retrait d'un élément par la méthode take() peut bloquer si la file d'attente est saturée, ou si elle est vide. Ces deux méthodes attendent en fait la fin de la saturation, ou l'arrivée d'un nouvel élément pour rendre la main.

  • Il existe deux autres méthodes, offer(E e, long time, TimeUnit unit) et poll(long time, TimeUnit unit) qui ont le même fonctionnement, sauf qu'elles ne peuvent attendre plus longtemps que le temps passé en paramètre. Quand ce temps est écoulé, elles retournent false ou null respectivement.

L'interface BlockingQueue expose également les méthodes drainTo(Collection coll) et drainTo(Collection coll, int maxElements), qui permettent de retirer des éléments par paquets, pour les stocker dans la collection passée en paramètre.

L'API Concurrent propose plusieurs implémentations de BlockingQueue.

Les deux premières sont ArrayBlockingQueue et LinkedBlockingQueue. Ces deux files d'attente fonctionnent sur le principe FIFO. L'implémentation ArrayBlockingQueue a une capacité maximale, qui est fixée à la construction de l'objet. L'implémentation LinkedBlockingQueue peut être limitée en capacité lors de sa construction. Si aucune limite n'est fixée, alors elle utilise sa valeur par défaut : Integer.MAX_VALUE.

L'implémentation PriorityBlockingQueue conserve ses éléments ordonnés, soit en utilisant un comparateur (instance de Comparator) passé lors de sa construction, soit en utilisant l'ordre de ses éléments si ce sont des implémentations de Comparable. On peut lui donner une taille maximale, ou la laisser à sa valeur par défaut, Integer.MAX_VALUE.

Enfin, l'implémentation SynchronousQueue a un comportement un peu particulier. En fait elle n'a de file d'attente que le nom, puisque sa capacité est de 0. On ne peut donc pas l'interroger sur son contenu, ni itérer sur ses éléments, tout simplement parce qu'elle n'en possède pas.

En fait, chaque insertion d'une nouvelle donnée dans une telle file, doit attendre qu'une demande de retrait soit faite pour rendre la main. Réciproquement, chaque demande de retrait ne rend la main que lorsqu'une demande d'insertion est faite. Cette classe permet à deux threads de se synchroniser sur des points de rendez-vous particuliers.

Construire un tel exemple n'est pas très compliqué, mais cela va nous donner l'occasion de montrer comment arrêter un tel système proprement.

Notre système est une boulangerie. Cette boulangerie est alimentée régulièrement en pain par des boulangers, et de temps en temps des clients affamés se présentent. Voyons le code.


La classe Pain est juste une classe standard, sans propriété particulière. Nous avons ensuite besoin de deux classes : Boulanger et Mangeur, toutes deux implémentations de Runnable.



Il ne nous reste plus qu'à faire fonctionner notre système, dans la méthode main() suivante.


On peut faire tourner ce système, et il peut fonctionner correctement. On peut ajuster le nombre de boulangers et de clients, sans avoir à changer le code ni du boulanger, ni du client. S'il y a trop de boulangers, la file d'attente saturera, et s'il y a trop de clients, elle s'assèchera, ce qui est, dans les deux cas, attendu.

Dans une application réelle, le boulanger et le client seraient dans deux classes séparées, sans contact l'une avec l'autre.

La question qui se pose est la suivante : comment arrêter un tel système proprement, en garantissant que tous les éléments placés dans la file d'attente sont bien traités par les consommateurs ?

On peut commencer par interrompre les producteurs, ce qui ne pose pas de problème. La file d'attente finira par s'assécher, et l'on pourra interrompre ensuite les consommateurs. Cette approche fonctionne, mais impose un temps d'attente qui peut poser problème.

Si l'on interrompt de la même manière les consommateurs, il y a toutes les chances pour que des éléments de la file d'attente ne soient pas traités.

Comment arrêter les producteurs et les consommateurs le plus rapidement possible, tout en garantissant que tous les éléments de la file d'attente ont bien été traités ?

Le pattern utilisé pour ce faire porte le nom de poison pill : pilule empoisonnée. On place dans la file d'attente des éléments particuliers, autant que de consommateurs, qui vont tuer chaque consommateur, un par un. Quand un consommateur consomme une de ces pilules empoisonnée, il s'arrête de fonctionner, et notamment, il ne prend plus d'élément dans la file d'attente.

Pour cela, on peut ajouter une constante à notre classe Pain.


Lorsqu'un consommateur mange un tel pain, il doit mourir, et donc arrêter de fonctionner. Le code de notre consommateur devient donc le suivant.


Enfin, le code d'arrêt de notre système est le suivant, à ajouter à la fin de la méthode main().


On ajoute la méthode deposePainEmpoisonne() suivante à notre classe Boulangerie.


Si l'on fait fonctionner ce nouveau système, on constate bien que nos clients mourront, une fois que la file d'attente a été épuisée.

Ce système d'arrêt ressemble un peu à un roman d'Agatha Christie, où l'on empoisonne sans coup férir d'honnêtes consommateurs. Il est efficace, mais nécessite tout de même d'avoir en permanence la connaissance du nombre de consommateurs actifs.

Java API avancées
Retour au blog Java le soir
Cours & Tutoriaux

Table des matières

API Collection
1. Introduction
2. Interface Collection
2.1. Notion de Collection
2.2. Détail des méthodes disponibles
2.3. Interface Iterator
2.4. Implémentation, exemples d'utilisation
3. Interface List
3.1. Notion de List
3.2. Détail des méthodes disponibles
3.3. Interface ListIterator
3.4. Implémentations, exemples d'utilisation
4. Interface Set
4.1. Notion de Set
4.2. Implémentations HashSet et LinkedHashSet
4.3. Exemples d'utilisation
5. Interface SortedSet
5.1. Notion de SortedSet
5.2. Détails des méthodes disponibles
5.3. Exemples d'utilisation
6. Interface NavigableSet
6.1. Notion de NavigableSet
6.2. Détails des méthodes disponibles
6.3. Exemple d'utilisation
7. Interfaces Queue et Deque
7.1. Notion de file d'attente
7.2. Détail des méthodes disponibles
7.3. Utilisation des interfaces Queue et Deque
8. Tables de hachage
8.1. Notion de table de hachage
8.2. Interface Map
8.3. Interface Map.Entry
8.4. Interface SortedMap
8.5. Interface NavigableMap
8.6. Implémentations
8.7. Exemples d'utilisation
9. Classes utilitaires Collections et Arrays
9.1. Introduction
9.2. Classe Arrays
9.3. Classe Collections
Génériques
1. Introduction
2. Un premier exemple
2.1. Une première classe générique
2.2. Une première méthode générique
3. Contraindre un type générique
3.1. Problème posé
3.2. Contraindre un type générique
4. Implémentation des génériques
4.1. Type erasure
4.2. Types génériques et casts
4.3. Type générique et exception
4.4. Construction d'une instance générique
4.5. Génériques et membres statiques
4.6. Collisions de méthodes génériques
4.7. Implémentation de plusieurs types identiques
5. Type <?>
5.1. Introduction
5.2. Type ? extension d'un type
5.3. Type ? super-type d'un type
Expressions régulières
1. Introduction
2. Mise en œuvre des expressions régulières
2.1. Fonctionnement d'une regexp
2.2. Fonctionnement de l'API en Java
2.3. Un premier exemple
2.4. Classe Pattern
2.5. Classe Matcher
2.6. Utilisation des méthode find() et group()
2.7. Méthodes de remplacement
2.8. Sélection de régions
3. Syntaxe des expressions régulières
3.1. Notion de classe
3.2. Étude d'un cas réel
3.3. Recherche d'un mot précis
3.4. Recherche de deux mots précis
3.5. Recherche d'un mot commençant par une lettre donnée
3.6. Cas de mots comportant des caractères accentués
3.7. Recherche sur les lignes
Introspection
1. Introduction
2. La classe Class
2.1. Utilisation de Class
2.2. Méthodes disponibles
2.3. Remarque sur la propriété Accessible
2.4. Type d'une classe
2.5. Création d'une instance à partir d'un objet Class
2.6. Cas des énumérations
3. Les classes Method et Constructor
3.1. Utilisation de Method
3.2. Utilisation de Constructor
3.3. Méthodes disponibles
3.4. Invocation d'une méthode par introspection
4. La classe Field
4.1. Utilisation de Field
4.2. Méthodes disponibles
4.3. Accès à un champ par introspection
5. La classe Modifier
Programmation concurrente
1. Introduction
2. Lançons nos premiers threads
2.1. Introduction
2.2. Un premier thread, extension de Thread
2.3. Un deuxième thread, implémentation de Runnable
2.4. Remarque sur la méthode Thread.sleep(long)
2.5. Arrêter un thread
3. Concurrence d'accès
3.1. Notion d'état
3.2. Exemple de concurrence d'accès sur un état
3.3. Analyse de la concurrence d'accès
3.4. Solution au problème
3.5. Champs volatile
4. Synchronisation
4.1. Définition d'un bloc synchronisé
4.2. Fonctionnement d'un bloc synchronisé
4.3. Notion de deadlock
4.4. Bonnes pratiques pour la synchronisation de threads
5. Opérations atomiques
5.1. Atomicité d'une opération
5.2. Solutions disponibles
5.3. Variables atomiques
6. Collections synchronisées et concurrentes
6.1. Introduction
6.2. Position du problème
6.3. Solutions proposées
7. Files d'attente
7.1. Introduction, pattern producteur / consommateur
7.2. Interface BlockingQueue<E>
7.3. Implémentations de BlockingQueue
7.4. Exemple de producteur / consommateur
7.5. Arrêter un producteur / consommateur : pilule empoisonnée
8. Classes utilitaires de l'API Concurrent
8.1. Introduction
8.2. Énumération TimeUnit
8.3. Interface Callable<V>
8.4. Interfaces Future<V> et RunnableFuture<V>
8.5. Interface ScheduledFuture<V> et RunnableScheduledFuture<V>
9. Pattern executor
9.1. Notion de réserve de threads
9.2. Interface Executor
9.3. Interface ExecutorService
9.4. Interface ScheduledExecutorService
9.5. Classe Executors
9.6. Pattern de lancement de tâches
10. Classes de contrôle d'accès
10.1. Introduction
10.2. Interfaces Lock et ReadWriteLock
10.3. Notion de verrou réentrant
10.4. Classe RentrantLock
10.5. Classe ReadWriteRentrantLock
11. Sémaphores, barrières et latches
11.1. Introduction
11.2. Notion de sémaphore, classe Semaphore
11.3. Notion de latch, classe CountDownLatch
11.4. Notion de barrière, classe CyclicBarrier