Java 7 : fork / join et parallel arrays – 1

Le pattern fork / join, nouveauté de Java 7 et création de Doug Lea a déjà fait l’objet de quatre articles sur ce blog, et tu pourrais penser, précieux et rare lecteur, que j’en ai assez dit sur le sujet, qu’il est temps de passer à autre chose.

En fait il n’en est rien, car, bien que déjà très complète, l’API Fork / join telle que livrée par Doug Lea pour Java 7 ne comporte pas tous les éléments du package JSR-166, notamment un morceau très intéressant pour nous : les parallel arrays.

Dans nos précédents articles, nous prenions comme exemple le calcul intensif sur des tableaux. La parallélisation semble simple : il suffit de découper le tableau en morceaux, de les traiter, puis de regrouper les résultats. Simple mais pas tant que ça, car nous avions vu que la stratégie de découpage a une influence sur la performance globale du processus et que des problèmes de synchronisation pouvaient se poser.

Ce découpage ressemble au pattern map / reduce, dont nous allons reparler plus loin. En fait, nous avions mis en évidence que les problèmes de synchronisation pouvait surgir dans cette approche, et que les résultats des calculs, de ce fait, devenaient fragiles.

Et nous n’avons pas encore parlé de l’influence de la parallélisation sur le résultat d’un calcul sur un tableau, qui, dans certains cas, peut changer du tout au tout ! Mais ça, ça sera pour une autre fois.

Je te propose, cher lecteur, un petit voyage au cœur des parallel arrays, qui résolvent ces problèmes.

Un peu d’installation

Travailler avec les parallel arrays, c’est comme tout, ça se mérite. Et le test d’entrée, précieux lecteur, c’est la récupération des JARs et leur installation.

Les parallel arrays sont construits sur l’API fork / join. Sauf que les classes en dépendance du framework parralel arrays ne sont pas celles de Java 7, mais celles qui se trouvent dans le package jsr166y. On peut trouver la chose bizarre, cela présente tout de même un intérêt : bien que construite sur le framework fork / join, l’API peut être  utilisée en Java 6. L’inconvénient, c’est que si l’on travaille déjà en Java 7, on se retrouve avec deux classes ForkJoinPool, l’une dans le package java.util.concurrent, et l’autre dans jsr166y. On avait déjà connu cette situation avec les classes du java.util.concurrent, que l’on pouvait charger dans une JVM 4 à l’aide du package EDU.oswego. Et dans ces temps anciens, nous avions plutôt bien vécu la chose, donc pourquoi cela serait-il différent avec ce nouveau framework ?

On a donc besoin de deux JARs en dépendance de notre projet :

  • celui qui contient la bonne (ou mauvaise, suivant le point de vue…) version de ForkJoinPool, dans le JAR jsr166y.jar ;
  • celui qui contient les classes du framework proprement dit, dont ParallelArray, dans le JAR extra166y.jar.

Ces deux librairies sont téléchargeables sur la page Concurrency JSR-166 Interest Site.

Trois types de parallel array

L’API supporte n’importe quel type de tableau, au sens ou ces tableaux peuvent contenir n’importe quel type d’objet. Dans le cas des nombres, et afin d’éviter la surcharge de calcul induite par l’auto-boxing, elle propose deux classes supplémentaires : ParallelLongArray, pour tous les types entiers, et ParallelDoubleArray pour les deux types flottants. Pour les autres, comme nous le verront, certains raffinements sont possibles si les objets de ces tableaux sont comparables ou pas. Si ce n’est pas le cas, on peut aussi passer un comparateur. Les choses sont vraiment bien faites.

L’API expose une série de méthodes (importantes !), dont la première permet de créer un tableau parallèle.

On a quatre méthodes de construction et initialisation (en fait cinq, car l’une d’elle prend deux jeux de paramètres différents).

  • create(...) : méthode à laquelle on passe la taille du tableau, le type de ses éléments. Le tableau parallèle créé a la taille passée en paramètre.
  • createUsingHandoff(...) : méthode à laquelle on passe un tableau, qui sera utilisé comme tableau initial. La documentation précise que ce tableau pourra être modifié par l’API, il ne faut donc plus l’utiliser.
  • createFromCopy(...) : méthode à laquelle on passe un tableau, dont une copie sera utilisé pour initialiser le tableau parallèle. On peut également passer un entier en paramètre, qui indique le nombre d’éléments de ce tableau à prendre en compte.
  • createEmpty(...) : méthode à laquelle on passe un entier qui fixe la taille maximale du tableau parallèle. La différence avec la méthode create(...) est que la taille initiale du tableau créé est égale à 0. On peut l’agrandir dynamiquement, jusqu’à la limite fixée.

