• 12 hours
  • Hard

Free online content available in this course.

course.header.alt.is_video

course.header.alt.is_certifying

Got it!

Last updated on 11/8/22

Write Concurrent Applications Using Thread Pools and Futures

Reusing Threads With Thread Pools

Have you ever been to a swimming pool and had to wait while they constructed a new personalized swim-lane for you? Probably not! It would be tedious and make swimming a lot less appealing.

Surprisingly, this is just what many poorly written multi-threaded programs attempt to do! They may not be building swim-lanes out of concrete, but they are building lanes, or rather new threads, for Java statements to swim along. Creating a thread is slow. So if you have to keep creating threads for each task, the total time spent creating new ones can defeat the benefits of concurrency.  😞

How can that be? Thread creation looks fast!

Each time you create a thread, the JVM must perform a number of slow operations, including:

  • Creating new versions of the components required to execute a Java program including call stacks, memory for local variables, and more.

  • Creating a native thread, which is a version of your thread which your operating system has to manage and run. It is another slow operation as it reaches outside of your JVM.

It doesn't end there. Once you start a thread, it has to be continuously monitored by both Java's scheduler and the operating systems, slowing down your JVM and every other application.

Fortunately, it's not of a huge difference in time. At firstBenchmarks have shown that the difference in time between creating a hundred thousand threads and reusing previously created ones is vast. It can be nearly eight times slower than just reusing one you created previously. This difference goes up as you introduce threads and cores on your CPU.

Performance optimization is the art of making the software run at acceptable speeds for the business, rather than speeding things up unnecessarily. The problems that slow down an application to unacceptable levels are usually the culmination of lots of little not so bad levels of performance.

It's called death by a thousand cuts! Each slow behavior might be dismissed by itself, but together they add up into delays that reduce the efficiency you were seeking by coding concurrently. Don't create new threads for each new task! 

How do I avoid creating a new thread for each task?

How do swimming pools designers get around the insanity of building a lane for every new swimmer? Well, they have a pre-defined number of swim lanes that they construct once. People line up then take turns swimming in those lanes.

You can make your software more performant by doing the same thing using thread pools.The thread pool is a design pattern used extensively in concurrent programming as a way to avoid creating a new thread each time you want to run a new task. It is essentially a collection of Threads that you create once and reuse.
Rather than a newly created thread for a specific task with specific values, you design threads which, once created, are generic enough to run another task. The good thing is you don't need to engineer this! Once again, the concurrency framework does it for you.

Java concurrency framework's ExecutorService is a class which holds onto its own thread pool and runs your tasks against them. This way, you don't need to call  new Thread()  multiple times, but instead, submit a job to the ExecutorService using its submit method.

There are three simple steps involved in creating a thread pool and running a job against it.

  1. Create an ExecutorService with its own thread pool.

  2. Submit Runnable or Callable tasks to that service.

  3. Fetch the result of your task, if there is one. 

Let's find out to do this!

Creating a Thread Pool With ExecutorService

When you go to a cloakroom and drop off your jacket, you usually get a ticket that you hold on to and present when you return to the cloakroom. If the attendant behind the counter happens to have your coat at hand, you'll get it back straight away. If not, the attendant goes looking while you wait.

The ExecutorService is similar to that cloakroom. When you submit a task to it, it gives you back a special ticket - except this one is an instance of a class called Future. It has a get() method, just like the FutureTask you saw previously.

get() acts like a person lining up and waiting for a coat. Your warm cozy return value, or coat, is returned by one of many cloakroom attendants or threads in your pool. Unlike the cloakroom, the threads start working on your task soon after the moment you submit it.

It's time to introduce you to some of these new classes. Let's create a thread pool to do our bidding. Startup  jshell  and follow along!

Step 1: Create a thread pool using the Executors helper class.

jshell> ExecutorService executorService = Executors.newFixedThreadPool(2);

This calls the newFixedThreadPool() method and requests a pool of two threads. What you get back is an instance of ExecutorService, which contains those two threads.

Step 2: Submit multiple Runnable or Callable instances to the pool. To run a task on the pool, pass it to the ExecutorService's submit() method. First, use a Runnable lambda to print "Hello, Openclassrooms Student!" Remember that since it doesn't return a result, it will be interpreted as a Runnable!

jshell> executorService.submit( () -> System.out.println("Hello, Openclassrooms Student!") )
Hello, Openclassrooms Student!

Use a Callable and get a Future back, which is similar to the FutureTask you saw before:

jshell> Future future = executorService.submit( () -> "Returning (Hello, Openclassrooms Student!)" )

Just like FutureTask, you can call get() on it to see the result returned by the Callable lambda:

jshell> future.get()
$14 ==> "Returning (Hello, Openclassrooms Student!)"

As you can see, calling get() returns the return value of the lambda. Did you see how you could just submit your task to the pool and let it run in its own time?

Try it out for yourself and create some tasks which:

Simply modify the lambda used above!

Using Futures with ExecutorService

As you've seen, the ExecutorService returns an instance of a Future. But what exactly is a Future in Java? Unlike a cloakroom ticket, a future is an object modeling an asynchronous task which will eventually complete, with or without a result. It has methods that allow you to check if the task has completed, to wait for it to complete, and to retrieve its final result.

Let's look at some of its more useful methods. Fire up, JShell again! This time, we're going to see some of the methods provided by our Future.

Step 1: Create a thread pool using the Executors helper class. Since you're only concerned with inspecting a Future, you only need a single thread. Executors has a method for that!

jshell> ExecutorService executorService = Executors.newSingleThreadExecutor();

This calls the newSingleThreadExecutor() method and requests a pool of one thread.

Step 2: Create a Future by submitting a task.

jshell> Future<String> greeting = executorService.submit(()->"Hi, Again")

This creates a Future of type string.

Step 3: By calling isDone(), you can check if the lambda has run and completed.

jshell> greeting.isDone()
$3 ==> true

This shows us that the lambda has completed. You can fetch the result yourself by calling the  get()  method.

Step 4: Submit another lambda to the pool that never ends. It's easy to do. Let's find out how to deal with such a beast!

jshell> Future looper = executorService.submit(()->{while (true) {} })

We've created a Runnable with a while loop that never ends.

while (true) {
    
}

Step 5: Let's check if the Future is running using isDone()

jshell> looper.isDone()
$18 ==> false

 Clearly it is not done yet because it returned a false. 

Step 6: Check if anything has cancelled the Future! Check this with isCancelled()

jshell> looper.isCancelled()
$19 ==> false

Well, nothing should have cancelled it, since it was just created. 😇

Step 7: Are you ready to cancel the Future? Let's do so with cancel()

jshell> looper.cancel(true)
$20 ==> true

Try calling  looper.isCancelled()  again and see if you've cancelled that Future. You should see it returning true this time.

Step 8: Always close down your ExecutorService when you're done with it as not doing so might leave your application running.

jshell> executorService.shutdown()

How do I handle an error which gets thrown by my Future?

To handle an exception thrown by your Future, watch for ExecutionExceptions thrown when calling  get(). If your Future encounters an exception which isn't checked, this will bubble up and get thrown in an ExecutionException. You can get back the original exception by calling  getCause(). For instance, imagine that you had a Future which threw a RuntimeException:

Future f1 = es.submit( () -> {throw new RuntimeException("Boom");})

To catch this in code, you would have to place a try/catch around your call to .get().

try {
    f1.get();
} catch (ExecutionException e) {
    e.printStackTrace();
}

At Line 3, you can see that we catch an ExecutionException and then call  e.printStackTrace()  on it to see the exception. Try this out in JShell after creating a new ExecutorService to submit the work. Here's what it looks like when you run this in JShell:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: Boom
        at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
