Java 7 : fork / join

Un précédent article, Synchronisation et volatilité, traitait de programmation concurrente, et de la façon dont le JMM (Java Memory Model), avait été entièrement réécrit en Java 5 (JLS v3). Il laissait complètement de côté l’API java.util.concurrent, nouveauté de Java 5, écrite par Doug Lea, et conçue à partir de l’API edu.oswego, disponible en Java 4. Cette API a subi quelques évolutions entre Java 5 et Java 6 , et en subit encore en Java 7.

C’est à un petit bilan de l’existant, et un rapide tour de table des nouveautés que je te convie aujourd’hui, cher et précieux lecteur. Je parlerai surtout du framework fork / join, et de la façon dont on peut l’utiliser. Pour une fois, dans Java le Soir, on va parler de l’avenir. Note bien, que, plus le temps passe, et plus l’avenir a tendance à devenir le passé, ce qui finira aussi par arriver à ce qui est écrit ici !

L’écriture de l’API java.util.concurrent visait à fournir à l’API standard de Java les outils de gestion de la programmation concurrente qui lui faisaient cruellement défaut. Regroupons ces apports en trois catégories.

  • Le pattern Executor, qui permet de contrôler le nombre de threads lancés par une application. Vient avec l’interface Callable<V>, pendant de Runnable, et l’interface Future<V>.
  • Les extensions de java.util.collection, avec les listes et tables de hachage concurrentes, les files d’attente bloquantes, les tableaux synchronisés en écriture.
  • Les variables atomiques, dans le package java.util.concurrent.atomic.
  • Les verrous : interface Lock et classes ReadLock, WriteLock et RentrantLock.
  • Les classes particulières qui modélisent les sémaphores (Semaphore), les latch (CountDownLatch), les barrières (CyclicBarrier).

Java 7 nous apporte quelques nouveautés.

  • Une interface TransferQueue, qui modélise une file d’attente bloquante, dans laquelle le producteur peut attendre qu’un consommateur ait pris ce que ce producteur place dans la file.
  • Une classe Phaser, qui expose un nouvel élément de synchronisation, notamment utilisable dans le framework fork / join.
  • Une classe de génération de nombres aléatoires locale à un thread donné : ThreadLocalRandom. La javadoc indique que c’est cette classe qu’il faut utiliser lorsque de nombreux threads ont besoin de générateurs de nombres aléatoires, et annonce que les problèmes de contention sont résolus. J’avoue que cette remarque me laisse un peu dubitatif, dans la mesure où c’est uniquement à l’initialisation des classes Random et SecureRandom que les problèmes de contention peuvent apparaître. En revanche, et ce point n’est pas mentionné dans la javadoc, l’utilisation de cette classe permet d’avoir des séries aléatoires parfaitement reproductibles, notamment en environnement fortement concurrent, ce qui, notamment pour les tests, est capital.

Elle apporte, enfin et surtout le framework fork / join, dont nous allons parler plus en détails.

Framework fork / join

Ce framework est la principale nouveauté du package java.util.concurrent. Comme la plupart des frameworks, il est censé résoudre un problème. Il arrive cependant que des frameworks posent plus de problèmes qu’ils n’en résolvent. Comme je sais que ton temps est cher, précieux lecteur, et que le mien l’est aussi, de ces frameworks, nous éviterons de parler.

Le problème qu’il se propose de résoudre concerne le traitement de larges quantités de données, numériques ou non. Il arrive souvent que ces quantités peuvent être traitées paquet par paquet, chaque paquet pouvant être pris en compte de façon indépendante des autres. Le traitement de chaque paquet fournit un résultat partiel. Ces résultats sont ensuite fusionnés, d’une façon ou d’une autre, dans le résultat global du traitement.

Ces paquets peuvent être traités les uns à la suite des autres dans un même thread ; disons qu’il s’agit de la façon classique de faire, ou tout du moins immédiate.

On peut aussi choisir de traiter chaque paquet dans son propre thread. Une fois que tous ces threads ont fini leur travail, on regroupe les résultats dans le thread qui les a lancés. On a besoin alors d’un point de synchronisation, qui peut être géré par une CyclicBarrier.

Cette façon de faire est tellement classique qu’elle est donnée en exemple dans la Javadoc de cette classe (ai-je déjà dit que la lecture de la Javadoc était une saine activité ?).

