• 6 heures
  • Moyenne

Ce cours est visible gratuitement en ligne.

course.header.alt.is_certifying

J'ai tout compris !

Mis à jour le 30/11/2016

L'interface Queue<E>

Connectez-vous ou inscrivez-vous gratuitement pour bénéficier de toutes les fonctionnalités de ce cours !

Nous allons maintenant voir une nouvelle interface.
Nous verrons que cette dernière s'apparente à une file d'attente dont la gestion est assurée à l'aide de tout un lot de méthodes.

Pas besoin d'une intro à rallonge, plongeons-nous directement dans le vif du sujet !

Généralités

L'interface Queue<E> représente une collection gérée comme une file d'attente. Elle permet donc de prioriser les tâches de traitement des données qu'elle contient. Ces objets utilisent généralement (mais pas obligatoirement) un mode de gestion de type FIFO, ce qui signifie que le premier élément ajouté à la collection sera le premier à en sortir. En fait, lorsque vous ajouterez des éléments dans ce type de collection, ils seront ajoutés à la fin de la collection, tandis que lorsque vous retirerez des éléments, ce seront les premiers insérés qui seront retirés en premier.

Ce type de collection peut également prendre en compte la notion de taille disponible, et par exemple générer des exceptions en cas de dépassement.

Avant de continuer à vous présenter cette interface, voici un diagramme de classes :

Image utilisateur

Vous pouvez voir qu'il y a beaucoup d'objets différents dans ce type de collection... Nous allons en passer un certain nombre en revue mais, avant cela, regardons ce que nous offre l'interface Queue<E>.

Méthode

Définition

add(E e)

Insère l'élément dans la file si c'est possible. Dans le cas contraire, une exception de type IllegalStateException sera levée.

element()

Retourne le premier élément de la collection, sans le supprimer. S'il n'y a plus d'éléments lors de l'appel à cette méthode, celle-ci retournera une exception de type NoSuchElementException.

offer(E e)

Idem que la méthode add(E e) mais au lieu de lever une exception, la méthode retournera false.

peek()

Idem que la méthode element() mais au lieu de lever une exception, la méthode retournera null.

remove()

Retourne le premier élément de la collection, en le supprimant cette fois. S'il n'y a plus d'éléments lors de l'appel à cette méthode, celle-ci retournera une exception de type NoSuchElementException.

poll()

Idem que la méthode remove() mais au lieu de lever une exception, la méthode retournera null.

Voilà donc pour les fonctionnalités de l’interface Queue<E>. Mais comme vous l’avez remarqué sur le diagramme, la plupart des objets utilisables par cette interface implémentent d’autres interfaces, ou bien héritent de la classe AbstractQueue<E>.

Je vous propose maintenant de vous présenter les différentes implémentations, en commençant par celles qui ont le moins d'interfaces...

PriorityQueue<E> et ConcurrentLinkedQueue<E>

Ces deux classes héritent de la classe AbstractQueue<E> mais n'implémentent pas d'autres interfaces que Queue<E>.

L'objet PriorityQueue<E>

Une queue de priorité récupère les éléments triés, après insertion dans un ordre quelconque. Cependant ceci n'est pas le cas lorsque vous parcourez ce type de collection. En fait, la méthode remove() va vous fournir le plus petit élément de la collection mais ce n'est pas forcément celui qui est en première position.

Cet objet fonctionne avec ce qu'on appelle un "tas". Il s'agit d'une arborescence binaire dans laquelle les méthodes remove() et add() place les éléments les plus petits autour de la racine, sans vraiment faire le tri complet de son contenu.

Ce genre d'objet est tout indiqué pour planifier des tâches, gérer des priorités etc.

Voici un code qui vous montre l'utilisation de l'objet PriorityQueue<E> :

public class Main {
   public static void main(String[] args) {
      Queue<Integer> q = new PriorityQueue<Integer>();
      //On tente de retirer un élément inexistant
      Integer nb = q.poll();
      //ce qui nous retourne "null"
      System.out.println(nb);
      
      Random r = new Random();
      
      //Nous allons ajouter des entiers de façon aléatoire
      for(int i=0; i < 10; i++){
         q.offer(r.nextInt(20));
      }
      System.out.println("Affichage du contenu de la queue");
      System.out.println(q);
      
      //Nous allons parcourir notre collection
      //Et mettre son ordre dans une liste
      List<Integer> list = new ArrayList<Integer>();
      Iterator<Integer> it = q.iterator();
      while(it.hasNext())
         list.add(it.next());
      
      //Nous affichons la liste
      System.out.println("Affichage de l'ordre de la queue");
      System.out.println(list);
      
      //On supprime tout le contenu de notre liste
      list.removeAll(q);
      //nous allons maintenant voir comment les objets
      //sont dépilés de notre collection
      for(int i = 0; i < 10; i++)
         list.add(q.remove());
      
      System.out.println("Affichage de l'ordre de dépilage");
      System.out.println(list);
   }
}

Et le résultat :

null
Affichage du contenu de la queue
[1, 1, 2, 1, 16, 18, 8, 19, 10, 16]
Affichage de l'ordre de la queue
[1, 1, 2, 1, 16, 18, 8, 19, 10, 16]
Affichage de l'ordre de dépilage
[1, 1, 1, 2, 8, 10, 16, 16, 18, 19]

Vous pouvez constater que les éléments ne sont pas triés au sein de la collection mais que ceux-ci sont dépilés par ordre croissant.
Vous avez aussi la possibilité de définir un ordre de sortie. Voici un exemple :

//Le premier paramètre défini la taille initiale de la collection
//Le deuxième le comparateur à utiliser, ici par ordre décroissant
Queue<Integer> q = new PriorityQueue<Integer>(10, new Comparator<Integer>(){
   public int compare(Integer int1, Integer int2) {
      return int2.compareTo(int1);
   }
});

Random r = new Random();

for(int i=0; i < 10; i++)
   q.offer(r.nextInt(20));

System.out.println(q);

