• 12 hours
  • Hard

Free online content available in this course.

course.header.alt.is_video

course.header.alt.is_certifying

Got it!

Last updated on 11/8/22

Develop Thread-Safe Asynchronous Solutions

Evaluated skills

  • Develop thread safe asynchronous solutions

Description

This quiz will evaluate your understanding of how to build asynchronous thread-safe applications composed of producers and consumers to pass between stages. You'll get to prove your understanding of Java's BlockingQueues and how Reactive Streams are used in Java. We'll also make use of ConcurrentHashMaps and CopyOnWriteArrayLists to manipulate a fake dataset of galaxies.

You are brought into a project where there is a large dataset of about 100,000 galaxies. You're asked to help the team in getting through some software issues which they are experiencing while analyzing data relating to galaxies.

Let's start with a galaxy entity that is composed of the galaxies name, and its the type of galaxy it is, according to a GalaxyType enum. Let's look at our galaxy and GalaxyType classes. Feel free to paste them into JShell if you want to try out your answers.

public class Galaxy {
    public enum GalaxyType {
        ELLIPTICAL,
        SPIRAL,
        IRREGULAR
    };
    
    // For prototyping. Don't break encapsulation in real code.
    public String name;
    public GalaxyType galaxyType;

    public Galaxy(String name, GalaxyType galaxyType) {
        this.name = name;
        this.galaxyType = galaxyType;
    }
}

Since the team's focus is on proving their algorithms, they have given you class that builds a sample set of fake galaxies to use in validating your code:

public class FakeGalaxyListFactory {
    // make a list of size fake galaxies
    public static List<Galaxy> make(Integer size) {
        List<Galaxy> galaxies = IntStream.range(0, size).
            // Create galaxies with random types
            mapToObj( n -> new Galaxy("Galaxy-" + n,
                Galaxy.GalaxyType.values()[(int) Math.round(Math.random() *10 %2)])
            ).
            // Create a list with a mutable reduction
            collect(Collectors.toList());
        return galaxies;
    }
}