Toutes ces méthodes prennent de plus cette fameuse instance de ForkJoinPool, qui gère le traitement en parallèle. La documentation insiste bien sur le fait qu’un tableau parallèle attaché à une telle instance ne peut pas être attaché à une autre instance. En revanche, un même pool peut se voir attacher plusieurs tableaux parallèles. C’est même plutôt conseillé, puisque l’idée d’un pool, c’est bien de contrôler la répartition des calculs entre les cœurs disponibles.

Opérations sur les parallel arrays

Une fois créé, notre tableau devient disponible pour toutes sortes de traitements. Ces traitements sont définis à l’aide d’opérations de différents types, que l’on peut chaîner. On applique ces traitements à la manière de la programmation fonctionnelle, comme nous allons le voir.

L’API définit une classe de base, Ops, qui englobe toutes les opérations que l’on peut effectuer sur les tableaux parallèles, sous forme de classes membre statiques.

  • Ops.Generator : permet de générer des valeurs dans un tableau parallèle ;
  • Ops.Predicate : permet d’appliquer un filtrage sur un tableau parallèle, et de ne retourner que les valeurs filtrées ;
  • Ops.Reducer : permet de calculer une valeur par agrégation des valeurs d’un tableau parallèle ;
  • opérations de mapping : permet d’appliquer une transformation aux valeurs d’un tableau parallèle, valeur par valeur.

Deux interfaces sont définies, qui ne sont pas ou peu utilisées dans l’API :

  • Ops.Procedure ;
  • Ops.Action.

L’interface Ops.Procedure est documentée de façon minimale, elle définit un traitement qui ne retourne rien, donc utile pour enregistrer une information de journalisation ou de débugage par exemple. Une instance de cette interface est passée à la méthode apply() définie sur la classe ParallelArray.

L’interface Ops.Action n’est pas plus documentée, et n’est appelée nulle part. Elle définit une opération qui ne prend pas de paramètre et ne retourne rien. Peut-être est-elle là pour une utilisation future.

Une opération est donc nécessairement une instance de l’une de ces interfaces. On peut la créer sous forme anonyme, c’est ce qui semble le plus adapté. Cette instance est ensuite passée à une des méthodes de notre tableau parallèle. Suivant le type d’opération, différents types de méthodes existent.

Deux cas se présentent alors.

  • L’opération modifie le tableau « en place », ce qui est le cas du générateur. Dans ce cas l’opération est immédiatement appliquée.
  • L’opération génère un autre tableau, ce qui est le cas le plus fréquent. Dans ce cas une instance particulière d’un tableau parallèle est retournée.

Faisons deux remarques pour ce second type d’opérations.

Tout d’abord, il existe plusieurs interfaces pour modéliser ces instances particulières. Par exemple, l’application d’un prédicat retourne une instance de ParallelArrayWithFilter. Ces interfaces n’étendent pas ParrallelArray, mais les méthodes qu’elles définissent permettent tout de même de chaîner les traitements.

Enfin, dans ce second cas, le calcul n’est pas effectué au moment où cette instance est retournée, mais plutôt sur appel de la méthode all() de cette instance. Cette méthode est présente dans toutes les interfaces retournées.

On peut donc définir notre chaîne de traitement complète, composée comme nous allons le voir, de mappings, de filtres, et autres reducers, et lancer le calcul une fois cette définition écrite, par un appel à all(). Notons qu’il faut chercher à réduire les opérations chaînées, pour des raisons évidentes de performances. Ce point est précisé dans la documentation.

Le nombre total d’opérations est très important, l’objet de cet article n’est pas d’en faire le catalogue (ce qui serait très ennuyeux !), mais plutôt de voir comment les choses s’organisent.

Opération de génération

L’opération de génération est la plus simple. Elle permet d’initialiser les valeurs portées par le tableau parallèle considéré.

// on utilise une variable aléatoire de seed 1L
private static Random rand = new Random(1L) ;

// création d'un pool de threads de type fork / join
// il s'agit ici de la classe jsr166y.ForkJoinPool
// le taux de parallèlisme est celui par défaut
ForkJoinPool pool = new ForkJoinPool() ;

// création d'un tableau parallèle de long
ParallelLongArray a0 = ParallelLongArray.create(10, pool) ;
a0.replaceWithGeneratedValue(new LongGenerator() {

	@Override
	public long op() {
		return rand.nextInt(100) ;
	}
}) ;

System.out.println("a0 = " + a0) ;

Dans l’exemple qui précède, la méthode op() de notre opération va être appelée par le framework pour chaque case de notre tableau, qui prendra la valeur retournée par cette méthode. Ici une valeur aléatoire comprise entre 0 et 99.