List<Integer> list = new ArrayList<Integer>();
for(int i = 0; i < 10; i++)
   list.add(q.remove());

System.out.println("Affichage de l'ordre de dépilage");
System.out.println(list);

Et le résultat :

[17, 10, 13, 6, 9, 6, 11, 1, 4, 0]
Affichage de l'ordre de dépilage
[17, 13, 11, 10, 9, 6, 6, 4, 1, 0]

Que se passe-t-il lorsque nous dépassons la taille initiale ?

Rien du tout, la collection grandit de façon dynamique. :)
Passons maintenant au deuxième objet.

L'objet ConcurrentLinkedQueue<E>

Comme son nom l'indique, cet objet est thread-safe, donc utilisable dans un environnement multithread. Il fonctionne un peu comme l'objet ci-dessus à la différence près que l'élément placé en début de file est celui qui est présent depuis le plus longtemps dans la collection donc, par extension, le dernier élément ajouté sera placé en fin de collection. Par contre, cet objet n'autorise pas les éléments null .

Voici un code d'exemple :

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;


public class Main2 {

   public static void main(String[] args) {
      //ces listes servent uniquement à répertorier les ajouts et suppressions
      List<Integer> listAjout = new ArrayList<Integer>();
      List<Integer> listRetire = new ArrayList<Integer>();
      List<Integer> listAjoutThread = new ArrayList<Integer>();
      
      //On crée notre collection
      ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
      
      //Un thread qui va se charger de retirer et d'ajouter des éléments
      MyThread t = new MyThread(queue, listRetire, listAjoutThread);
      t.start();
      
      //Et nous ajoutons des éléments depuis notre thread principal
      for(int i = 0; i < 10; i++){
         Random rand = new Random();
         Integer integer = rand.nextInt();
         queue.offer(integer);
         listAjout.add(integer);
         try {
            Thread.currentThread().sleep(1000);
         } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
         }
      }
      
      System.out.println(listAjout);
      System.out.println(listAjoutThread);
      System.out.println(listRetire);      
   }
}

class MyThread extends Thread{
   private ConcurrentLinkedQueue<Integer> queue;
   private List<Integer>list, listAjout;
   public MyThread(ConcurrentLinkedQueue<Integer> q, List<Integer> pList, List<Integer> pList2){
      queue = q;
      list = pList;
      listAjout = pList2;
   }
   
   public void run(){
      int nb = 0;
      Random rand = new Random();
      while(true){
         try {
            this.sleep(1000);
            //On ajoute un élément dans la collection
            Integer ajout = rand.nextInt(10);
            queue.offer(ajout);

            //On retire un élément
            Integer integer = queue.poll();
            
            //On stocke les deux actions dans nos listes
            list.add(integer);
            listAjout.add(ajout);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         nb++;
         if(nb > 10)
           break;
      }
   }
   
   public List<Integer> getList(){
      return list;
   }
   
   public List<Integer> getListAjout(){
      return listAjout;
   }
}

Et le résultat :

[302135980, -715211292, -465293153, 438546225, 1651135923, 774262222, 84799215, -117347279, 204998898, -1088590448]
[3, 7, 6, 7, 3, 3, 4, 8, 6, 7]
[302135980, -715211292, 3, -465293153, 7, 438546225, 6, 7, 1651135923, 774262222]

Ce code parle de lui-même. Les éléments ajoutés en premier sont retirés les premiers, quel que soit le thread qui les insère...

L'interface BlockingQueue<E>

Cette interface rajoute aussi des contraintes à l'interface Queue<E>  que nous avons vu précédemment. Ce type de collection n'accepte pas les valeurs null  et lève une exception si vous tentez d'en insérer une mais ce qui caractérise cette interface est une notion de blocage pour les actions ne pouvant pas être faites immédiatement mais qui pourraient l'être dans un futur plus ou moins proche. Ce blocage intervient sur les méthodes usuelles de nos collections (ajout, suppression, consultation) lorsque vous essayez d'ajouter des éléments dans une collection qui n'en accepte plus.

Tu veux dire que ce genre de collection ont une taille définie ?

En fait, c'est vous qui avez la possibilité de définir une taille et, si vous ajoutez un élément alors que la collection a atteint sa taille maximum, c'est à ce moment qu'il peut y avoir une exception, un blocage ou autre en fonction de la méthode utilisée. Voici un tableau qui représente les différents types de rendu en fonction des méthodes employées sur les éléments de la collection :

Type de méthode

Lève une exception

Retourne une valeur

Bloque le thread indéfiniment

Bloque le thread pendant une durée

Méthode d'insertion

add(E elem)

offer(E elem)

put(E elem)

offer(E elem, long timeout, TimeUnit unit)

Méthode de récupération avec suppression

remove()

poll()

take()

poll(long timeout, TimeUnit unit)

Méthode de récupération sans suppression

get()

peek()

De par son mode de fonctionnement, les implémentations de cette interface sont tout indiquées pour gérer des collections dont les objets sont produits (ajoutés) via un thread à part, et sont consultés ou utilisés via un troisième thread. Pour faire simple, ces objets sont très utiles dans un environnement multithread. Parlons maintenant de ces implémentations.

L'objet ArrayBlockingQueue<E>

Cet objet gère donc son contenu via un tableau et l'ordre de ses éléments sous la forme FIFO. L'élément en fin de collection est celui qui fait partie de la collection depuis le plus longtemps, je vous laisse donc en tirer les conclusions qui s'imposent pour l'élément en fin de collection. :-°
Par contre, vu que ces éléments sont censés travailler en mode multithread, l'ordre d'accès n'est pas réellement garanti, dans le cas où un thread insère les éléments et un autre les récupère. De ce fait, il existe un constructeur qui permet de passer un booléen qui force le mode FIFO.
Autre point important : vous devez spécifiez la taille de ce type de collection. De ce fait vous n'aurez plus la possibilité modifier cette dernière et, si vous dépassez la taille limite, vous recevrez une belle exception !

L'objet LinkedBlockingQueue<E>

Cet objet, lui, fonctionne avec une suite de nœuds liés entres eux. Tout comme l'objet ci-dessus, il gère ces éléments en FIFO avec comme premier élément, celui qui y est dans la collection depuis le plus longtemps. Par contre, vous n'êtes pas obligé de spécifier une quantité maximum d'éléments et, si vous optez pour cette solution, la capacité maximum correspondra à la valeur de l'attribut statique de la classe Integer : Integer.MAX_VALUE.
Les nœuds supplémentaires seront donc créés de façon dynamique.

L'objet PriorityBlockingQueue<E>

Cette classe utilise le même ordonnancement que l'objet PriorityQueue<E>  en y ajoutant la notion de blocage entre les threads. Elle utilise donc le même mécanisme pour ordonner ses éléments donc les éléments qu'elle contient doivent être comparables ou nous devons fournir un comparateur à notre instance pour qu'elle sache trier ses valeurs. De plus, cet objet ne prévoit pas de fonctionnement particulier pour les éléments considérés comme égaux.

Par contre, bien que cette implémentation est théoriquement sans limites, il se peut que certaines insertions échouent par manque de mémoire, levant ainsi une exception de type OutOfMemoryError .

Autre point important, l'iterateur retourné par cet objet ne vous permet pas de parcourir les éléments triés, vous aurez une vue anarchique de son contenu. Si vous voulez parcourir les éléments triés, vous n'aurez pas d'autre choix que d'utiliser la conversion de notre collection en tableau et d'utiliser la méthode de tri de l'objet Arrays, comme ceci : Arrays.sort(pq.toArray()).

Voici un code qui vous montre le fonctionnement de ces trois objets :

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;


public class Main7 {

