7. Files d'attente

7.1. Introduction, pattern producteur / consommateur

Il est difficile de parler des files d'attente en programmation concurrente sans parler du pattern producteur / consommateur. Ce pattern est très simple à comprendre, et son utilisation est très répandue dans la pratique. On définit trois éléments :
  • un producteur, qui produit des tâches à effectuer ;
  • un consommateur, qui exécute les tâches une par une ;
  • une file d'attente, qui permet au producteur d'enregistrer ses tâches, et au consommateur d'en retirer.
Si le producteur produit trop de tâches par rapport à la capacité de consommation du consommateur, alors la file d'attente risque de saturer, et le système ne fonctionnera pas correctement. Au contraire, si le consommateur exécute ses tâches trop vite, alors la file d'attente finira par s'assécher. Ce comportement n'est pas problématique. L'avantage de ce pattern est sa souplesse. Le producteur et le consommateur ne se connaissent pas, et ne communiquent pas entre eux directement. Il n'y a donc pas de problème de concurrence d'accès entre eux. On peut étendre facilement ce pattern, au cas où l'on aurait plusieurs consommateurs, et plusieurs producteurs. Le système garde sa souplesse, puisque s'il manque des consommateurs, on peut toujours en ajouter. Le point délicat d'un tel système est la file d'attente qui permet aux tâches de passer d'un producteur à un consommateur. Cette file d'attente est utilisée en forte concurrence d'accès, et ne doit pas constituer un goulet d'étranglement dans notre système. L'API Concurrent nous fournit une interface modélisant exactement cette file d'attente : BlockingQueue<E>.

7.2. Interface BlockingQueue<E>

Cette interface est une extension de l'interface Queue<E> que nous avons déjà vue dans la partie sur l'API Collection. Rappelons que l'interface Queue<E> permet de modéliser une file d'attente, de taille éventuellement fixée, et qui ne peut pas être modifiée. D'une certaine façon, une file d'attente supporte trois opérations de base :
  • l'ajout d'un élément ;
  • le retrait d'un élément disponible ;
  • l'examen du prochain élément disponible, sans le retirer.
Comme ces trois opérations peuvent échouer (cas d'une file d'attente vide, ou au contraire saturée), l'interface Queue<E> définit deux comportement d'échec pour chacune de ces trois opérations. Ces deux comportements sont la génération d'une exception ou le renvoi d'un booléen à false. L'interface BlockingQueue définit deux comportements supplémentaires.
  • La demande d'ajout par la méthode put(E e), ou de retrait d'un élément par la méthode take() peut bloquer si la file d'attente est saturée, ou si elle est vide. Ces deux méthodes attendent en fait la fin de la saturation, ou l'arrivée d'un nouvel élément pour rendre la main.
  • Il existe deux autres méthodes, offer(E e, long time, TimeUnit unit) et poll(long time, TimeUnit unit) qui ont le même fonctionnement, sauf qu'elles ne peuvent attendre plus longtemps que le temps passé en paramètre. Quand ce temps est écoulé, elles retournent false ou null respectivement.
L'interface BlockingQueue expose également les méthodes drainTo(Collection coll) et drainTo(Collection coll, int maxElements), qui permettent de retirer des éléments par paquets, pour les stocker dans la collection passée en paramètre.

7.3. Implémentations de BlockingQueue

L'API Concurrent propose plusieurs implémentations de BlockingQueue. Les deux premières sont ArrayBlockingQueue et LinkedBlockingQueue. Ces deux files d'attente fonctionnent sur le principe FIFO. L'implémentation ArrayBlockingQueue a une capacité maximale, qui est fixée à la construction de l'objet. L'implémentation LinkedBlockingQueue peut être limitée en capacité lors de sa construction. Si aucune limite n'est fixée, alors elle utilise sa valeur par défaut : Integer.MAX_VALUE. L'implémentation PriorityBlockingQueue conserve ses éléments ordonnés, soit en utilisant un comparateur (instance de Comparator) passé lors de sa construction, soit en utilisant l'ordre de ses éléments si ce sont des implémentations de Comparable. On peut lui donner une taille maximale, ou la laisser à sa valeur par défaut, Integer.MAX_VALUE. Enfin, l'implémentation SynchronousQueue a un comportement un peu particulier. En fait elle n'a de file d'attente que le nom, puisque sa capacité est de 0. On ne peut donc pas l'interroger sur son contenu, ni itérer sur ses éléments, tout simplement parce qu'elle n'en possède pas. En fait, chaque insertion d'une nouvelle donnée dans une telle file, doit attendre qu'une demande de retrait soit faite pour rendre la main. Réciproquement, chaque demande de retrait ne rend la main que lorsqu'une demande d'insertion est faite. Cette classe permet à deux threads de se synchroniser sur des points de rendez-vous particuliers.

