- 12 hours
- Hard
Free online content available in this course.
course.header.alt.is_video
course.header.alt.is_certifying
Got it!Last updated on 11/8/22
Develop Thread-Safe Asynchronous Solutions
Evaluated skills
- Develop thread safe asynchronous solutions
Description
This quiz will evaluate your understanding of how to build asynchronous thread-safe applications composed of producers and consumers to pass between stages. You'll get to prove your understanding of Java's BlockingQueues and how Reactive Streams are used in Java. We'll also make use of ConcurrentHashMaps and CopyOnWriteArrayLists to manipulate a fake dataset of galaxies.
You are brought into a project where there is a large dataset of about 100,000 galaxies. You're asked to help the team in getting through some software issues which they are experiencing while analyzing data relating to galaxies.
Let's start with a galaxy entity that is composed of the galaxies name, and its the type of galaxy it is, according to a GalaxyType enum. Let's look at our galaxy and GalaxyType classes. Feel free to paste them into JShell if you want to try out your answers.
public class Galaxy {
public enum GalaxyType {
ELLIPTICAL,
SPIRAL,
IRREGULAR
};
// For prototyping. Don't break encapsulation in real code.
public String name;
public GalaxyType galaxyType;
public Galaxy(String name, GalaxyType galaxyType) {
this.name = name;
this.galaxyType = galaxyType;
}
}
Since the team's focus is on proving their algorithms, they have given you class that builds a sample set of fake galaxies to use in validating your code:
public class FakeGalaxyListFactory {
// make a list of size fake galaxies
public static List<Galaxy> make(Integer size) {
List<Galaxy> galaxies = IntStream.range(0, size).
// Create galaxies with random types
mapToObj( n -> new Galaxy("Galaxy-" + n,
Galaxy.GalaxyType.values()[(int) Math.round(Math.random() *10 %2)])
).
// Create a list with a mutable reduction
collect(Collectors.toList());
return galaxies;
}
}
Let's dive in!
Question 1
You are asked to investigate a bug in an application that uses the following class. This is used by two threads, which record the time at which the code last saw specific galaxy types.
import java.time.Instant; List<Galaxy> galaxies = FakeGalaxyListFactory.make(10000); ConcurrentHashMap<Galaxy.GalaxyType, Instant> galaxyTypesSeen = new ConcurrentHashMap<>(); CountDownLatch latch = new CountDownLatch(galaxies.size()); Thread galaxyScanner = new Thread( () -> { galaxies.forEach( galaxy -> { galaxyTypesSeen.put( galaxy.galaxyType, Instant.now()); latch.countDown(); } ); }); ; Thread galaxyHistoryReporter = new Thread( () -> { try { // Wait for the latch to reach 0 while (latch.getCount() != 0) { Iterator<Galaxy.GalaxyType> iterator = galaxyTypesSeen.keySet().iterator(); // Loop over the keyset while (iterator.hasNext()) { Galaxy.GalaxyType galaxyType = iterator.next(); Instant lastSeen = galaxyTypesSeen.get(galaxyType); String report = String.format("galaxyType:%s", lastSeen); System.out.println(report); } // Pause for .1 seconds or until our latch reaches 0 latch.await(1, TimeUnit.NANOSECONDS); } } catch (Exception e) { e.printStackTrace(); } });
When these threads are started, you see ConcurrentModificationExceptions. You can reproduce this by pasting the code into into JShell, and starting it as follows.
jshell> galaxyHistoryReporter.start() jshell> galaxyScanner.start(); java.util.ConcurrentModificationException at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1490) at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1513) at REPL.$JShell$39.lambda$do_it$$0($JShell$39.java:16) at java.base/java.lang.Thread.run(Thread.java:835)
Which of the following will fix this?
Replace the HashMap at Line 3 with a
Hashtable<Galaxy.GalaxyType, Instant> galaxyTypesSeen = new Hashtable<>();
Replace the HashMap at Line 3 with a
Map<Galaxy.GalaxyType, Instant> galaxyTypesSeen = Collections.synchronizedMap( new HashMap() );
Replace the HashMap at Line 3 with a
ConcurrentHashMap<Galaxy.GalaxyType, Instant> galaxyTypesSeen = new ConcurrentHashMap( new HashMap() );
Replace Line 24 with a
ConcurrentIterator<Galaxy.GalaxyType> iterator =
galaxyTypesSeen.keySet().concurrentIterator();
Question 2
Look at this code which will add the galaxy names to a new ArrayList. While one thread does this, another will keep reporting on the most recent galaxy seen in that list.
import java.time.Instant; List<Galaxy> galaxies = FakeGalaxyListFactory.make(10); CountDownLatch latch = new CountDownLatch(galaxies.size()); List<String> listOfNames = new ArrayList<>(); Executors.newSingleThreadExecutor().submit( ()->{ galaxies.forEach( (galaxy) -> { try { // Simulate a possible delay of 1 second Thread.sleep(1000); listOfNames.add(galaxy.name); latch.countDown(); } catch (Exception e) { e.printStackTrace(); // Todo: handle exception } finally { latch.countDown(); } } ); } ); Executors.newSingleThreadExecutor().submit( ()->{ while( latch.getCount() != 0 ) { try { Iterator<String> iterator = listOfNames.iterator(); String galaxy = null; System.out.println(Instant.now()); while (iterator.hasNext()) { // Simulate delay of 1/10th second Thread.sleep(100); // skip ahead galaxy = iterator.next(); } System.out.println("The most recent galaxy seen at this time is " + galaxy); latch.await(100, TimeUnit.NANOSECONDS); } catch (Exception e) { e.printStackTrace(); } } } );
When run, this produces output similar to:
java.util.ConcurrentModificationException at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042) at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996) at REPL.$JShell$120.lambda$do_it$$0($JShell$120.java:20) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:835) The most recent galaxy seen at this time is Galaxy-6 java.util.ConcurrentModificationException at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042) at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996) at REPL.$JShell$120.lambda$do_it$$0($JShell$120.java:20) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:835) The most recent galaxy seen at this time is Galaxy-7 java.util.ConcurrentModificationException at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042) at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996) at REPL.$JShell$120.lambda$do_it$$0($JShell$120.java:20) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:835)
You can try this out for yourself in JShell.
Which of the following fixes the above issue?
Change the ArrayList listOfNames to a ConcurrentHashMap.
Remove the Thread.sleep() calls.
Replace ArrayList<String> with CopyOnWriteArrayList<String>.
Remove the countDownLatch.
Question 3
You've been asked by a colleague to help in understanding some code. You're shown a producer class that publishes galaxies to a queue. Feel free to paste it into JShell.
public class GalaxyPublisher { BlockingQueue<Galaxy> galaxyQueue; public GalaxyPublisher (BlockingQueue<Galaxy> galaxyQueue) { this.galaxyQueue = galaxyQueue; } public void publish(List<Galaxy> galaxies) { galaxies.forEach(galaxy -> { try { galaxyQueue.put(galaxy); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
What will the following code do when the GalaxyTypeQueue feeding consumer has a depth of 3 or more?
public class Consumer implements Runnable { BlockingQueue<Galaxy> galaxyQueue; CountDownLatch killSwitch; public Consumer(BlockingQueue<Galaxy> galaxyQueue, CountDownLatch killSwitch) { this.galaxyQueue = galaxyQueue; this.killSwitch = killSwitch; } public void run() { while (true && killSwitch.getCount() != 0) { try { Galaxy galaxy = galaxyQueue.take(); System.out.println("Consumed:" + galaxy.name); } catch (Exception e) { e.printStackTrace(); } } } }
Feel free to try it out in JShell.
Which of the following will result in the consumer receiving 2000 galaxy objects?
BlockingQueue galaxyQueue = new BlockingQueue(); // kill switch to stop the consumer CountDownLatch killSwitch = new CountDownLatch(1); Executors.newSingleThreadPool().submit(new Consumer(galaxyQueue, killSwitch));
BlockingQueue<Galaxy> galaxyQueue = new LinkedBlockingQueue<>(2000); CountDownLatch killSwitch = new CountDownLatch(1); // Used to tell the consumer to stop Thread consumer = new Thread(new Consumer(galaxyQueue, killSwitch)); consumer.start(); GalaxyPublisher publisher = new GalaxyPublisher(galaxyQueue); CompletableFuture.runAsync( ()->publisher.publish( FakeGalaxyListFactory.make(1000) ) ). thenRunAsync(()->killSwitch.countDown());
BlockingQueue<Galaxy> galaxyQueue = new LinkedBlockingQueue<>(20); CountDownLatch killSwitch = new CountDownLatch(1); // Used to tell the consumer to stop Thread consumer = new Thread(new Consumer(galaxyQueue, killSwitch)); consumer.start(); GalaxyPublisher publisher = new GalaxyPublisher(galaxyQueue); CompletableFuture.runAsync( ()->publisher.publish( FakeGalaxyListFactory.make(2000) ) ). thenRunAsync(()->killSwitch.countDown());
BlockingQueue<Galaxy> galaxyQueue = new LinkedBlockingQueue<>(10); CountDownLatch killSwitch = new CountDownLatch(1); // Used to tell the consumer to stop Consumer consumer = new Consumer(galaxyQueue, killSwitch); GalaxyPublisher publisher = new GalaxyPublisher(galaxyQueue); CompletableFuture consumerFuture = CompletableFuture.runAsync(consumer); CompletableFuture producerFuture = CompletableFuture.runAsync( ()->publisher.publish( FakeGalaxyListFactory.make(2000) ) ); CompletableFuture.allOf(consumerFuture, producerFuture).get();
- Up to 100% of your training program funded
- Flexible start date
- Career-focused projects
- Individual mentoring