Les auteurs appellent parallélisation cette façon de diviser une tâche volumineuse en tâches plus petites. Cette technique n’a pourtant rien à voir avec le calcul parallèle SIMD utilisées sur les MasPar et Connection Machine des années 80, d’ailleurs autrement plus performant. En fait la différence entre le calcul parallèle et le traitement en parallèle est réelle ; les techniques d’écriture de code et d’algorithmes n’ont rien à voir, et c’est de traitement en parallèle que nous allons parler ici. Le framework fork / join apporte une solution particulièrement performante et simple à mettre en œuvre pour exécuter des taches en parallèle. Il fonctionne avec deux classes de base.

  • La classe ForkJoinTask modélise une tâche unique. Cette tâche est envoyée à une réserve de threads, qui va gérer elle-même son exécution dans un thread de cette réserve. Une tâche élémentaire peut générer elle-même d’autres tâches du même type, envoyées à la même réserve.
  • la classe ForkJoinPool : gère la réserve de threads à laquelle les tâches sont soumises. Cette réserve reçoit des tâches, et s’occupe elle-même de choisir quel thread va exécuter quelle tâche. Cette classe est conçue de telle manière qu’un très grand nombre de petites tâches élémentaires peut être traité dans un nombre restreint de threads.

La Javadoc nous met en garde sur la façon d’écrire nos tâches élémentaires :

  • elles ne doivent pas comporter d’éléments de synchronisation, de façon à ne pas se bloquer les unes les autres ;
  • elles ne doivent pas partager de variables, pour des raisons assez évidentes ;
  • elles doivent pouvoir être exécutées rapidement. Ce dernier point est particulièrement subtil à appréhender, comme nous allons le voir.

Dans la mesure où nous sommes dans le domaine des algorithmes itératifs, on peut mesurer le temps en nombre d’itérations. Il faut donc limiter ce nombre. La Javadoc parle de quelques dizaines d’itérations pour une tâche élémentaire ; malheureusement, cela n’a pas grand sens. Il arrive que la Javadoc fasse des raccourcis…

Tâches élémentaires

La classe ForkJoinTask<V> expose deux méthodes principales :

  • join() : retourne le résultat de l’exécution de cette tâche, de type V ;
  • fork() : méthode appelée pour lancer une autre tâche dans la même réserve que celle dans laquelle se trouve la tâche courante.

Une tâche donnée ne doit pas appeler plusieurs fois sa propre méthode fork(). Même si ce point n’est pas vérifié par le framework, il peut s’ensuivre des problèmes de fonctionnement, notamment de visibilité des variables modifiées par cette tâche. Enfin la méthode join() ne rend la main qu’une fois que la tâche est exécutée, et qu’elle a produit un résultat. Cette sémantique est la même que la méthode join() de la classe Thread.

Cette classe est étendue par deux autres classes : RecursiveAction et RecursiveTask<V>. Ces deux classes exposent principalement une méthode compute() qui porte le code exécuté par cette tâche. L’écriture d’une tâche ressemble donc à l’écriture de l’un de nos bons vieux Thread de Java 1 : on étend l’une de ces deux classes, en ne donnant qu’une unique méthode : compute(). Suivant que cette méthode retourne un résultat ou non, on utilisera RecursiveTask<V> ou RecursiveAction, respectivement.

Réserve de threads

La classe ForkJoinPool fait en fait partie du pattern executor. Elle implémente les interfaces Executor et ExecutorService, et s’utilise comme ThreadPoolExecutor, par exemple. La construction d’une instance de cette classe prend en compte le nombre de processeurs (ou de cœurs) disponibles sur la machine, retourné par Runtime.getRuntime().availableProcessors(). Si l’on ne veut pas les utiliser tous, on peut forcer ce nombre, en le passant en paramètre du constructeur. On peut également lui passer notre propre thread factory, si celle utilisée par défaut ne convient pas. Une fois construite une réserve, on peut lui soumettre des tâches de trois façons :

  • par la méthode execute(ForkJoinTask<?> task) : déclenche l’exécution asynchrone de la tâche passée en paramètre ;
  • par la méthode invoque(ForkJoinTask<?> task) : déclenche l’exécution synchrone de la tâche passée en paramètre, cet appel ne rend la main que lorsque la tâche est exécutée ;
  • par la méthode submit(ForkJoinTask<?> task) : retourne immédiatement un objet de type Future<V>.

On retrouve donc toute la sémantique de lancement de threads définie dans le package java.util.concurrent.

Exemple d’utilisation

L’exemple le plus fréquent que l’on trouve sur le net est celui de la Javadoc, et consiste à calculer des nombres de Fibonacci. Ce calcul est récursif, son coût en calcul est exponentiel, son arrêt est bien maîtrisé. Il se prête donc vraiment bien à l’utilisation d’une RecursiveTask<Long>. En plus il est cité en numéro 2 dans la liste des algorithmes récursifs sur Wikipedia, donc on n’a pas besoin de développer des trésors d’imagination pour sortir cet exemple.