   public static void main(String[] args) {

      //Nous allons tester avec trois implémentations pour commencer
      
      //test de l'implémentation LinkedBlockingQueue
      LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<Integer>(10); 
      System.out.println("Exemple avec l'objet LinkedBlockingDeque<T> : ");
      System.out.println("----------------------------------------------");
      doIt(lbq);

      System.out.println("----------------------------------------------");
      
      //test de l'implémentation ArrayBlockingQueue
      ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<Integer>(10); 
      System.out.println("Exemple avec l'objet ArrayBlockingQueue<T> : ");
      System.out.println("----------------------------------------------");
      doIt(abq);
      

      System.out.println("----------------------------------------------");
      
      //test de l'implémentation PriorityBlockingQueue
      PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<Integer>(10); 
      System.out.println("Exemple avec l'objet PriorityBlockingQueue<T> : ");
      System.out.println("----------------------------------------------");
      doIt(pbq);
      
   }   
   
   static void doIt(BlockingQueue<Integer> bq){
    //Création d'une collection n'acceptant que 10 entrées
      BlockingQueue<Integer> lbq = bq; 
      //Nous allons tester la concurrence sur cette collection
      SampleThread t1 = new SampleThread(null, 0, lbq);
      SampleThread t2 = new SampleThread(TimeUnit.MILLISECONDS, 10L, lbq);
      
      //Nous lançons les threads 
      //t1 ne nous rendra pas la main car nous tentons d'ajouter des éléments
      //Alors que la collection sera considérée comme pleine
      t1.start();
      //alors que ce Thread terminera normalement car nous ne bloquons le thread
      //que pendant un certain temps
      t2.start();
      
      //On attend quelques secondes et on lance une suppression des éléments
      try {
         System.out.println("Pause de 10 secondes");
         Thread.sleep(10000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      } 
      
      System.out.println("Suppression de quelques éléments de la collection");
      for(int i = 0; i < 10; i++ )
         lbq.poll();
      //après cette série de suppressions, le premier thread arrive à terminer son travail
      
      //Nous attendons quelques secondes
      try {
         System.out.println("Pause de 3 secondes");
         Thread.sleep(3000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      
      LinkedBlockingQueue<Integer> lbq2 = new LinkedBlockingQueue<Integer>(3);
      //nous tentons maintenant de mettre le contenu  notre première collection
      //dans la seconde, mais ceci lèvera une exception car le contenu de la première est trop volumineux
      //pour la seconde ! 
      System.out.println("Capacité restante : " + lbq.remainingCapacity());
      System.out.println("Capacité restante : " + lbq2.remainingCapacity());
      try {
         lbq.drainTo(lbq2);
      } catch (IllegalStateException e) {
         System.out.println("La queue est pleine !! ");
      }
       
      System.out.println(lbq2);
   }
}

class SampleThread extends Thread{
   private TimeUnit tu;
   private long timeout;
   private BlockingQueue<Integer> lbd;
   
   public <T> SampleThread(TimeUnit tu, long timeout, BlockingQueue<Integer> lbd){
      this.tu = tu;
      this.timeout = timeout;
      this.lbd = lbd;
   }
   
   public void run(){
      Random rand = new Random();
      for(int i =0; i < 10 ; i++){
         
         Integer integer = rand.nextInt(20);
         
         //Si pas de critère de durée
         //On utilise la méthode qui bloque le thread indéfiniment
         if(this.tu == null){
            try {
               if((i%2) == 0){
                  this.lbd.put(integer);
                 
               }
               else
                  this.lbd.put(integer);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
         //Sinon, on ne bloque que quelques secondes
         else{            
            try {
               if((i%2) == 0){
                  this.lbd.offer(integer, this.timeout, this.tu);
               }
               else{
                  this.lbd.offer(integer, this.timeout, this.tu);
               }
            } catch (InterruptedException e) {
               e.printStackTrace();
            }         
         }
         try {
            this.sleep(100);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
      
      //On affiche le résultat lors de la fin du traitement
      System.out.println("À la fin du thread " + this.getName() + " : " + lbd);      
   }   
}

Et le résultat :

Exemple avec l'objet LinkedBlockingDeque<T> : 
----------------------------------------------
Pause de 10 secondes
À la fin du thread Thread-1 : [1, 10, 0, 13, 6, 1, 8, 10, 2, 6]
Suppression de quelques éléments de la collection
Pause de 3 secondes
À la fin du thread Thread-0 : [8, 3, 5, 1, 6]
Capacité restante : 5
Capacité restante : 3
La queue est pleine !! 
[8, 3, 5]
----------------------------------------------
Exemple avec l'objet ArrayBlockingQueue<T> : 
----------------------------------------------
Pause de 10 secondes
À la fin du thread Thread-3 : [8, 3, 13, 10, 2, 16, 13, 16, 9, 18]
Suppression de quelques éléments de la collection
Pause de 3 secondes
À la fin du thread Thread-2 : [18, 4, 6, 11, 12]
Capacité restante : 5
Capacité restante : 3
La queue est pleine !! 
[18, 4, 6]
----------------------------------------------
Exemple avec l'objet PriorityBlockingQueue<T> : 
----------------------------------------------
Pause de 10 secondes
À la fin du thread Thread-4 : [0, 0, 1, 2, 1, 3, 1, 5, 4, 6, 3, 15, 10, 14, 3, 13, 12, 11, 17, 12]
À la fin du thread Thread-5 : [0, 0, 1, 2, 1, 3, 1, 5, 4, 6, 3, 15, 10, 14, 3, 13, 12, 11, 17, 12]
Suppression de quelques éléments de la collection
Pause de 3 secondes
Capacité restante : 2147483647
Capacité restante : 3
La queue est pleine !! 
[5, 6, 10]

L'objet SynchronousQueue<E>

Cet objet a un fonctionnement bien particulier car il ne tolère un nouvel ajout que s'il y a une tentative de retrait de ce même élément... De ce fait, vous devriez en tirer une conclusion évidente : cet objet n'a une capacité que d'un seul élément. :waw:

En partant de ce postulat, vous ne devriez donc pas être surpris des autres particularités de cet objet, à savoir :

  • la méthode peek() retourne systématiquement la valeur null ;

  • null n'est pas une valeur autorisée ;

  • la tête de la file correspond à l'élément qui est censé être inséré ;

  • cet objet est destiné aux transferts d'objets entre threads.

Cet objet étant un peu spécial, je vous donne un code d'exemple :

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
public class Main5 {
   public static void main(String[] args) {
      SynchronousQueue<String> sq = new SynchronousQueue<String>();

      Producteur p = new Producteur(sq);
      Consommateur c = new Consommateur(sq);
      p.start();
      c.start();
      
   }
}
/**
 * Cette classe se charge de mettre des données dans la collection
 */
class Producteur extends Thread{
   private SynchronousQueue<String> queue;
   public Producteur(SynchronousQueue<String> q){
      queue = q;
   }
   public void run(){
      Random rand = new Random();
      
      for(int i = 0 ; i < 5; i++){
         String str = "N° " + rand.nextInt(100);
         try {
            queue.put(str);
            System.out.println("Ajout de l'élément " + str + " dans la collection OK");
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

/**
 * Cette classe se charge de récupérer la donnée
 */
class Consommateur extends Thread{
   private SynchronousQueue<String> queue;
   public Consommateur(SynchronousQueue<String> q){
      queue = q;
   }
   public void run(){   
      for(int i = 0 ; i < 5; i++){
         //On met une temporisation pour éviter les tours de boucle à vide
         try {
            this.sleep(1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         String str = queue.poll();
         System.out.println("Elément " + str + " retiré de la collection");
      }
   }
}

Et le résultat :

Elément N° 37 retiré de la collection
Ajout de l'élément N° 37 dans la collection OK
Elément N° 99 retiré de la collection
Ajout de l'élément N° 99 dans la collection OK
Elément N° 21 retiré de la collection
Ajout de l'élément N° 21 dans la collection OK
Ajout de l'élément N° 33 dans la collection OK
Elément N° 33 retiré de la collection
Ajout de l'élément N° 8 dans la collection OK
Elément N° 8 retiré de la collection

Impressionnant, n'est-ce pas ? Et vous pouvez bien voir que, tant que nous ne tentons pas de récupérer la première valeur, il est impossible d'en insérer une nouvelle !

L'objet DelayQueue<E extends Delayed>

Encore une implémentation particulière qui n'accepte que des objets qui implémentent l'interface Delayed. Cette dernière ne redonne les éléments qu'elle contient uniquement lorsque le délai d'expiration est arrivé à son terme.

Comment savoir si le délai est arrivé à expiration ?

C'est simple, l'interface Delayed demande à redéfinir une méthode, getDelay(TimeUnit tu) et c'est via cette méthode que l'objet de collection arrive à savoir si le délai est arrivé à expiration : lorsque la dite méthode retourne une valeur inférieure ou égale à 0.

Voici un code d'exemple :

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;


public class Main6 {
   public static void main(String[] args) {
      MyObject obj1 = new MyObject("Obj1", 500);
      MyObject obj2 = new MyObject("Obj2", 1000);
      MyObject obj3 = new MyObject("Obj3", 1500);
      MyObject obj4 = new MyObject("Obj4", 2000);
      
      DelayQueue<MyObject> dq = new DelayQueue<MyObject>();
      dq.put(obj1);
      dq.put(obj2);
      dq.put(obj3);
      dq.put(obj4);
      
      //Les éléments sont bien dans la collection mais non utilisables
      System.out.println(dq);
      System.out.println(dq.poll());
      
      for(int i = 0; i < 10; i++){
         //On attend 300 ms
         try {
            Thread.sleep(300);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         MyObject my = dq.poll();
         System.out.println(my);
      }
   }
}

class MyObject implements Delayed{
   private String nom;
   private long origine, delay;
   public MyObject(String name, long d){
      nom  = name;
      origine = System.currentTimeMillis();
      delay = d;
   }


   public long getDelay(TimeUnit unit) {
      return  unit.convert( 
               delay - ( System.currentTimeMillis() - origine ), 
               TimeUnit.MILLISECONDS);
   }
   
   public int compareTo( Delayed delayed ) {
       if( delayed == this ) {
           return 0;
       }
  
       long d = ( getDelay( TimeUnit.MILLISECONDS ) - delayed.getDelay( TimeUnit.MILLISECONDS ) );
       return (( d == 0 ) ? 0 : (( d < 0 ) ? -1 : 1 ));
   }


   public String toString() {
      return "MyObject [nom=" + nom + "]";
   }    
}

Et le résultat :

[MyObject [nom=Obj1], MyObject [nom=Obj2], MyObject [nom=Obj3], MyObject [nom=Obj4]]
null
null
MyObject [nom=Obj1]
null
MyObject [nom=Obj2]
MyObject [nom=Obj3]
null
MyObject [nom=Obj4]
null
null
null

Les objets sont donc récupérables mais uniquement lorsque leur délai a expiré. C'est simple mais encore fallait-il le savoir...

L'interface TransferQueue<E>

Comme vous vous en doutez, cette dernière est faite pour travailler en environnement multithread. La nouveauté rajoutée revient à bloquer l'ajout d'un élément jusqu'à ce que celui-ci soit utilisé par un autre thread. Elle ajoute donc quelques méthodes qui permettent cette gestion, voici un tableau récapitulatif :

Méthode

Définition

transfer(E e)

Transfère l'élément à un thread consommateur en attente de cet élément, ou si aucun thread n'attend, attend qu'un thread réclame l'élément.

tryTransfer(E e)

Cette méthode n'attend pas qu'un thread veuille récupérer l'élément et retournera true si un thread attend et false dans le cas contraire.

tryTransfer(E e,long timeout, TimeUnit unit)

Tente d'envoyer l'élément immédiatement à un thread s'il en existe un qui attend. Dans le cas contraire, la méthode ne renverra false que s'il n'y a toujours pas de thread attendant la valeur au bout de la durée spécifiée en paramètre.

Cette interface définit un comportement très proche de l'objet SynchronousQueue<E> ...

Tout à fait. Le mode opératoire est le même mais cette collection est plus souple d'utilisation : vous avez le choix d'attendre un thread consommateur ou non.

L'objet LinkedTransferQueue<E>

Cet objet est donc l'implémentation de l'interface mentionné ci-dessus. Cet objet rajoute deux méthodes utiles :

  • getWaitingConsumerCount() : retourne une estimation des threads attendant des éléments de la collection ;

  • hasWaitingConsumerCount() : retourne true si au moins un thread attend un élément de la collection.

Voici un code d'exemple de cet objet. Attention, ça pique un peu :

import java.util.Random;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class Main8 {

   public static void main(String[] args) {
      LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>();
      
      //nous créons un thread de production
      ProducteurThread pt1 = new ProducteurThread(queue, ProducteurThread.TIMEOUT);
      //nous n'allons pas bloquer la récupération dans ce thread
      ConsommateurThread ct1 = new ConsommateurThread(queue, ConsommateurThread.POLL);
      
      pt1.start();
      ct1.start();
      // Ici, la temporisation dans le consommateur
      // est trop grande par rapport à la temporisation
      // dans la méthode d'ajout
      // très peu de contenu devrait être OK, le premier élément en fait...      
      try {
         Thread.sleep(15000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      System.out.println("Premier test terminé ! \n\n");
      
      queue = new LinkedTransferQueue<String>();
      ProducteurThread pt2 = new ProducteurThread(queue, ProducteurThread.DEFAULT);
      ConsommateurThread ct2 = new ConsommateurThread(queue, ConsommateurThread.TAKE);
      
     
      pt2.start();
      ct2.start();
      //pas de soucis ici car la temporisation permet de tout récupérer
      //il n'y a pas de blocage lors de l'ajout de nouveaux éléments dans la collection
      
      try {
         Thread.currentThread().sleep(15000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      System.out.println("Second test terminé ! \n\n");
      
      queue = new LinkedTransferQueue<String>();
      ProducteurThread pt3 = new ProducteurThread(queue, ProducteurThread.BLOCK);
      ConsommateurThread ct3 = new ConsommateurThread(queue, ConsommateurThread.TAKE);
      
      // pas de soucis ici car la temporisation permet de
      // tout récupérer et donc de tout ajouter
      pt3.start();
      ct3.start();
      

      try {
         Thread.currentThread().sleep(15000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      System.out.println("Troisième test terminé ! \n\n");
      

      System.out.println("---------------------------------------------------------");
      System.out.println("Utilisation de la méthode hasWaitingConsumer()");
      System.out.println("---------------------------------------------------------");
      
      queue = new LinkedTransferQueue<String>();      
      ConsommateurThread ct4 = new ConsommateurThread(queue, ConsommateurThread.TAKE);
      ConsommateurThread ct5 = new ConsommateurThread(queue, ConsommateurThread.TAKE);
      ct4.start();
      
      try {
         Thread.currentThread().sleep(5000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      //départ légèrement différé du deuxième Thread consommateur
      ct5.start();
      
      Random rand = new Random();
      System.out.println("Nombre de thread attendant des éléments : " + queue.getWaitingConsumerCount());
      while(queue.hasWaitingConsumer()){
         System.out.println("Nombre de thread attendant des éléments : " + queue.getWaitingConsumerCount());
         String str = "DATA_" + rand.nextInt(100);
         try {
            queue.transfer(str);
            System.out.println("Elément ajouté avec succès : " + str);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         //Cette temporisation permet d'aérer les tours de boucle
         //et ainsi laisser le temps aux deux threads d'aller jusqu'à leur
         //méthode take() pour informer l'objet qu'un thread attend un élément
         try {
            Thread.currentThread().sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }  
      System.out.println("----------------------------------");
      System.out.println("FIN DES TESTS !");
   }
}

class ProducteurThread extends Thread{
   
   private LinkedTransferQueue<String> ltq;
   private int modeDeTransfert = 0;
   public final static int BLOCK=1, TIMEOUT=2, DEFAULT=0;
   
   public ProducteurThread(LinkedTransferQueue<String> q, int mode){
      ltq = q;
      modeDeTransfert = mode;
   }
   
   public void run(){
      Random rand = new Random();
      
      for(int i = 0; i < 5; i++){
         String str = "Random_" + rand.nextInt(20);
                
         long start = System.currentTimeMillis();
        
         //Pour ce mode, on pousse en bloquant une attente d'un autre thread
         if(modeDeTransfert == 1){
            try {
               ltq.transfer(str);
               //on mesure le temps d'ajout
               long end = System.currentTimeMillis();
               System.out.println("Elément " + str + " inséré en " + ((end-start)/1000) + " seconde");
            } catch (InterruptedException e1) {
               e1.printStackTrace();
            }
         }
         //Pour celui-ci, nous mettons une temporisation
         else if(modeDeTransfert == 2){
            try {
              //On récupère le statut du transfert
              boolean bool = ltq.tryTransfer(str, 1000, TimeUnit.MILLISECONDS);
              //on mesure le temps d'ajout
              long end = System.currentTimeMillis();
              System.out.println("Résultat de l'ajout avec temporisation : " + bool);
              if(bool){
                 System.out.println("Elément " + str + " inséré en " + ((end-start)/1000) + " seconde");
              }
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
         //Et par défaut, pas de temporisation et pas de blocage
         else{
            ltq.offer(str);
            System.out.println("Elément " + str + " inséré !");
         }
         
         //Une temporisation pour ralentir le tout
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }         
      }      
   }
}

class ConsommateurThread extends Thread{
   private LinkedTransferQueue<String> ltq;
   private int modeRecuperation = 0;
   public static int POLL = 1, TAKE = 2;
   public ConsommateurThread(LinkedTransferQueue<String> q, int mode){
      ltq = q;
      modeRecuperation = mode;
   }
   
   public void run(){
      for(int i = 0; i < 5; i++){
         try {
            //La temporisation est grande
            //pour vous montrer le blocage de la méthode transfert
            //avec temporisation
            Thread.sleep(2500);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         
         //avec ce mode, pas de blocage lors de la récupération
         if(modeRecuperation == 1){
            System.out.print("Récupération de la valeur : " + ltq.poll());
         }
         //ici, un blocage se fait lors de la récupération s'il n'y a pas d'élément
         else{
            try {
               System.out.println("Récupération de la valeur : " + ltq.take());
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      }

      System.out.println("---------------------------------------------------------");
      System.out.println("Statut de la collection à la fin du thread " + this.getName());      
      System.out.println(ltq);
      System.out.println("---------------------------------------------------------");
   }
}

Et le résultat :

Résultat de l'ajout avec temporisation : false
Résultat de l'ajout avec temporisation : true
Récupération de la valeur : Random_19Elément Random_19 inséré en 0 seconde
Résultat de l'ajout avec temporisation : false
Récupération de la valeur : nullRésultat de l'ajout avec temporisation : false
Récupération de la valeur : nullRésultat de l'ajout avec temporisation : false
Récupération de la valeur : nullRécupération de la valeur : null---------------------------------------------------------
Statut de la collection à la fin du thread Thread-1
[]
---------------------------------------------------------
Premier test terminé ! 


Elément Random_9 inséré !
Elément Random_12 inséré !
Elément Random_9 inséré !
Récupération de la valeur : Random_9
Elément Random_19 inséré !
Elément Random_11 inséré !
Récupération de la valeur : Random_12
Récupération de la valeur : Random_9
Récupération de la valeur : Random_19
Récupération de la valeur : Random_11
---------------------------------------------------------
Statut de la collection à la fin du thread Thread-3
[]
---------------------------------------------------------
Second test terminé ! 


Elément Random_6 inséré en 2 seconde
Récupération de la valeur : Random_6
Elément Random_11 inséré en 1 seconde
Récupération de la valeur : Random_11
Récupération de la valeur : Random_17
Elément Random_17 inséré en 1 seconde
Elément Random_9 inséré en 1 seconde
Récupération de la valeur : Random_9
Elément Random_15 inséré en 1 seconde
Récupération de la valeur : Random_15
---------------------------------------------------------
Statut de la collection à la fin du thread Thread-5
[]
---------------------------------------------------------
Troisième test terminé ! 


---------------------------------------------------------
Utilisation de la méthode hasWaitingConsumer()
---------------------------------------------------------
Nombre de thread attendant des éléments : 1
Nombre de thread attendant des éléments : 1
Elément ajouté avec succès : DATA_44
Récupération de la valeur : DATA_44
Nombre de thread attendant des éléments : 2
Récupération de la valeur : DATA_2
Elément ajouté avec succès : DATA_2
Nombre de thread attendant des éléments : 2
Récupération de la valeur : DATA_50
Elément ajouté avec succès : DATA_50
Nombre de thread attendant des éléments : 2
Récupération de la valeur : DATA_29
Elément ajouté avec succès : DATA_29
Nombre de thread attendant des éléments : 2
Récupération de la valeur : DATA_98
Elément ajouté avec succès : DATA_98
Nombre de thread attendant des éléments : 2
Récupération de la valeur : DATA_21
Elément ajouté avec succès : DATA_21
Nombre de thread attendant des éléments : 2
Elément ajouté avec succès : DATA_5
Récupération de la valeur : DATA_5
Nombre de thread attendant des éléments : 2
Récupération de la valeur : DATA_56
---------------------------------------------------------
Statut de la collection à la fin du thread Thread-6
[]
---------------------------------------------------------
Elément ajouté avec succès : DATA_56
Nombre de thread attendant des éléments : 1
Elément ajouté avec succès : DATA_23
Récupération de la valeur : DATA_23
Nombre de thread attendant des éléments : 1
Elément ajouté avec succès : DATA_75
Récupération de la valeur : DATA_75
---------------------------------------------------------
Statut de la collection à la fin du thread Thread-7
[]
---------------------------------------------------------
----------------------------------
FIN DES TESTS !

Le premier test utilise la temporisation lors de l'insertion dans l'objet. Hors le thread qui est censé récupérer ces valeurs est décalé dans le temps donc très peu de valeurs sont donc mises dans la collection.

Le deuxième test permet de voir que nous pouvons ajouter des données dans la collection sans attendre qu'un thread ne demande un élément, c'est le test le plus simple car il n'y a pas de verrou.

Le troisième reprend l'idée du premier test mais sans temporisation lors de l'ajout de données, du coup, pour ce test, tous les éléments sont insérés et tous sont récupérés.

Le dernier test, lui, met en évidence la façon d'utiliser l'attente de récupération. Ayant mis des temporisation dans les threads consommateurs, il faut mettre une temporisation dans la boucle car c'est l'invocation de la méthode take() qui informe l'objet qu'un thread attend un élément.
Ici, j'ai volontairement fait démarrer les deux thread consommateurs en différé pour que vous puissiez bien voir le nombre de threads demandeur à chaque tour de boucle.

Voilà, vous savez tout !

L'interface Deque<E>

Le terme "deque" est une abréviation de "double ended queue". La particularité principale de cette interface est qu'elle permet d'insérer et de retirer des éléments dans les deux sens de la pile. De ce fait, il existe des méthodes qui permettent d'ajouter et de retirer des éléments aux deux extrémités. Voici un récapitulatif de ces méthodes :

Méthode

Définition

addFirst(E e)

Insère l'élément au début la file. Si l'insertion échoue, une exception de type IllegalStateException sera levée.

addLast(E e)

Insère l'élément à la fin la file. Si l'insertion échoue, une exception de type IllegalStateException sera levée.

offerFirst(E e)

Idem que la méthode addFirst(E e) mais au lieu de lever une exception, la méthode retournera false.

offerLast(E e)

Idem que la méthode addLast(E e) mais au lieu de lever une exception, la méthode retournera false.

removeFirst()

Retourne le premier élément de la collection, en le supprimant. S'il n'y a plus d'éléments lors de l'appel à cette méthode, celle-ci retournera une exception de type NoSuchElementException.

removeLast()

Retourne le dernier élément de la collection, en le supprimant. S'il n'y a plus d'éléments lors de l'appel à cette méthode, celle-ci retournera une exception de type NoSuchElementException.

pollFirst()

Idem que la méthode removeFirst() mais au lieu de lever une exception, la méthode retournera null.

pollLast()

Idem que la méthode removeLast() mais au lieu de lever une exception, la méthode retournera null.

getFirst()

Retourne le premier élément de la file sans le supprimer. Si aucun élément n'est trouvé, une exception de type NoSuchElementException sera levée.

getLast()

Retourne le dernier élément de la file sans le supprimer. Si aucun élément n'est trouvé, une exception de type NoSuchElementException sera levée.

peekFirst()

Idem que la méthode getFirst() mais au lieu de lever une exception, la méthode retournera null.

peekLast()

Idem que la méthode getLast() mais au lieu de lever une exception, la méthode retournera null.

removeFirstOccurrence(Object o)

Supprime la première occurence de l'élément passé en paramètre. Cette méthode retourne un boolean qui sera à true si la file a subi une modification.

removeLastOccurrence(Object o)

Idem que la méthode ci-dessus mais pour le dernier élément cette fois.

descendingIterator()

Renvoie un itérateur parcourant la file dans le sens inverse de son sens naturel.

Vous avez pu voir que seulement deux objets concrets implémentent directement cette interface, je vous propose maintenant de les passer en revue.

L'objet ArrayDeque<E>

Cet objet représente un tableau redimensionnable, sans restrictions de capacité mais non thread-safe. Les valeurs null y sont proscrites. Les actions faites sur ce type d'objets sont relativement constantes, dans le sens où il n'y a pas de ralentissement notable même si la collection est importante.

Voici un petit code d'exemple :

Deque<String> deque = new ArrayDeque<String>();
deque.offerLast("tete");
deque.offerFirst("toto");
deque.offer("titi");
deque.offerFirst("tata");
System.out.println(deque);

Boolean bool = deque.removeFirstOccurrence("tyty");
System.out.println(bool);
bool = deque.removeFirstOccurrence("tyty");
System.out.println(deque);

System.out.println("\n suppression du dernier élément 'tata' : ");
deque.offerLast("tata");
System.out.println(deque);
bool = deque.removeLastOccurrence("tata");
System.out.println(bool);
System.out.println(deque);

System.out.println("\n Parcours de la file : ");
for(String str : deque){
   System.out.println(str);
}

System.out.println("\n Parcours de la file dans le sens inverse : ");
for(Iterator<String> it = deque.descendingIterator(); it.hasNext();)
   System.out.println(it.next());

Et le résultat :

[tata, toto, tete, titi]
false
[tata, toto, tete, titi]

 suppression du dernier élément 'tata' : 
[tata, toto, tete, titi, tata]
true
[tata, toto, tete, titi]

 Parcours de la file : 
tata
toto
tete
titi

 Parcours de la file dans le sens inverse : 
titi
tete
toto
tata

C'est très simple d'utilisation, n'est-ce pas ?
Ceci étant fait, je vous propose de voir la deuxième interface de cette catégorie d'objets.

L'interface BloquingDeque<E>

Cette interface ajoute la notion de blocage lors des interactions avec la collection, ce qui la rend thread-safe. Il existe des méthodes qui interagissent avec la collection mais qui ont des comportements différents en retour. Nous avons vu précédemment que certaines méthodes lèvent une exception alors que d'autres renvoient seulement une valeur null. Avec cette interface, certaines méthodes vont bloquer le thread appelant tant que l'objet collection n'est pas disponible et ceci indéfiniment ou selon une durée paramétrée (Time out).

Voici un tableau récapitulatif des méthodes usuelles et de leurs comportements. Le premier tableau correspond aux méthodes utilisées sur les premiers éléments d'une collection alors que le second se base sur les méthodes travaillant avec les derniers éléments :

Méthodes sur les premiers éléments

Type de méthodes

Lève une exception

Retourne une valeur

Bloque le thread indéfiniment

Bloque le thread pendant une durée

Méthode d'insertion

addFirst(E elem)

offerFirst(E elem)

putFirst(E elem)

offerFirst(E elem, long timeout, TimeUnit unit)

Méthode de récupération avec suppression

removeFirst()

pollFirst()

takeFirst()

pollFirst(long timeout, TimeUnit unit)

Méthode de récupération sans suppression

getFirst()

peekFirst()

Méthodes sur les derniers éléments

Type de méthodes

Lève une exception

Retourne une valeur

Bloque le thread indéfiniment

Bloque le thread pendant une durée

Méthode d'insertion

addLast(E elem)

offerLast(E elem)

putLast(E elem)

offerLast(E elem, long timeout, TimeUnit unit)

Méthode de récupération avec suppression

removeLast()

pollLast()

takeLast()

pollLast(long timeout, TimeUnit unit)

Méthode de récupération sans suppression

getLast(E elem)

peekLast(E elem)

L'objet LinkedBlockingDeque<E>

Cette implémentation se base sur une suite de nœuds liés entre eux. Elle gère une autorisation de capacité via un constructeur spécifique mais, en l'absence de ce paramètre, la capacité maximum autorisée correspond à la valeur définit dans l'objet Integer par la constante Integer.MAX_VALUE et, tout comme l'objet ci-dessus, les différentes opérations faites sur cet objet sont en temps constant.

Voici un code qui met en pratique cet objet :

import java.util.Random;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;


public class Main4 {

   public static void main(String[] args) {

      //Création d'une collection n'acceptant que 10 entrées
      LinkedBlockingDeque<Integer> lbd = new LinkedBlockingDeque<Integer>(10); 
      //Nous allons tester la concurrence sur cette collection
      MonThread t1 = new MonThread(null, 0, lbd);
      MonThread t2 = new MonThread(TimeUnit.MILLISECONDS, 10L, lbd);
      
      //Nous lançons les threads 
      //t1 ne nous rendra pas la main car nous tentons d'ajouter des éléments
      //Alors que la collection sera considérée comme pleine
      t1.start();
      //alors que ce Thread terminera normalement car nous ne bloquons le thread
      //que pendant un certain temps
      t2.start();
      
      //On attend quelques secondes et on lance une suppression des éléments
      try {
         System.out.println("Pause de 10 secondes");
         Thread.sleep(10000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      } 
      
      System.out.println("Suppression de quelques éléments de la collection");
      for(int i = 0; i < 10; i++ )
         lbd.pollLast();
      //après cette série de suppressions, le premier thread arrive à terminer son travail
      
      //Nous attendons quelques secondes
      try {
         System.out.println("Pause de 3 secondes");
         Thread.sleep(3000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      
      LinkedBlockingDeque<Integer> lbd2 = new LinkedBlockingDeque<Integer>(3);
      //Nous tentons maintenant de mettre le contenu  notre première collection
      //dans la seconde, mais ceci lèvera une exception car le contenu de la première est trop volumineux
      //pour la seconde ! 
      System.out.println("Capacité restante : " + lbd.remainingCapacity());
      System.out.println("Capacité restante : " + lbd2.remainingCapacity());
      lbd.drainTo(lbd2);
       
      System.out.println(lbd2);
   }
}

class MonThread extends Thread{
   private TimeUnit tu;
   private long timeout;
   private LinkedBlockingDeque<Integer> lbd;
   
   public MonThread(TimeUnit tu, long timeout, LinkedBlockingDeque<Integer> lbd){
      this.tu = tu;
      this.timeout = timeout;
      this.lbd = lbd;
   }
   
   public void run(){
      Random rand = new Random();
      for(int i =0; i < 10 ; i++){
         
         Integer integer = rand.nextInt(20);
         
         //Si pas de critère de durée
         //On utilise la méthode qui bloque le thread indéfiniment
         if(this.tu == null){
            try {
               if((i%2) == 0){
                  this.lbd.putFirst(integer);
                 
               }
               else
                  this.lbd.putLast(integer);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
         //Sinon, on ne bloque que quelques secondes
         else{            
            try {
               if((i%2) == 0){
                  this.lbd.offerFirst(integer, this.timeout, this.tu);
               }
               else{
                  this.lbd.offerLast(integer, this.timeout, this.tu);
               }
            } catch (InterruptedException e) {
               e.printStackTrace();
            }         
         }
         try {
            this.sleep(100);
         } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
         }
      }
      
      //On affiche le résultat lors de la fin du traitement
      System.out.println("À la fin du thread " + this.getName() + " : " + lbd);      
   }   
}

Et le résultat :

Pause de 10 secondes
À la fin du thread Thread-1 : [10, 1, 5, 14, 11, 9, 4, 11, 2, 17]
Suppression de quelques éléments de la collection
Pause de 3 secondes
À la fin du thread Thread-0 : [0, 1, 7, 1, 8]
Capacité restante : 5
Capacité restante : 3
Exception in thread "main" java.lang.IllegalStateException: Deque full
        at java.util.concurrent.LinkedBlockingDeque.addLast(Unknown Source)
        at java.util.concurrent.LinkedBlockingDeque.add(Unknown Source)
        at java.util.concurrent.LinkedBlockingDeque.drainTo(Unknown Source)
        at Main4.main(Main4.java:51)

Vous pouvez voir que le premier thread attend indéfiniment que la collection ait de nouveau de l'espace pour pouvoir y ajouter des éléments alors que le second thread se contente de jeter l'éponge et continue sans se poser de questions.

Vous avez pu voir que cette interface est tout de même pratique et n'est pas très difficile à utiliser.
Vous commencez maintenant à avoir une bonne vision de ce que permet le framework Collections mais il vous manque encore une corde à votre arc : l'interface Map<E> , et c'est ce que nous allons voir tout de suite ! :pirate:

Exemple de certificat de réussite
Exemple de certificat de réussite