You now know a lot about Semaphores! These are great for ensuring fair use of a resource, but they don't help with one main problem: coordinating the processing of your app so that you know when your threads have finished collaborating on the task you set them.
As you've seen, semaphores are a good fit when you need to limit how many threads share a resource at once. They do this by capping the number of threads running a critical section at the same time. The total number of threads working on the problem may be much higher than this cap, because a permit released by one thread can immediately be acquired by another.
What if you had three files of planetary data and wanted to wait until all three had been processed into intermediate results before calculating their shared average? You need a mechanism that lets a fixed number of threads do their work and then tells a parent thread when it's safe to resume and process their output. Java's concurrency framework provides this with the CountDownLatch. A CountDownLatch provides the countDown() method, which each thread calls when it has completed its work. A thread waiting for those threads can call await() to block until they have all done so.
Using a Countdown Latch
How do you use a CountDownLatch?
There are three steps involved in using a CountDownLatch:
CREATE: A parent thread creates a CountDownLatch to share with its worker threads. It passes the number of workers it will wait on to the CountDownLatch constructor. For instance, you might pass a value of 4 if you had four files to process.
COUNTDOWN: Each thread doing the actual work calls latch.countDown() to signal that it has completed its work.
AWAIT: The parent thread waits for the CountDownLatch to reach zero by calling latch.await(). This puts the parent thread in the WAITING state until the workers have completed.
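The three steps above can be sketched with the three-file example from earlier. This is a minimal illustration, not the course's PlanetFileAnalyzer: the class name PlanetAverageSketch, the averageOf helper, and the in-memory lists standing in for parsed files are all assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

class PlanetAverageSketch {

    // Hypothetical helper: each inner list stands in for the values parsed from one file.
    static double averageOf(List<List<Double>> files) {
        int workers = files.size();
        double[] sums = new double[workers];   // one slot per worker, so no slot is shared
        int[] counts = new int[workers];

        // CREATE: the parent sets the count to the number of workers it will wait on.
        CountDownLatch latch = new CountDownLatch(workers);

        for (int i = 0; i < workers; i++) {
            final int slot = i;
            new Thread(() -> {
                try {
                    for (double value : files.get(slot)) {
                        sums[slot] += value;
                        counts[slot]++;
                    }
                } finally {
                    // COUNTDOWN: signal completion, even if processing failed.
                    latch.countDown();
                }
            }).start();
        }

        try {
            // AWAIT: block until every worker has counted down.
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }

        // Writes made before countDown() are visible once await() has returned.
        double total = 0;
        int n = 0;
        for (int i = 0; i < workers; i++) {
            total += sums[i];
            n += counts[i];
        }
        return total / n;
    }

    public static void main(String[] args) {
        List<List<Double>> files = List.of(
                List.of(1.0, 2.0), List.of(3.0), List.of(6.0));
        System.out.println(averageOf(files)); // 12.0 / 4 = 3.0
    }
}
```

Calling countDown() in a finally block is a design choice worth copying: if a worker throws before counting down, the parent would otherwise wait forever.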
When should I use a CountDownLatch?
CountDownLatches are suited to situations where you have to wait for multiple threads to complete before proceeding further. Consider what happens when you board an airplane. You go to a boarding gate where you'll find a group of passengers waiting to be allowed to board a plane together.
If you were to model this in code, you might write:
CountDownLatch boardingGroup = new CountDownLatch(CAPACITY_OF_AEROPLANE);
// passengers disembark; each one counts the latch down by one
(new Thread(()->boardingGroup.countDown())).start();
(new Thread(()->boardingGroup.countDown())).start();
(new Thread(()->boardingGroup.countDown())).start();
// wait for every passenger to disembark; this returns only once
// countDown() has been called CAPACITY_OF_AEROPLANE times
boardingGroup.await();
That is, the group only has as many passengers as there are seats on the plane. Once the plane is boarded and in the sky, no more passengers can board or alight from the aircraft. After landing, each passenger calls boardingGroup.countDown() as they disembark, and await() returns once the last one is off.
We say that CountDownLatch is a synchronizer, which allows your code to wait for one or more threads to complete the execution of a critical section.
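One thing the airplane model makes easy to get wrong is the count: if the latch is created with more seats than there are passengers who call countDown(), a plain await() never returns. As a hedged sketch, the timed overload await(long, TimeUnit) can guard against that. The class name HalfEmptyFlightSketch and the everyoneOff helper are hypothetical names made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class HalfEmptyFlightSketch {

    // Hypothetical helper: returns true if the latch reached zero within the timeout.
    static boolean everyoneOff(int seats, int passengers, long timeoutMillis) {
        CountDownLatch boardingGroup = new CountDownLatch(seats);

        // Each passenger disembarks on their own thread and counts down once.
        for (int i = 0; i < passengers; i++) {
            new Thread(boardingGroup::countDown).start();
        }

        try {
            // Timed await: true if the count hit zero, false if the timeout elapsed first.
            return boardingGroup.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Full flight: three seats, three passengers counting down.
        System.out.println(everyoneOff(3, 3, 2000));
        // Half-empty flight: four seats but only three countDown() calls, so this times out.
        System.out.println(everyoneOff(4, 3, 200));
    }
}
```

With a plain await(), the second call would block indefinitely, which is why the count passed to the constructor should match the number of countDown() calls you actually expect.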
Try it Out For Yourself!
Spin up JShell and try out the above. Let's walk through it together in this screencast first:
Step 1: Create a CountDownLatch for a tiny plane with three passengers.
CountDownLatch boardingGroup = new CountDownLatch(3)
Step 2: Check the count on the latch.
boardingGroup.getCount()
This should be set to three.
Step 3: Test that await() blocks your thread.
boardingGroup.await()
If you call boardingGroup.await(), it should block. You can unblock it by pressing CTRL-C.
Step 4: Now create a thread that calls countDown() on the latch.
(new Thread(()->boardingGroup.countDown())).start()
Step 5: Call boardingGroup.getCount() again. Repeat Steps 4 and 5 until getCount() returns 0.
Step 6: Call boardingGroup.await(). It should let you through and produce a new jshell prompt. Can you see how this differs from the blocking behavior you saw in Step 3?
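The behavior you see in Step 6 follows from the latch being one-shot: once the count reaches zero it stays there, further countDown() calls do nothing, and await() returns immediately. Here is a small sketch of that outside JShell. The class name OneShotLatchSketch and the countsAroundZero helper are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;

class OneShotLatchSketch {

    // Hypothetical helper: records getCount() before, at, and after reaching zero.
    static long[] countsAroundZero() {
        CountDownLatch latch = new CountDownLatch(1);
        long before = latch.getCount();      // 1: nobody has counted down yet

        latch.countDown();
        long atZero = latch.getCount();      // 0: the latch is now open

        latch.countDown();                   // counting down at zero has no effect
        long afterExtra = latch.getCount();  // still 0, the count never goes negative

        return new long[] {before, atZero, afterExtra};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] counts = countsAroundZero();
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]); // 1 0 0

        CountDownLatch open = new CountDownLatch(1);
        open.countDown();
        open.await();                        // returns immediately: the count is already zero
        System.out.println("await() let us through");
    }
}
```

Because the count cannot be reset, a new CountDownLatch is needed each time you want to wait on a fresh batch of work.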
Measuring How Using CountDownLatches Impacts Our Planet File Analyzer: Practice!
I've modified the PlanetFileAnalyzer to use a CountDownLatch in order to start calculating the average once all 23 Kepler files have been analyzed. Let's run benchmarks and see how it performs.
Step 1: Check out the branch p2-c3-countdownlatches:
git checkout p2-c3-countdownlatches
Step 2: Now follow along as I show you the implementation.
As you saw, you start with latches set to the NUMBER_OF_FILES, which is 23:
// Create Latches
static CountDownLatch summingLatch = new CountDownLatch( NUMBER_OF_FILES );
static CountDownLatch countingLatch = new CountDownLatch( NUMBER_OF_FILES );
Each future then calls .countDown() in a finally block, so the latch is counted down even if a task throws. The calling thread waits on these latches before summing the returned values.
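To make the finally-block pattern concrete, here is a rough sketch of how a future might count down a latch before the caller sums the results. It is not the code on the p2-c3-countdownlatches branch: the class name LatchedFuturesSketch, the sumWithLatch helper, the fixed pool of four threads, and the use of each file's number as its "result" are all assumptions made so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LatchedFuturesSketch {
    static final int NUMBER_OF_FILES = 23;

    // Hypothetical helper: each task returns its file number instead of parsing a real file.
    static long sumWithLatch() {
        CountDownLatch summingLatch = new CountDownLatch(NUMBER_OF_FILES);
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        List<Future<Long>> futures = new ArrayList<>();
        try {
            for (int i = 1; i <= NUMBER_OF_FILES; i++) {
                final long fileNumber = i;
                futures.add(pool.submit(() -> {
                    try {
                        return fileNumber;        // stand-in for analyzing one file
                    } finally {
                        summingLatch.countDown(); // runs whether the task succeeds or throws
                    }
                }));
            }

            // Wait until every task body has finished before summing.
            summingLatch.await();

            long total = 0;
            for (Future<Long> future : futures) {
                total += future.get();            // each task has already run, so this is quick
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting on the latch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A file task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumWithLatch()); // 1 + 2 + ... + 23 = 276
    }
}
```

Without the finally block, a task that threw an exception would never count down, and the thread waiting on summingLatch would block forever.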
Step 3: Now run the benchmark.
Run the Gradle runBenchmarks task:
./gradlew runBenchmarks
When running this, I get back the following results:
Benchmark                                             Mode   Cnt   Score   Error  Units
BenchmarkRunner.benchmarkAtomicsWithFutures           thrpt   10  50.887 ± 4.187  ops/s
BenchmarkRunner.benchmarkFuturesWithCountdownLatches  thrpt   10  34.393 ± 3.209  ops/s
BenchmarkRunner.benchmarkFuturesWithEightSemaphores   thrpt   10  26.406 ± 1.870  ops/s
BenchmarkRunner.benchmarkFuturesWithExecutorService   thrpt   10  34.237 ± 2.441  ops/s
BenchmarkRunner.benchmarkFuturesWithReentrantLocks    thrpt   10  35.652 ± 1.384  ops/s
BenchmarkRunner.benchmarkMultiProcess                 thrpt   10   1.426 ± 0.080  ops/s
BenchmarkRunner.benchmarkParallelStream               thrpt   10  59.384 ± 3.412  ops/s
BenchmarkRunner.benchmarkRawThreadsWithFutureTasks    thrpt   10  34.452 ± 3.151  ops/s
BenchmarkRunner.benchmarkSingleProcess                thrpt   10  18.094 ± 0.967  ops/s
As you can see, benchmarkFuturesWithCountdownLatches comes in faster than the semaphore implementation (34.393 versus 26.406 ops/s). Rather than acquiring and releasing a semaphore, each task counts down the latch atomically when its file is done. This way, the calling thread knows when it's safe to start calculating an average.
CountDownLatches allow you to clearly and safely wait for some work to be done. They aren't there primarily to speed up your code. When a thread waits on a latch, however, it blocks until the count reaches 0. Any blocking slows your code down, so try to use latches only where necessary.
How did the benchmark look for you? The results on your PC might differ. Can you see any other ways of speeding this up? More importantly, does it need to be sped up? All of these are questions for your business and what it deems acceptable performance requirements.
Let's Recap!
java.util.concurrent.CountDownLatch lets you create an object that allows a thread to share work with other threads and wait for them to complete.
A CountDownLatch has a counter associated with it, which you can set when you call its constructor. Set this to the number of threads you want to wait on.
Each thread you want to wait on is given the same instance of CountDownLatch and is expected to call countDown() on it when it has finished processing. Calling this decrements the counter of the latch by one.
You can then have another thread call await() on the latch and block until the counter reaches 0. This should signify that you're ready to start working on the next part of the solution, unblocking the waiting thread.
In the next chapter, we'll work with CompletableFutures to connect concurrent actions together!