Le seul défaut que je lui trouve, c’est qu’en plus de 15 ans d’écriture de code dans différents langages, je n’ai jamais eu à utiliser ces nombres de Fibonacci, ce qui me fait franchement douter de la réelle utilité de cet exemple. De plus, le fait de prendre un algorithme récursif comme exemple, masque l’une des principales difficultés de l’utilisation du framework fork / join, qui est le découpage d’un calcul en tâches que l’on peut mener en parallèle.

Nous allons prendre le calcul matriciel comme exemple. En général, lorsque l’on a de gros calculs de ce type à faire, on utilise des librairies existantes telles que IMSL ou Matlab. La librairie IMSL existe depuis 40 ans, et le code de cet article n’a certainement pas la prétention de faire mieux qu’elle. Cela dit, les calculs sur les matrices sont itératifs sans être récursifs, et donc sont, de mon point de vue un meilleur exemple de découpage en tâches indépendantes que les nombres de Fibonacci.

Partons d’une classe Matrix simple.

public class Matrix {

	// la taille de notre matrice
	private int height, width ;

	// le tableau de son contenu
	private double [][] data ;

	// différents constructeurs
	public Matrix (int height, int width) { ... }
	public Matrix(double[][] data) { ... }
}

On va alléger un peu le code. Je suis persuadé, précieux lecteur, que tu es capable de brillamment intuiter la relation entre height, paramètre du constructeur, et height, champ de la classe, et que tu sais, sans les mains et les yeux fermés, créer un tableau bidimensionnel. Si vraiment ce n’est pas le cas, j’ai écrit une page de cours qui détaille ce point.

Initialisation d’une matrice

Écriture de l’algorithme

La première opération que je te propose de faire, cher et précieux lecteur, est de placer des valeurs dans notre matrice. Commençons simplement, par initialiser une matrice avec des valeurs aléatoires. Une première façon d’écrire ce code est la suivante.

// nouveauté de Java 7 : permet d'avoir une variable aléatoire par thread
private static Random rand = ThreadLocalRandom.current() ;

public static Matrix randomMatrix(int height, int width) {

	Matrix m = new Matrix(height, width) ;
	for (int i = 0 ; i < height ; i++) {
		for (int j = 0 ; j < width ; j++) {
			m.data[i][j] = rand.nextDouble() ;
		}
	}

	return m ;
}

L’exécution de ce code se déroule dans un thread unique, on n’utilise pas le framework fork / join pour le moment. L’exécution sur la machine de votre serviteur, pour une matrice 20 000 x 1 000 (soit vingt millions de tirages aléatoires) prend environ 850ms. Tu en déduiras certainement, précieux lecteur, que si ma machine était un étalon elle ne serait probablement pas en situation de gagner l’Arc de Triomphe. Mais je puis t’assurer, que si Java le soir me rend riche, je tâcherai d’y remédier (pas à gagner l’Arc de triomphe).

Comment exécuter ce code en parallèle ? La première idée qui traverse l’esprit est de confier le calcul sur des sous-matrices à chaque thread. C’est là que le pattern fork / join entre en scène.

Le jeu va consister en la création d’une tâche élémentaire, capable d’initialiser une sous-matrice de cette matrice. L’initialisation de cette matrice va être découpée en une multitude de ces tâches élémentaires, chacune traitée par le framework. La taille de chaque tâche, et donc le nombre de ces tâches, est un subtil équilibre à trouver entre le temps que passe le framework à gérer ces tâches, et le temps de traitement de chacune de ces tâches.

Si le nombre est trop grand (et donc chaque tâche trop petite), alors il n’y aura aucune accélération globale du processus. Si à l’inverse chaque tâche est trop importante, notre processus sera accéléré, mais manquera de fluidité.

Il semble que le framework fork / join préconise que le découpage en tâches élémentaires se fasse de façon récursive. Dans le cas du calcul matriciel, on aurait pu le faire de façon simplement itérative. Écrivons donc une classe MatrixRandomValueTask, que l’on va confier au framework.

L’idée de cette façon de faire consiste à découper notre matrice en quatre. Ce découpage est juste un jeu sur les indices, et donc non consommateur de ressources. Le schéma de découpage est le schéma classique suivant. Examinons le code.

public class MatrixRandomValueTask extends RecursiveTask<Boolean> {

	private static ThreadLocalRandom rand = ThreadLocalRandom.current() ;

	// la sous-matrice sur laquelle cette tâche opère
	private Matrix m ;
	private int i0, iF, j0, jF ;

	// constructeurs
	public MatrixRandomValueTask (Matrix m) { ... }
	public MatrixRandomValueTask (Matrix m, int i0, int iF, int j0, int jF) { ... }

