How Structured Concurrency in Java 19 Helps to Make Programming of Concurrent Applications Easier

Info

This post is part of a blog post series titled 6 New Features in Java 19 That Make Your Life Easier.

Note

This feature is in the incubator phase in Java 19. To use it, compile the program with javac --release 19 --enable-preview --add-modules jdk.incubator.concurrent Main.java and run it with java --enable-preview --add-modules jdk.incubator.concurrent Main. More details about using incubator modules in your project can be found here.

The goal of this feature is to make concurrent code easier to maintain, more reliable, and easier to observe. To that end, it introduces the notion of a “Unit of Work”, which groups multiple threads under a single owner. This makes it possible, for example, to handle errors once at the level of the “Unit of Work” instead of separately for each thread.

Since Java 5, we have had the java.util.concurrent.ExecutorService API, which allows us to execute subtasks concurrently. Below is an example of a handle() method that serves a request by executing two subtasks via ExecutorService.

Response handle() throws ExecutionException, InterruptedException {
    Future<String> user = executorService.submit(() -> findUser());
    Future<String> order = executorService.submit(() -> fetchOrder());
    String theUser = user.get();
    String theOrder = order.get();
    return new Response(theUser, theOrder);
}

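The problem with this code is that the two subtasks are logically in a task-subtask relationship, but nothing in the code expresses it. The main request (or task) is only considered successfully processed if both subtasks have completed successfully as well. Both the code in handle() and the two subtasks can fail, usually by throwing an exception, and the ExecutorService does not connect these failures. If findUser() throws, user.get() rethrows the failure and handle() terminates, but fetchOrder() keeps running in its own thread. If fetchOrder() fails while findUser() is still running, handle() keeps blocking on user.get() instead of failing right away. If the thread running handle() is interrupted, neither subtask is notified. At best this wastes resources, at worst it leaks threads. It is the developers’ responsibility to make sure that all involved threads are terminated properly.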
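
The first of these failure modes can be reproduced with a small, self-contained program. This is only a sketch under assumptions: the class name LeakDemo, the bodies of findUser() and fetchOrder(), and the 300 ms delay are made up for illustration, and handle() returns a String instead of the Response type used above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LeakDemo {

    // Set by fetchOrder() once it has run to completion.
    static final AtomicBoolean orderFinished = new AtomicBoolean(false);

    // Hypothetical subtask that fails immediately.
    static String findUser() {
        throw new IllegalStateException("user lookup failed");
    }

    // Hypothetical subtask that needs some time to finish.
    static String fetchOrder() throws InterruptedException {
        Thread.sleep(300);
        orderFinished.set(true);
        return "order-42";
    }

    // Same shape as the handle() method above.
    static String handle(ExecutorService executorService)
            throws ExecutionException, InterruptedException {
        Future<String> user = executorService.submit(() -> findUser());
        Future<String> order = executorService.submit(() -> fetchOrder());
        String theUser = user.get();   // rethrows the failure of findUser()
        String theOrder = order.get(); // never reached in this demo
        return theUser + ", " + theOrder;
    }

    // Returns true if fetchOrder() was still running when handle() failed
    // and nevertheless ran to completion afterwards.
    static boolean subtaskOutlivedFailedTask() {
        orderFinished.set(false);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        boolean finishedWhenHandleFailed = true;
        try {
            handle(executorService);
        } catch (ExecutionException e) {
            // handle() has failed; check whether the sibling is already done.
            finishedWhenHandleFailed = orderFinished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executorService.shutdown(); // lets already submitted tasks finish
        try {
            executorService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !finishedWhenHandleFailed && orderFinished.get();
    }

    public static void main(String[] args) {
        System.out.println("fetchOrder() outlived the failed task: "
                + subtaskOutlivedFailedTask());
    }
}
```

With these timings, fetchOrder() is expected to still be running when handle() fails, and it then finishes its work even though nobody will ever read the result.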

The new Structured Concurrency API allows us to write the code above in such a way that all threads are properly terminated in the event of failure. The new code then looks like this:

Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> user = scope.fork(() -> findUser());
        Future<String> order = scope.fork(() -> fetchOrder());
        scope.join();
        scope.throwIfFailed();
        return new Response(user.resultNow(), order.resultNow());
    }
} 

The Structured Concurrency API follows the principle: “If a task splits into concurrent subtasks then they all return to the same place, namely the task’s code block”. The subtasks are now executed in the context of the task. The task waits for the results of the subtasks and monitors them for errors. In addition, the task-subtask relationship is now clearly visible.

The primary class of the Structured Concurrency API is StructuredTaskScope. This allows a developer to define a task as a group of concurrent subtasks. These are then treated as a single unit, called a “Unit of Work”. The results of the subtasks or the exceptions thrown are handled by the parent task. Forking, joining, canceling, handling errors, and composing results all take place in a clearly defined lexical scope, the StructuredTaskScope. The scope is defined by the try-with-resources statement.

The StructuredTaskScope class offers a few valuable advantages:

  • Error Handling: If one of the subtasks fails, the other subtask is automatically canceled if it has not yet completed. In the above example, this is handled by the ShutdownOnFailure policy (there are others as well).
  • Cancellation Propagation: If the thread running handle() is interrupted before or during the join() call, both subtasks are automatically canceled when that thread exits the scope.
  • Clarity: The code in the above example has a clear structure: the subtasks are set up, the task waits for their results, and then it decides whether to return the results or to terminate with an error.
  • Observability: Thread dumps show the hierarchy of such task-subtask relationships.
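
To appreciate the first two points, it helps to see roughly what one would have to write by hand with a plain ExecutorService to get similar behavior. The following is a sketch only and not how StructuredTaskScope is implemented; the class ManualShutdownOnFailure, the helper invokeAllOrFail() and the two demo tasks are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ManualShutdownOnFailure {

    // Runs all tasks concurrently. On the first failure, or if the calling
    // thread is interrupted while waiting, the remaining tasks are canceled.
    static <T> List<T> invokeAllOrFail(ExecutorService executor, List<Callable<T>> tasks)
            throws ExecutionException, InterruptedException {
        CompletionService<T> completions = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(completions.submit(task));
            }
            // Take results in completion order so that a failure is seen
            // as soon as it happens, not only when its turn comes.
            for (int i = 0; i < tasks.size(); i++) {
                completions.take().get(); // throws ExecutionException on failure
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get()); // all done, returns immediately
            }
            return results;
        } finally {
            // No effect on completed tasks, interrupts the ones still running.
            for (Future<T> future : futures) {
                future.cancel(true);
            }
        }
    }

    // Demo: one task fails quickly, the other would run for five seconds.
    // Returns true if the slow task was interrupted after its sibling failed.
    static boolean siblingWasCancelled() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Callable<String> failing = () -> {
            Thread.sleep(50);
            throw new IllegalStateException("user lookup failed");
        };
        Callable<String> slow = () -> {
            try {
                Thread.sleep(5_000);
                return "order-42";
            } catch (InterruptedException e) {
                interrupted.set(true);
                throw e;
            }
        };
        try {
            invokeAllOrFail(executor, List.of(failing, slow));
        } catch (ExecutionException expected) {
            // The failure of the first task surfaces here.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        try {
            executor.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("sibling cancelled: " + siblingWasCancelled());
    }
}
```

All of this bookkeeping (tracking the futures, watching for the first failure, canceling the rest in a finally block) is what the ShutdownOnFailure scope takes care of in the four lines between fork() and throwIfFailed().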

The above example is a so-called “fan-out” scenario, where several subtasks are initiated based on one request. StructuredTaskScope can also be used for “fan-in” scenarios, where multiple requests are combined into a “Unit of Work”. The following example illustrates this:

void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
    try (var scope = new StructuredTaskScope<Void>()) {
        try {
            while (true) {
                var socket = serverSocket.accept();
                scope.fork(() -> handle(socket));
            }
        } finally {
            // If there's been an error or we're interrupted, we stop accepting
            scope.shutdown();  // Close all active connections
            scope.join();
        }
    }
}

If you were to create a thread dump in such a scenario, for example in the JSON format with jcmd <pid> Thread.dump_to_file -format=json <file>, all subtasks responsible for the connections would be shown nested under the task.