7.4. Exemple de producteur / consommateur

Construire un tel exemple n'est pas très compliqué, mais cela va nous donner l'occasion de montrer comment arrêter un tel système proprement. Notre système est une boulangerie. Cette boulangerie est alimentée régulièrement en pain par des boulangers, et de temps en temps des clients affamés se présentent. Voyons le code.

Exemple 74. Producteur / consommateur : la boulangerie

public  class Boulangerie {

    // plus prosaïquement, une boulangerie est une file d'attente de 20 cases
    private BlockingQueue<Pain> queue =  new ArrayBlockingQueue<Pain>(20) ;

    // on peut y déposer du pain, mais le boulanger n'est pas patient
    // si le panier de vente est plein, il s'en va
    public  boolean depose(Pain pain)  throws InterruptedException {
       return queue.offer(pain,  200, TimeUnit.MILLISECONDS) ;
   }

    // on peut en acheter, et le client n'est pas plus patient
    // que le boulanger
    public Pain achete ()  throws InterruptedException {
       return queue.poll(200, TimeUnit.MILLISECONDS) ;
   }

    // on peut interroger le stock
    public  int getStock() {
       return queue.size() ;
   }
}

La classe Pain est juste une classe standard, sans propriété particulière. Nous avons ensuite besoin de deux classes : Boulanger et Mangeur, toutes deux implémentations de Runnable.

Exemple 75. Producteur / consommateur : la classe Boulanger

public  class Boulanger  implements Runnable {

    public  void run() {

       try {
          while (true) {

             // toutes les secondes un boulanger produit un pain
            Thread.sleep(1000) ;
             boolean added = boulangerie.depose(new Pain()) ;

             if (added) {
               System.out.println("[" + Thread.currentThread().getName() +  "]" + 
                                   "[" + boulangerie.getStock() +  "] je livre.") ;
            }  else {
               System.out.println("[" + Thread.currentThread().getName() +  "]" + 
                                   "[" + boulangerie.getStock() +  "] la boulangerie est pleine.") ;
            }
         }

      }  catch (InterruptedException e) {
         System.out.println("[" + Thread.currentThread().getName() +  "] je m'arrête") ;
      }
   }
}

Exemple 76. Producteur / consommateur : la classe Mangeur

public  class Mangeur  implements Runnable {

    public  void run() {

       try {

          while (true) {
             // nos mangeurs mangent de façon aléatoire...
            Thread.sleep(rand.nextInt(1000)) ;
            Pain pain = boulangerie.achete() ;
             if (pain != null) {
               System.out.println("[" + Thread.currentThread().getName() +  "]" + 
                                   "[" + boulangerie.getStock() +  "] miam miam") ;
            }  else {
               System.out.println("[" + Thread.currentThread().getName() +  "]" + 
                                   "[" + boulangerie.getStock() +  "] j'ai faim") ;
            }
         }

      }  catch (InterruptedException e) {
         System.out.println("[" + Thread.currentThread().getName() +  "] je m'arrête") ;
      }
   }
}

Il ne nous reste plus qu'à faire fonctionner notre système, dans la méthode main() suivante.

Exemple 77. Producteur / consommateur : le système

public  static  void main(String[] args) {

    // on initialise une boulangerie, et une variable aléatoire pour nos client
    final Boulangerie boulangerie =  new Boulangerie() ;
    final Random rand =  new Random() ;

    // notre boulanger est un runnable
   Boulanger boulanger =  new Boulanger() ;

    // notre mangeur est aussi un runnable
   Mangeur mangeur =  new Mangeur() ;

   Thread [] boulangers =  new Thread[5] ;
   Thread [] mangeurs =  new Thread[2] ;

    // préparation des boulangers
    for (int i =  0 ; i < boulangers.length ; i++) {
      boulangers[i] =  new Thread(boulanger) ;
   }

    // préparation des mangeurs
    for (int i =  0 ; i < mangeurs.length ; i++) {
      mangeurs[i] =  new Thread(mangeur) ;
   }

    // lancement des boulangers
    for (int i =  0 ; i < boulangers.length ; i++) {
      boulangers[i].start() ;
   }

    // lancement des mangeurs
    for (int i =  0 ; i < mangeurs.length ; i++) {
      mangeurs[i].start() ;
   }
}

On peut faire tourner ce système, et il peut fonctionner correctement. On peut ajuster le nombre de boulangers et de clients, sans avoir à changer le code ni du boulanger, ni du client. S'il y a trop de boulangers, la file d'attente saturera, et s'il y a trop de clients, elle s'assèchera, ce qui est, dans les deux cas, attendu. Dans une application réelle, le boulanger et le client seraient dans deux classes séparées, sans contact l'une avec l'autre.

7.5. Arrêter un producteur / consommateur : pilule empoisonnée