Let's dive in!

  • Question 1

    You are asked to investigate a bug in an application that uses the following class. This is used by two threads, which record the time at which the code last saw specific galaxy types.

    import java.time.Instant;
    List<Galaxy> galaxies = FakeGalaxyListFactory.make(10000);
    ConcurrentHashMap<Galaxy.GalaxyType, Instant> galaxyTypesSeen = new ConcurrentHashMap<>();
    CountDownLatch latch = new CountDownLatch(galaxies.size());
    
    Thread galaxyScanner = new Thread(
            () -> {
                galaxies.forEach(
                    galaxy -> {
                        galaxyTypesSeen.put(
                        galaxy.galaxyType, Instant.now());
                        latch.countDown();
                    }
                );
            });
    
    ;
    
    Thread galaxyHistoryReporter = new Thread(
            () -> {
                try {
                    // Wait for the latch to reach 0
                    while (latch.getCount() != 0) {
                        Iterator<Galaxy.GalaxyType> iterator =
                                galaxyTypesSeen.keySet().iterator();
                        // Loop over the keyset
                        while (iterator.hasNext()) {
                            Galaxy.GalaxyType galaxyType = iterator.next();
                            Instant lastSeen = galaxyTypesSeen.get(galaxyType);
                            String report = String.format("galaxyType:%s", lastSeen);
                            System.out.println(report);
                        }
                    
                        // Pause for .1 seconds or until our latch reaches 0
                        latch.await(1, TimeUnit.NANOSECONDS);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
    

    When these threads are started, you see ConcurrentModificationExceptions. You can reproduce this by pasting the code into into JShell, and starting it as follows.

    jshell> galaxyHistoryReporter.start()
    jshell> galaxyScanner.start();
    java.util.ConcurrentModificationException
        at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1490)
        at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1513)
        at REPL.$JShell$39.lambda$do_it$$0($JShell$39.java:16)
        at java.base/java.lang.Thread.run(Thread.java:835)
    
    

     

    Which of the following will fix this? 

    • Replace the HashMap at Line 3 with a  Hashtable<Galaxy.GalaxyType, Instant> galaxyTypesSeen = new Hashtable<>();

    • Replace the HashMap at Line 3 with a  

      Map<Galaxy.GalaxyType, Instant> galaxyTypesSeen = Collections.synchronizedMap( new HashMap() );

    • Replace the HashMap at Line 3 with a  

      ConcurrentHashMap<Galaxy.GalaxyType, Instant> galaxyTypesSeen = new ConcurrentHashMap( new HashMap() );

    • Replace Line 24 with a

       ConcurrentIterator<Galaxy.GalaxyType> iterator =galaxyTypesSeen.keySet().concurrentIterator();

  • Question 2

    Look at this code which will add the galaxy names to a new ArrayList. While one thread does this, another will keep reporting on the most recent galaxy seen in that list.  

    import java.time.Instant;
    List<Galaxy> galaxies = FakeGalaxyListFactory.make(10);
    CountDownLatch latch = new CountDownLatch(galaxies.size());
    List<String> listOfNames = new ArrayList<>();
    
    Executors.newSingleThreadExecutor().submit(
        ()->{
            galaxies.forEach(
                (galaxy) -> {
                    try {
                        // Simulate a possible delay of 1 second
                        Thread.sleep(1000);
                        listOfNames.add(galaxy.name);
                        latch.countDown();
                    } catch (Exception e) {
                        e.printStackTrace(); // Todo: handle exception
                    } finally {
                        latch.countDown();
                    }
                }
            );
        }
    );
    
    Executors.newSingleThreadExecutor().submit(
        ()->{
            while( latch.getCount() != 0 ) {
                try {
                    Iterator<String> iterator = listOfNames.iterator();
                    String galaxy = null;
                    System.out.println(Instant.now());
                    while (iterator.hasNext()) {
                        // Simulate delay of 1/10th second
                        Thread.sleep(100);
                        // skip ahead
                        galaxy = iterator.next();
                    }
                    System.out.println("The most recent galaxy seen at this time is " + galaxy);
                    latch.await(100, TimeUnit.NANOSECONDS);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    );
    

     

    When run, this produces output similar to:

    java.util.ConcurrentModificationException
        at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042)
        at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996)
        at REPL.$JShell$120.lambda$do_it$$0($JShell$120.java:20)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:835)
    The most recent galaxy seen at this time is Galaxy-6
    java.util.ConcurrentModificationException
        at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042)
        at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996)
        at REPL.$JShell$120.lambda$do_it$$0($JShell$120.java:20)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:835)
    The most recent galaxy seen at this time is Galaxy-7
    java.util.ConcurrentModificationException
        at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042)
        at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996)
        at REPL.$JShell$120.lambda$do_it$$0($JShell$120.java:20)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:835)
    
    

    You can try this out for yourself in JShell.

    Which of the following fixes the above issue?

    • Change the ArrayList listOfNames to a ConcurrentHashMap.

    • Remove the Thread.sleep() calls.

    • Replace ArrayList<String> with  CopyOnWriteArrayList<String>.

    • Remove the countDownLatch.

  • Question 3

    You've been asked by a colleague to help in understanding some code. You're shown a producer class that publishes galaxies to a queue. Feel free to paste it into JShell.

    public class GalaxyPublisher {
        BlockingQueue<Galaxy> galaxyQueue;
        
        public GalaxyPublisher (BlockingQueue<Galaxy> galaxyQueue) {
            this.galaxyQueue = galaxyQueue;
        }
        
        public void publish(List<Galaxy> galaxies) {
            galaxies.forEach(galaxy -> {
                try {
                    galaxyQueue.put(galaxy);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    

    What will the following code do when the GalaxyTypeQueue feeding consumer has a depth of 3 or more?

    public class Consumer implements Runnable {
        BlockingQueue<Galaxy> galaxyQueue;
        CountDownLatch killSwitch;
        
        public Consumer(BlockingQueue<Galaxy> galaxyQueue, CountDownLatch killSwitch) {
            this.galaxyQueue = galaxyQueue;
            this.killSwitch = killSwitch;
        }
        
        public void run() {
            while (true && killSwitch.getCount() != 0) {
                try {
                    Galaxy galaxy = galaxyQueue.take();
                    System.out.println("Consumed:" + galaxy.name);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    Feel free to try it out in JShell.

     

    Which of the following will result in the consumer receiving 2000 galaxy objects?

     

    • BlockingQueue galaxyQueue = new BlockingQueue();
      
      // kill switch to stop the consumer
      CountDownLatch killSwitch = new CountDownLatch(1);
      Executors.newSingleThreadPool().submit(new Consumer(galaxyQueue, killSwitch));
      
      

       

    • BlockingQueue<Galaxy> galaxyQueue = new LinkedBlockingQueue<>(2000);
      CountDownLatch killSwitch = new CountDownLatch(1); // Used to tell the consumer to stop
      
      Thread consumer = new Thread(new Consumer(galaxyQueue, killSwitch));
      consumer.start();
      
      GalaxyPublisher publisher = new GalaxyPublisher(galaxyQueue);
      
      CompletableFuture.runAsync(
          ()->publisher.publish( FakeGalaxyListFactory.make(1000) ) ).
          thenRunAsync(()->killSwitch.countDown());
      
      

       

    • BlockingQueue<Galaxy> galaxyQueue = new LinkedBlockingQueue<>(20);
      CountDownLatch killSwitch = new CountDownLatch(1); // Used to tell the consumer to stop
      
      Thread consumer = new Thread(new Consumer(galaxyQueue, killSwitch));
      consumer.start();
      
      GalaxyPublisher publisher = new GalaxyPublisher(galaxyQueue);
      
      CompletableFuture.runAsync(
          ()->publisher.publish( FakeGalaxyListFactory.make(2000) ) ).
          thenRunAsync(()->killSwitch.countDown());
      
    • BlockingQueue<Galaxy> galaxyQueue = new LinkedBlockingQueue<>(10);
      CountDownLatch killSwitch = new CountDownLatch(1); // Used to tell the consumer to stop
      
      Consumer consumer = new Consumer(galaxyQueue, killSwitch);
      GalaxyPublisher publisher = new GalaxyPublisher(galaxyQueue);
      
      CompletableFuture consumerFuture = CompletableFuture.runAsync(consumer);
      CompletableFuture producerFuture = CompletableFuture.runAsync(
          ()->publisher.publish( FakeGalaxyListFactory.make(2000) ) );
      CompletableFuture.allOf(consumerFuture, producerFuture).get();
      
      
      
Ever considered an OpenClassrooms diploma?
  • Up to 100% of your training program funded
  • Flexible start date
  • Career-focused projects
  • Individual mentoring
Find the training program and funding option that suits you best