...
Caused by: java.lang.RuntimeException: Boom
        at REPL.$JShell$24.lambda$do_it$$0(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
...
        at java.base/java.lang.Thread.run(Thread.java:834)

Have a look at Line 5! You can see that the ExecutionException was caused by a RuntimeException.

Managing Your Larger Thread Pool: Practice!

Imagine that you're organizing a party. To do this, you'll need a feel for the number of people attending. If you're inviting five people, you might get away with using your living room. What happens if 300 of your Facebook friends RSVP? How will you cope if all 300 show up? You might start looking for a venue that has enough space to accommodate that number of people, but since there are often no-shows, this could result in spending money you don't need to. It might not be a bad idea to have space for 200 and some outdoor space for overflow.

When using a thread pool within your application, you sometimes want a similar degree of control. The executors class has multiple methods to help you create an ExecutorsService with different types of thread pools. Here are a few of the more commonly used ones:

Method

What does it do?

When do I use it?

newFixedThreadPool(int n)

As we saw above. This creates threads and makes sure that we always have n threads active and ready to work. If they are all busy, other work gets queued up and will wait for available threads.

This is great when you know that your threads will be constantly processing data and all need to keep busy. Since creating threads is slow, this makes sure that you only do so once.

See this article to learn more.

newCachedThreadPool()

These reuse previously constructed threads if any are available. Otherwise, a new thread gets added to the pool. Threads which are not used for a minute are removed from the cache, keeping your application fast and using less memory. Creating one of these pools will not create a thread until you submit a task.

These are great for short tasks and can result in improving the performance of your application if it's small.

See this good article to learn more.

 newSingleThreadExecutor()

This creates a thread pool with a single thread. Any tasks executed on that thread get lined up and will be executed in the order they were submitted. This is essentially sequential execution on a new thread. 

This is good for small programs with small tasks, where you are concerned about the unpredictability of multiple threads doing the same job. It limits the benefits of concurrency.

See this article to learn more.

It doesn't just stop here, however. Java provides the ThreadPoolExecutor; a class used to create those thread pools described above. ThreadPoolExecutor allows you to provide even more specificity by describing things like the minimum and maximum size of your pool. It also allows you to set timeouts so you can terminate tasks that take too long to execute.

However, it is generally preferable to use Executors static methods directly.

There are several steps to follow for using Futures to calculate the planetary averages, applying the same inputs as the previous raw threads example.

Step 1:  First, create a fixed size thread pool using Executors.

     // 1. ThreadPool
    ExecutorService executorService = Executors.newFixedThreadPool(4);

We've created a pool of 4, as we have four futures to run.

Step 2: Next, create the four futures by submitting them to the ExecutorService.

    // 2. Submit Each Future for summing
    Future<Double> futureOfSumOne =
        executorService.submit(
            ()-> fileAnalyzer.sumFile(fileOnePath));
            
    Future<Double> futureOfSumTwo =
        executorService.submit(
            ()-> fileAnalyzer.sumFile(fileTwoPath));

    // 3. Submit Each Future for counting
    Future<Double> futureOfCountOne =
        executorService.submit(
            ()-> fileAnalyzer.countDoubleRows(fileOnePath));
            
    Future<Double> futureOfCountTwo =
        executorService.submit(
            ()-> fileAnalyzer.countDoubleRows(fileTwoPath));

Each submits a lambda such as  ()->fileAnalyzer.sumFile(fileOnePath)  to the ExecutorService. 

Step 3: Then call get() on the Futures to get back the values to use in calculating the average.

    // 4. Wait for Futures
    Double valueOfFileOneSum = futureOfSumOne.get();
    Double valueOfFileTwoSum = futureOfSumTwo.get();
    Double valueOfFileOneCount = futureOfCountOne.get();
    Double valueOfFileTwoCount = futureOfCountTwo.get();

Then use these values to create the average in the parent thread.

Let me show you the code:

Running the Benchmark!

Go back to your previously cloned project and checkout the branch p1-c4-executor-service-and-futures:

git checkout p1-c4-executor-service-and-futures

As before, you can run the benchmarks by using the Gradle task runBenchmarks:

./gradlew runBenchmarks

How do your local benchmarks compare with the ones I showed you in the video? Every computer you run it on will have its own behavior!

For an additional challenge, create your own branch off the p1-c4-executor-service-and-futures branch, and try to calculate an average happen in a Future. Modify the following lines in  com.openclassrooms.concurrency.planetbrain.futures.app.FutureBasedPlanetAnalyzerApp  :


public static Double getAverageOfTemperatureFiles(String[] files) throws ExecutionException, InterruptedException, URISyntaxException {
 ....
 ....

    // 5. Add sums
    Double sumOfTemperatures = valueOfFileOneSum + valueOfFileTwoSum;

    // 6. Add counts
    Double countOfTemperatures = valueOfFileOneCount + valueOfFileTwoCount;

    // 7. Calculate average
    return sumOfTemperatures/countOfTemperatures;
}

Checkout the branch p1-c4-activity to see the solution in the top-most commit.

git checkout p1-c4-activity

Use  git show  or your idea to see the solution. As long as you created a Future which you could wait on with  get(), you're doing OK. 🙌

Let's Recap!

  • Thread pools are an extensive collection of threads that are pre-created and stand at the ready to run your tasks. They help make programs faster and leaner by saving memory and time.

  • Java's ExecutorsService is a class that encapsulates a thread pool. It provides a submit() method into which you can pass a Runnable OR Callable.

    • When you submit() a task, it is given to the first available thread in the pool.

    • When you submit a task to an ExecutorService, you are given a Future. This has a .get() method which, if called, blocks until a result is ready.

    • Executors.newFixedThreadPool(int n) is used to create a thread pool with n threads always at the ready. Threads are reused and submitted work lines up and waits for an available thread.

      Executors.newCachedThreadPool() is used to create a thread pool that creates threads as you need them. It destroys threads that have not been used for a minute. It is suitable for applications that don't need to be at the ready to run Futures continuously.

 In the next chapter, you'll see how to avoid issues with threads using atomic variables! 

Example of certificate of achievement
Example of certificate of achievement