La question qui se pose est la suivante : comment arrêter un tel système proprement, en garantissant que tous les éléments placés dans la file d'attente sont bien traités par les consommateurs ? On peut commencer par interrompre les producteurs, ce qui ne pose pas de problème. La file d'attente finira par s'assécher, et l'on pourra interrompre ensuite les consommateurs. Cette approche fonctionne, mais impose un temps d'attente qui peut poser problème. Si l'on interrompt de la même manière les consommateurs, il y a toutes les chances pour que des éléments de la file d'attente ne soient pas traités. Comment arrêter les producteurs et les consommateurs le plus rapidement possible, tout en garantissant que tous les éléments de la file d'attente ont bien été traités ? Le pattern utilisé pour ce faire porte le nom de poison pill : pilule empoisonnée. On place dans la file d'attente des éléments particuliers, autant que de consommateurs, qui vont tuer chaque consommateur, un par un. Quand un consommateur consomme une de ces pilules empoisonnée, il s'arrête de fonctionner, et notamment, il ne prend plus d'élément dans la file d'attente. Pour cela, on peut ajouter une constante à notre classe Pain.

Exemple 78. Pilule empoisonnée : création de la pilule

public  class Pain {

    // pain empoisonné !
    public  static  final PAIN_EMPOISONNE =  new Pain() ;
}

Lorsqu'un consommateur mange un tel pain, il doit mourir, et donc arrêter de fonctionner. Le code de notre consommateur devient donc le suivant.

Exemple 79. Pilule empoisonnée : modification du consommateur

    // notre mangeur devient empoisonnable !
   Runnable mangeur =  new Runnable() {

       public  void run() {

          try {

             while (true) {
                // nos mangeurs mangent de façon aléatoire...
               Thread.sleep(rand.nextInt(1000)) ;
               Pain pain = boulangerie.achete() ;
                if (pain != null) {
               
                   if (pain == Pain.PAIN_EMPOISONNE) {
                     System.out.println("Je meurs !") ;
                     Thread.currentThread().interrupt() ;
                  }  else {
                     System.out.println("[" + Thread.currentThread().getName() +  "]" + 
                                         "[" + boulangerie.getStock() +  "] miam miam") ;
                  }

               }  else {
                  System.out.println("[" + Thread.currentThread().getName() +  "]" + 
                                        "[" + boulangerie.getStock() +  "] j'ai faim") ;
               }
            }

         }  catch (InterruptedException e) {
         }
      }
   };

Enfin, le code d'arrêt de notre système est le suivant, à ajouter à la fin de la méthode main().

Exemple 80. Pilule empoisonnée : code d'arrêt du système

// temporisation pour stopper observer le système 20 secondes
 try {
   Thread.sleep(20000) ;
}  catch (InterruptedException e) {
}

 // arrêt de notre système 
 for (int i =  0 ; i < boulangers.length ; i++) {
    // interruption des producteurs
   boulangers[i].interrupt() ;
}

 // dépôt des pilules empoisonnées
 for (int i =  0 ; i < mangeurs.length ; i++) {
   boulangerie.deposePainEmpoisonne() ;
}

On ajoute la méthode deposePainEmpoisonne() suivante à notre classe Boulangerie.

Exemple 81. Pilule empoisonnée : ajout de la méthode d'arrêt à la file d'attente

// on ajoute simplement le pain empoisonné à la file d'attente
 public  void deposePainEmpoisonne() {
   queue.add(Pain.PAIN_EMPOISONNE) ;
}