	// méthode imposée par RecursiveTask
	public Boolean compute() {

		// la méthode shouldSplit() décide si la sous-matrice courante doit
		// encore être divisée ou pas
		if (shouldSplit()) {

			// ces méthodes réalisent la division, plusieurs stratégies
			// sont possibles
			int iLim = subHeight(m, i0) ;
			int jLim = subWidth(m, j0) ;

			// création de quatre sous-tâches, une par sous-matrice
			MatrixRandomValueTask subTask11 =
				new MatrixRandomValueTask(m, i0, iLim, j0, jLim) ;
			MatrixRandomValueTask subTask21 =
				new MatrixRandomValueTask(m, iLim, iF, j0, jLim) ;
			MatrixRandomValueTask subTask12 =
				new MatrixRandomValueTask(m, i0, iLim, jLim, jF) ;
			MatrixRandomValueTask subTask22 =
				new MatrixRandomValueTask(m, iLim, iF, jLim, jF) ;

			// lancement de chaque tâche
			subTask21.fork() ;
			subTask12.fork() ;
			subTask22.fork() ;

			// exécution du calcul pour cette tâche
			subTask11.process() ;

		} else {

			// traitement de la sous-matrice dans laquelle on se trouve
			process() ;

		}

		return true ;
	}

	public void process() {

		for (int i = i0 ; i < iF ; i++) {
			for (int j = j0 ; j < jF ; j++) {
				m.line(i)[j] = rand.nextDouble() ;
			}
		}
	}
}

Cette classe n’est pas tout à fait complète. En fait, avant de lancer chaque sous-tâche, il faut vérifier que iF est bien supérieur à iLim, et jF à jLim. Si ce n’est pas le cas, il est inutile de créer et lancer les tâches correspondantes.

La méthode shouldSplit() est stratégique : c’est elle qui décide si la sous-matrice dans laquelle on se trouve a une taille qui correspond à ce qui peut être traité dans une tâche élémentaire. En fonction du type de traitement réalisé (ici il s’agit du tirage d’un nombre aléatoire), ce nombre va évidemment varier.

Les méthodes de division de notre matrice sont également importantes. Deux stratégies sont possibles.

  • Diviser la matrice en quatre sous-matrices de mêmes tailles (à une ligne ou une colonne près). Ce n’est pas une bonne méthode, dans la mesure où l’on connait la taille idéale d’une sous-matrice (on l’a utilisée pour écrire la méthode shouldSplit() !). Autant utiliser cette information immédiatement.
  • Découper notre matrice en tranches successives, suivant les lignes et les colonnes, de façon à produire une sous-matrice de la bonne taille, qui va être traitée tout de suite, et trois sous-matrices qui seront découpées à nouveau.

Enfin, on remarque que sur nos quatre sous-tâches, trois sont envoyées au framework pour traitement, et une dernière est exécutée immédiatement. Cette dernière porte une sous-matrice qui a la bonne taille par construction, on peut donc dérouler notre traitement dessus directement. Pour terminer cette partie, écrivons la méthode d’invocation de la classe Matrix.

public classe Matrix {

	public static Matrix randomMatrix(ForkJoinPool pool, int height, int width) {

		Matrix m = new Matrix(height, width) ;
		MatrixRandomValueTask task = new MatrixRandomValueTask(m) ;
		pool.invoke(task) ;
		return m ;
	}
}

Pourquoi passer la réserve de threads en paramètre plutôt que de l’initialiser en tant que champ, statique ou non, de la classe Matrix ? Simplement parce que chaque calcul et chaîne de calcul doivent être exécutés dans la même réserve, pour des raisons que nous verrons plus loin.

Paramétrage

L’étape de paramétrage est indispensable, et si elle n’est pas menée à bien comme il le faut, l’optimisation que nous avons écrite peut être complètement anéantie.

Régler cet algorithme consiste en fait à écrire la méthode shouldSplit() et répondre à la question : à quelle taille de matrice décide-t-on de faire le calcul ?

Rappelons que sans l’utilisation du framework fork / join, le temps de traitement est d’environ 800ms. On travaille sur une machine à 4 cœurs, on s’attend donc à descendre à 200ms.

Voici le résultat d’une petite expérimentation, qui fait varier le nombre de threads utilisés par la réserve (de 1 à 4), et la taille de la matrice élémentaire. Les temps d’exécution sont exprimés en millisecondes.

# threads1234
1 000 x 1 000150220250470
500 x 500140150170200
200 x 200140140140150
100 x 100140130140140
50 x 50125125125140
20 x 20125125125130

On constate que l’utilisation de ce framework permet de gagner bien plus qu’un facteur 4 sur l’exécution dans un unique thread : on gagne quasiment un facteur 6, à condition de ne pas fixer des paramètres trop délirants.

Références

Les documentations utiles sont avant tout celles publiées par le groupe de travail du JSR 266 : http://g.oswego.edu/dl/concurrency-interest/.

La Javadoc du package java.util.concurrenthttp://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html, et notamment les classes ForkJoinPool, ForkJoinTask<V>, RecursiveAction et RecursiveTask<V>.