On ne peut pas prévoir dans quel thread cette méthode sera appelée, et si l’on utilise des tableaux parallèles, c’est bien pour que ces threads soient différents ! Donc, par principe, cette méthode op() ne doit pas faire d’opération qui ne soit pas thread safe, ou d’opération de synchronisation qui risquerait de bloquer tout le processus.

On remarque que cette API est aimablement faite, puisque le résultat s’affiche plaisamment sous la forme :

a0 = [88, 13, 6, 78, 48, 85, 47, 54, 4, 34]

On remarque également que si l’on exécute cette application plusieurs fois, les valeurs du tableau se retrouvent mélangées. Cela nous montre qu’elles ont été initialisées dans un ordre aléatoire, ce qui n’aurait pas été le cas si elles l’avaient été dans un thread unique. D’ailleurs, si l’on fixe le taux de parallélisation de notre pool à 1, on se rend compte que l’ordre dans lequel elles sont générées est toujours le même.

Limitation de la portée des opérations

La méthode withBounds(int firstIndex, int upperBound) permet de limiter la portée des opérations définies à l’intervalle dont les bornes sont passées en paramètres. L’utilisation de cette méthode peut être dangereuse, car certaines opérations peuvent voir leur résultat changer si elles ne sont pas appliquées sur la totalité d’un tableau. .

Prédicat

Les prédicats nous permettent d’appliquer un filtrage sur un tableau parallèle.

Un prédicat est une instance de Ops.Predicate, dont l’unique méthode retourne un booléen et accepte le type de notre tableau parallèle. Le filtrage par un prédicat n’est pas une opération immédiate. Il est effectué sur appel à la méthode all() de l’objet retourné. Dans l’exemple suivant, on sélectionne simplement les valeurs supérieures à 50.

ParallelLongArrayWithFilter a0F = a0.withFilter(new Ops.LongPredicate() {

	@Override
	public boolean op(long l1) {
		return l1 > 50 ;
	}
}) ;

System.out.println("a0 = " + a0) ;
System.out.println("a0F = " + a0F.all()) ;

Le résultat est le suivant.

a0 = [88, 13, 6, 78, 48, 85, 47, 54, 4, 34]
a0F = [88, 78, 85, 54]

Opération de réduction

La réduction dans ce contexte est à rapprocher de la notion d’agrégation en SQL. Il s’agit de calculer une valeur, en général numérique, à partir d’un nombre plus important de valeurs. Typiquement, une réduction consiste à calculer une moyenne, une somme, une norme, un max ou un min, etc…

Une opération de réduction doit être associative, ce qui signifie que l’on doit avoir la propriété suivante :

reduction(a, reduction(b, c)) = reduction(reduction(a, b), c) ;

C’est le cas de la somme.

reduction(a, b) = a + b ; // on a bien (a + (b + c)) = ((a + b) + c)

Ce n’est pas le cas du carré de la somme.

reduction(a, b) = (a + b)^2 ; // #fail

L’API est ainsi faite qu’une réduction retourne le même type que le type porté par le tableau sur lequel on calcule la réduction.

Déterminons par exemple la somme des valeurs de notre tableau.

long reducedValue = a0.reduce(new Ops.LongReducer() {

	@Override
	public long op(long l1, long l2) {
		return l1 + l2 ;
	}
}, 0L) ;

La méthode reduce() prend en paramètre une instance de Ops.LongReducer (car nous utilisons la classe ParallelLongArray pour nos exemples), et un paramètre supplémentaire, qui est retourné dans le cas où le tableau parallèle sur lequel on travaille a une longueur nulle.

La valeur réduite est immédiatement calculée. Dans notre exemple, elle est reproductible, et a pour valeur 457.

Le type retourné par le filtrage expose les bonnes méthodes pour chaîner filtrage et réduction, comme dans l’exemple suivant.

// définition d'un filtre et d'un reducer
Ops.LongPredicate predicate = new Ops.LongPredicate() { ... } ;
Ops.LongReducer reducer = new Ops.LongReducer() { ... } ;

// application du filtrage puis de la réduction
long reducedValue = a0.withFilter(predicate).reduce(reducer, 0L) ;

Notons que dans le cas de la somme, la classe ParralelArray expose une méthode sum()qui fait déjà le travail. L’exemple de la somme a le mérite d’être simple, dans la vie réelle, ce n’est pas comme ça que l’on ferait !

Conclusion (provisoire)

Voila qui termine la première partie de ce long article sur les parallel arrays. Cet article se terminera par la présentation des opérations de mapping, des opérations d’accumulation, de nombreux exemples et quelques mots sur la performance sur un cas réel.

Référence

La page de la JSR-166 : http://g.oswego.edu/dl/concurrency-interest/
La Javadoc du package extra166y : http://gee.cs.oswego.edu/dl/jsr166/dist/extra166ydocs/