Si l'on fait fonctionner ce nouveau système, on constate bien que nos clients mourront, une fois que la file d'attente a été épuisée. Ce système d'arrêt ressemble un peu à un roman d'Agatha Christie, où l'on empoisonne sans coup férir d'honnêtes consommateurs. Il est efficace, mais nécessite tout de même d'avoir en permanence la connaissance du nombre de consommateurs actifs.
Java API avancées
Retour au blog Java le soir
Cours & Tutoriaux
Table des matières
API Collection
1. Introduction
2. Interface Collection
2.1. Notion de Collection
2.2. Détail des méthodes disponibles
2.3. Interface Iterator
2.4. Implémentation, exemples d'utilisation
3. Interface List
3.1. Notion de List
3.2. Détail des méthodes disponibles
3.3. Interface ListIterator
3.4. Implémentations, exemples d'utilisation
4. Interface Set
4.1. Notion de Set
4.2. Implémentations HashSet et LinkedHashSet
4.3. Exemples d'utilisation
5. Interface SortedSet
5.1. Notion de SortedSet
5.2. Détails des méthodes disponibles
5.3. Exemples d'utilisation
6. Interface NavigableSet
6.1. Notion de NavigableSet
6.2. Détails des méthodes disponibles
6.3. Exemple d'utilisation
7. Interfaces Queue et Deque
7.1. Notion de file d'attente
7.2. Détail des méthodes disponibles
7.3. Utilisation des interfaces Queue et Deque
8. Tables de hachage
8.1. Notion de table de hachage
8.2. Interface Map
8.3. Interface Map.Entry
8.4. Interface SortedMap
8.5. Interface NavigableMap
8.6. Implémentations
8.7. Exemples d'utilisation
9. Classes utilitaires Collections et Arrays
9.1. Introduction
9.2. Classe Arrays
9.3. Classe Collections
Génériques
1. Introduction
2. Un premier exemple
2.1. Une première classe générique
2.2. Une première méthode générique
3. Contraindre un type générique
3.1. Problème posé
3.2. Contraindre un type générique
4. Implémentation des génériques
4.1. Type erasure
4.2. Types génériques et casts
4.3. Type générique et exception
4.4. Construction d'une instance générique
4.5. Génériques et membres statiques
4.6. Collisions de méthodes génériques
4.7. Implémentation de plusieurs types identiques
5. Type <?>
5.1. Introduction
5.2. Type ? extension d'un type
5.3. Type ? super-type d'un type
Expressions régulières
1. Introduction
2. Mise en œuvre des expressions régulières
2.1. Fonctionnement d'une regexp
2.2. Fonctionnement de l'API en Java
2.3. Un premier exemple
2.4. Classe Pattern
2.5. Classe Matcher
2.6. Utilisation des méthode find() et group()
2.7. Méthodes de remplacement
2.8. Sélection de régions
3. Syntaxe des expressions régulières
3.1. Notion de classe
3.2. Étude d'un cas réel
3.3. Recherche d'un mot précis
3.4. Recherche de deux mots précis
3.5. Recherche d'un mot commençant par une lettre donnée
3.6. Cas de mots comportant des caractères accentués
3.7. Recherche sur les lignes
Introspection
1. Introduction
2. La classe Class
2.1. Utilisation de Class
2.2. Méthodes disponibles
2.3. Remarque sur la propriété Accessible
2.4. Type d'une classe
2.5. Création d'une instance à partir d'un objet Class
2.6. Cas des énumérations
3. Les classes Method et Constructor
3.1. Utilisation de Method
3.2. Utilisation de Constructor
3.3. Méthodes disponibles
3.4. Invocation d'une méthode par introspection
4. La classe Field
4.1. Utilisation de Field
4.2. Méthodes disponibles
4.3. Accès à un champ par introspection
5. La classe Modifier
Programmation concurrente
1. Introduction
2. Lançons nos premiers threads
2.1. Introduction
2.2. Un premier thread, extension de Thread
2.3. Un deuxième thread, implémentation de Runnable
2.4. Remarque sur la méthode Thread.sleep(long)
2.5. Arrêter un thread
3. Concurrence d'accès
3.1. Notion d'état
3.2. Exemple de concurrence d'accès sur un état
3.3. Analyse de la concurrence d'accès
3.4. Solution au problème
3.5. Champs volatile
4. Synchronisation
4.1. Définition d'un bloc synchronisé
4.2. Fonctionnement d'un bloc synchronisé
4.3. Notion de deadlock
4.4. Bonnes pratiques pour la synchronisation de threads
5. Opérations atomiques
5.1. Atomicité d'une opération
5.2. Solutions disponibles
5.3. Variables atomiques
6. Collections synchronisées et concurrentes
6.1. Introduction
6.2. Position du problème
6.3. Solutions proposées
7. Files d'attente
7.1. Introduction, pattern producteur / consommateur
7.2. Interface BlockingQueue<E>
7.3. Implémentations de BlockingQueue
7.4. Exemple de producteur / consommateur
7.5. Arrêter un producteur / consommateur : pilule empoisonnée
8. Classes utilitaires de l'API Concurrent
8.1. Introduction
8.2. Énumération TimeUnit
8.3. Interface Callable<V>
8.4. Interfaces Future<V> et RunnableFuture<V>
8.5. Interface ScheduledFuture<V> et RunnableScheduledFuture<V>
9. Pattern executor
9.1. Notion de réserve de threads
9.2. Interface Executor
9.3. Interface ExecutorService
9.4. Interface ScheduledExecutorService
9.5. Classe Executors
9.6. Pattern de lancement de tâches
10. Classes de contrôle d'accès
10.1. Introduction
10.2. Interfaces Lock et ReadWriteLock
10.3. Notion de verrou réentrant
10.4. Classe RentrantLock
10.5. Classe ReadWriteRentrantLock
11. Sémaphores, barrières et latches
11.1. Introduction
11.2. Notion de sémaphore, classe Semaphore
11.3. Notion de latch, classe CountDownLatch
11.4. Notion de barrière, classe CyclicBarrier