• 12 hours
  • Hard


Last updated on 11/8/22

Connect Concurrent Actions Using CompletableFutures

Doing Something With the Result of a Future

When we used a CountDownLatch as a barrier, the problem we were trying to solve was that of waiting for some concurrent work to complete so that we could process its result - like waiting for all boarding passes to be checked before a plane might depart, fly through the air, and land. That is, it was just a step towards a final solution.

Waiting for threads to produce an output before taking the next step is a typical concurrent application pattern called the producer-consumer pattern. It essentially means that one part of your code is responsible for producing a result, which a later stage consumes or uses in its own computation. A consumer receives the output of the producer and does something with that output. It might even produce its own output for another consumer!

We often break problems into smaller ones, later combining results into an overall solution. For instance, our planet temperature averaging implementations processed multiple files concurrently and then brought the results together into a single average at the end. You're going to learn about a particular future that simplifies connecting your producer and consumer tasks. It's called CompletableFuture. The CompletableFuture can represent many steps in which you process intermediary results from multiple tasks, and execute each step until you have a final result. Like standard futures, you can call  get()  on a CompletableFuture to fetch the result of all your steps.

UML Diagram showing some of the methods in a completion stage, and how it is implemented by a CompletableFuture.
CompletableFuture implements both CompletionStage and the regular Future interface

As you can see in the Java class diagram above, CompletableFuture is a class that implements the regular Future interface and also implements another interface called CompletionStage. This interface connects producers and consumers in a series of sequential steps and removes the need to do a separate get() followed by some custom logic for each step. You can pipeline different steps together using just the API of a CompletionStage to define which tasks get chained together to solve your problem concurrently.

CompletableFuture is designed to save you from writing all of that custom code, which takes the output from one set of threads and hands it to another.

Methods in a CompletionStage implementation are defined so that they can run one or more concurrent tasks in sequence. This allows you to chain futures together in a similar way to how you chain methods together when processing a stream.

The output from one CompletionStage goes to another, providing you with the power to declare how you want your code to run. For instance, you can use the thenRun(myLambda) method to pass another task, which runs after the previous step completes.
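As a rough illustration of that chaining, the sketch below strings a few stages together and finishes with thenRun(). The class name ThenRunSketch, the log list, and the "boarding pass" value are made up for this example; only the CompletableFuture methods come from the Java API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical example class: records what each stage did so the order is visible.
class ThenRunSketch {

    static List<String> runPipeline() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<Void> pipeline = CompletableFuture
            .supplyAsync(() -> "boarding pass")                  // producer stage
            .thenApply(String::toUpperCase)                      // transforms the producer's output
            .thenAccept(value -> log.add("checked " + value))   // consumes the transformed value
            .thenRun(() -> log.add("done"));                     // runs afterwards, receives no value

        // Block here only so the sketch can return the finished log.
        pipeline.join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline()); // prints [checked BOARDING PASS, done]
    }
}
```

Note that thenRun() takes a Runnable, so the task you pass it cannot see the previous stage's output; use thenAccept() or thenApply() when the next step needs that value.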

Try Out a Simple CompletableFuture in JShell

Step 1: Fire up JShell:

jshell

Step 2: Create a CompletableFuture of type String:

CompletableFuture<String> future = new CompletableFuture<>();

Step 3: Declare that when it completes, you pass its output to another lambda.

future.thenAccept((person) -> System.out.println("Hi " + person))

This causes the output of your future to be concatenated with "Hi " and printed to your terminal.

Step 4: Force the CompletableFuture to complete with a value of Captain Crunchy.

future.complete("Captain Crunchy")

This completes the future with a value of "Captain Crunchy", which kicks off the lambda defined in Step 3.
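If you would rather run these steps outside JShell, here is a minimal sketch of the same idea as a small class. The class name GreetingSketch and the greet() helper are invented for this example, and the greeting is collected into a StringBuilder instead of being printed directly so that it can be returned.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example class mirroring Steps 2 to 4 of the JShell walkthrough.
class GreetingSketch {

    static String greet(String person) {
        StringBuilder output = new StringBuilder();

        // Step 2: a future that nobody has completed yet.
        CompletableFuture<String> future = new CompletableFuture<>();

        // Step 3: declare what should happen with the value once it arrives.
        future.thenAccept(p -> output.append("Hi ").append(p));

        // Step 4: completing the future hands the value to the lambda above.
        future.complete(person);

        return output.toString();
    }

    public static void main(String[] args) {
        System.out.println(greet("Captain Crunchy")); // prints Hi Captain Crunchy
    }
}
```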

Now that you've done it, you can follow along as I repeat those steps:

Did you see how we were able to take the output of one task and pass it to the next?

Isn't this what we do with Futures?

As you've seen, when using a Future, you can call the get() method to retrieve a result once the Future has finished executing its work. From there, however, it's up to you to write the code that provides this output to whatever code was going to consume it. Take the following code, which applies Pythagoras' famous theorem of a²+b²=c².

// Produce the values for a^2 + b^2
Future<Integer> aSquaredFuture = executorService.submit(()->a*a);
Future<Integer> bSquaredFuture = executorService.submit(()->b*b);

// Wait for the values to pass on to our Consumer
Integer aSquared = aSquaredFuture.get();
Integer bSquared = bSquaredFuture.get();

// Consume and calculate Pythagoras' a^2+b^2=c^2
Double c = Math.sqrt(aSquared + bSquared);
  • Lines 1-3: First farm out the calculation of a² and b².

  • Lines 6-7: Wait for the Futures so you can pass their values on to the next step. This is manually synchronizing code and waiting for the results to be ready. You're telling the thread to block and wait for a result by calling get().

  • Line 10: Consume a² and b² now that they have been passed on. Use them to calculate c.
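The snippet above does not show where executorService, a, and b come from. The following is a self-contained sketch of the same approach, assuming a two-thread pool from Executors.newFixedThreadPool(2); the class name FuturePythagoras and the hypotenuse() helper are made up for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class: plain Futures with manual get() calls.
class FuturePythagoras {

    static double hypotenuse(int a, int b) throws InterruptedException, ExecutionException {
        // Assumption: a small fixed pool; the course code may configure this differently.
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            // Produce the values for a^2 and b^2
            Future<Integer> aSquaredFuture = executorService.submit(() -> a * a);
            Future<Integer> bSquaredFuture = executorService.submit(() -> b * b);

            // Block until both producers are done
            Integer aSquared = aSquaredFuture.get();
            Integer bSquared = bSquaredFuture.get();

            // Consume the results to calculate c
            return Math.sqrt(aSquared + bSquared);
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(hypotenuse(3, 4)); // prints 5.0
    }
}
```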

How could we improve on this?

Wouldn't it be great if you didn't have to bother with writing Lines 6 and 7, which wait on the Future? It's boilerplate - stuff you have to do due to the interfaces and libraries.

You may want to declare that the output of those Futures goes straight into calculating c, ideally without writing too much extra code. CompletableFuture lets you do this. Let's see how:

As you can see, we were able to pipeline the steps involved in solving this problem without having to be responsible for the intermediary gets. Let's break this down; see if you can follow along in JShell:

Step 1: Assign values to a and b:

Integer a = 2;
Integer b = 2;

Step 2: Declare a variable into which you assign c's future:

CompletableFuture<Double> cFuture = 

You should see a ...> indicating that JShell is waiting for more input.

Step 3: Start constructing the pipeline of CompletionStages with a CompletableFuture to calculate a²:

CompletableFuture.supplyAsync(()->a*a).

Again, you should see a ...> indicating that JShell is waiting for more input. CompletableFuture.supplyAsync() allows you to create a CompletableFuture which has its output supplied to another CompletionStage.

Step 4: Take the output of a² and combine it with the result of calculating b². Do this by calling the CompletionStage method thenCombine():

thenCombine(

 You should see a ...> indicating that JShell is waiting for more input.

Step 5:  thenCombine()  takes two arguments. The first is another CompletableFuture to produce an output that can be combined with the previous Future. This will be used to calculate b²:

CompletableFuture.supplyAsync( () -> b*b ),

This runs a lambda to calculate b² asynchronously. When its result is ready, it is passed as the second parameter to the function given as the other argument of thenCombine().

Step 6:  thenCombine()  takes a BiFunction as its second argument. You can think of this as a lambda that takes two arguments.

(aSquared, bSquared) -> Math.sqrt(aSquared + bSquared))

The first argument is the output from the first CompletableFuture, calculating a². The second is the output from the one that calculated b². Make sure you add the extra closing parenthesis ) to close the one opened at Step 4.

BiFunction is a functional interface with a single abstract method, apply(), which accepts two arguments and returns a result. Note that this is intentionally similar to the Function interface, which accepts one argument. In this case, it finds the square root of a²+b².
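To see the BiFunction on its own, outside of thenCombine(), you can hold the same lambda in a variable and call apply() directly. This is only a sketch; the class name BiFunctionSketch and the constant HYPOTENUSE are invented for this example.

```java
import java.util.function.BiFunction;

// Hypothetical example class: the Step 6 lambda stored in a BiFunction variable.
class BiFunctionSketch {

    // Two Integer inputs (a squared and b squared), one Double output (c).
    static final BiFunction<Integer, Integer, Double> HYPOTENUSE =
        (aSquared, bSquared) -> Math.sqrt(aSquared + bSquared);

    public static void main(String[] args) {
        System.out.println(HYPOTENUSE.apply(9, 16)); // prints 5.0
    }
}
```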

Step 7: Call cFuture.get() to get back your result. It's still a Future, so get() blocks until a result is ready.
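Put together, Steps 1 to 7 might look like the following sketch when written as a class instead of typed into JShell. The class name PythagorasPipeline and the hypotenuse() helper are made up for this example, and it uses a = 3 and b = 4 rather than the values from Step 1 so that the result is a round number.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example class: the whole pipeline from Steps 2 to 6 in one expression.
class PythagorasPipeline {

    static CompletableFuture<Double> hypotenuse(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a * a)        // Step 3: produce a squared
            .thenCombine(                                        // Step 4: combine with another stage
                CompletableFuture.supplyAsync(() -> b * b),      // Step 5: produce b squared
                (aSquared, bSquared) -> Math.sqrt(aSquared + bSquared)); // Step 6: consume both
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Double> cFuture = hypotenuse(3, 4);
        System.out.println(cFuture.get()); // Step 7: blocks until ready, prints 5.0
    }
}
```

Notice that there is no get() between the stages; the only blocking call is the final one that fetches c.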

What happens if one of my CompletionStages fails and throws an exception?

As you've already seen, you can catch an exception thrown by a Future by checking for an ExecutionException. Doing this for a CompletableFuture is far simpler: you just declare another CompletionStage.

To catch an exception thrown by a CompletableFuture, chain on a call to exceptionally() and pass in a lambda that handles the throwable, which will typically be a CompletionException wrapping the original exception.

Let's do this together in JShell:

As you saw, we chained a   .exceptionally()  to our  .supplyAsync()  call.
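The following is a minimal sketch of that idea, not a transcript of the screencast. The class name ExceptionallySketch, the describeFailure() helper, and the "engine failure" message are invented for this example. The handler checks getCause() defensively, since the throwable it receives may or may not be a wrapper around the original exception.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example class: recovering from a failed stage with exceptionally().
class ExceptionallySketch {

    static String describeFailure() {
        CompletableFuture<String> future = CompletableFuture
            .<String>supplyAsync(() -> {
                // Simulate a task that fails with an unchecked exception.
                throw new IllegalStateException("engine failure");
            })
            .exceptionally(throwable -> {
                // Unwrap the original exception if the throwable is a wrapper.
                Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
                return "Recovered from: " + cause.getMessage();
            });

        // The future now completes normally with the fallback value.
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints Recovered from: engine failure
    }
}
```

The lambda passed to exceptionally() must return a value of the same type as the stage it is attached to, which is why the fallback here is a String.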

Measuring the Use of CompletableFutures in our Planet Analyzer: Practice!

Let me show you how to use CompletableFutures to simplify our planet analyzer:

Let's break this down.

Step 1: Set up two CompletableFutures: one to calculate the sum of temperatures, and another to calculate the sample size. Each starts by supplying an initial value of 0.0, ready for the calculations to build on.

        CompletableFuture<Double> summingCompletableFuture = CompletableFuture.supplyAsync(() -> 0.0);
        CompletableFuture<Double> countingCompletableFuture = CompletableFuture.supplyAsync(() -> 0.0);

Step 2: Then loop through each filePath and update the completable Futures, so that they take the value of the last CompletableFuture in the chain and add a new value to it.

for (Path filePath : files) {
    summingCompletableFuture =
        summingCompletableFuture.thenCombineAsync(
            CompletableFuture.supplyAsync(() -> sumFile(filePath)),
            (left, right) -> left + right
    );
...
}

As you can see on Lines 3 to 5, you take the result of the future created by CompletableFuture.supplyAsync(() -> sumFile(filePath)) and add it to the result of the previous Future.

For instance, in the first iteration of the loop, the value of left will be 0.0 (from Step 1), and right will be the value returned by sumFile() for the first file. Do the same for the countingCompletableFuture, but call the countFile() method instead.

Step 3: Finally, use summingCompletableFuture.get() and countingCompletableFuture.get() to calculate the average temperature.

return summingCompletableFuture.get() / countingCompletableFuture.get();
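For reference, here is one way the three steps could fit together in a single class. This is a sketch under stated assumptions, not the course repository's code: the class name AverageTemperatureSketch is invented, and the bodies of sumFile() and countFile() are hypothetical stand-ins that assume each file holds one temperature per line.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

// Hypothetical example class combining Steps 1 to 3.
class AverageTemperatureSketch {

    // Assumption: one temperature per line; the real file format may differ.
    private static List<Double> readTemperatures(Path filePath) {
        try {
            return Files.readAllLines(filePath).stream()
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .map(Double::parseDouble)
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical stand-in for the course's sumFile().
    static double sumFile(Path filePath) {
        return readTemperatures(filePath).stream().mapToDouble(Double::doubleValue).sum();
    }

    // Hypothetical stand-in for the course's countFile().
    static double countFile(Path filePath) {
        return readTemperatures(filePath).size();
    }

    static double average(List<Path> files) throws InterruptedException, ExecutionException {
        // Step 1: start both chains from an initial value of 0.0.
        CompletableFuture<Double> summingCompletableFuture = CompletableFuture.supplyAsync(() -> 0.0);
        CompletableFuture<Double> countingCompletableFuture = CompletableFuture.supplyAsync(() -> 0.0);

        // Step 2: extend each chain with one more stage per file.
        for (Path filePath : files) {
            summingCompletableFuture = summingCompletableFuture.thenCombineAsync(
                CompletableFuture.supplyAsync(() -> sumFile(filePath)),
                (left, right) -> left + right);
            countingCompletableFuture = countingCompletableFuture.thenCombineAsync(
                CompletableFuture.supplyAsync(() -> countFile(filePath)),
                (left, right) -> left + right);
        }

        // Step 3: block once at the end for each chain.
        return summingCompletableFuture.get() / countingCompletableFuture.get();
    }

    public static void main(String[] args) throws Exception {
        List<Path> files = new ArrayList<>();
        for (String arg : args) {
            files.add(Paths.get(arg));
        }
        if (files.isEmpty()) {
            System.out.println("Pass one or more temperature files as arguments.");
            return;
        }
        System.out.println(average(files));
    }
}
```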

Did you notice how much less you have to do here?

Running the Benchmark!

Check out the branch:

git checkout p2-c4-completable-future

Run the runBenchmarks Gradle task:

./gradlew runBenchmarks

How did your local performance compare with the screencast?

Let's Recap!

  • CompletableFutures are a special implementation of a Future that allows you to connect the output of one task to the input of another.

    • CompletableFutures can connect tasks using methods from the CompletionStage interface. This allows you to create a chain from one CompletionStage to another. You can even specify if you want a stage to pass its output to the next stage.

    • The methods of a CompletionStage allow you to connect stages, but also to declare a task that handles CompletionExceptions. These exceptions are thrown when an unchecked exception occurs in a CompletionStage. You connect the handler to your chain of stages using the .exceptionally() method, which takes a lambda that receives the exception.

    • As CompletableFutures are still Futures, they also provide a blocking get() method for a thread to block on and wait for the final result.

  • Using CompletableFutures removes the need to write your own code between each stage of concurrent processing.

In the next chapter, we'll harness the power of recursion with ForkJoin! 
