Concurrency in Action: Using Java's CompletableFuture With Work Manager
Learn how to write nonblocking code in Java EE by combining CompletableFuture with a work manager.
Java took a huge step forward with Java 8, which introduced features such as lambdas, CompletableFuture, and streams. CompletableFuture promotes an asynchronous, event-driven programming model, and many developers use it with its default thread pool. That practice is risky in Java EE applications: under the hood, CompletableFuture runs tasks on the common ForkJoinPool, which is shared by every CompletableFuture and every parallel stream in the JVM. Imagine a full-fledged web or enterprise application relying on that single default pool. One slow or blocking task can starve unrelated work, and because the common pool can be tuned only JVM-wide, it is hard to monitor and scale per application.
To avoid such issues, CompletableFuture offers overloaded methods that accept a custom executor. This works well for regular applications, but in a Java EE server it is still not the way to go, because application servers don’t recognize custom threads or executor services and therefore cannot monitor the threads (and other resources) they use. Instead, you should use a work manager. A work manager is a managed executor service: the application server administers its threads and, ultimately, the resources used by the associated thread pool. The server can also resize the pool based on incoming requests and load, which makes the thread pool dynamic.
In this post, I am going to illustrate how to use CompletableFuture with work managers.
A work manager is cumbersome to use on its own, and it is difficult to combine directly with CompletableFuture. It’s better to use Spring’s WorkManagerTaskExecutor, a wrapper built on top of the work manager that also offers additional services. WorkManagerTaskExecutor makes the work manager usable with CompletableFuture as an executor. Let’s see how to do that.
A basic example of CompletableFuture, which runs on the common fork-join pool:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "Running completable future");
CompletableFuture with an executor service is used as follows:
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "Running completable future", executorService);
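To see which threads actually run the work, the two variants above can be compared by recording the worker thread name. The following is a minimal sketch rather than the article's own code: the class name `ExecutorChoiceDemo`, the helper `runOnCustomPool`, and the `cf-worker-` thread prefix are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorChoiceDemo {

    // Hypothetical helper: runs a supplier on a dedicated pool and returns
    // the name of the thread that executed it.
    static String runOnCustomPool() {
        AtomicInteger counter = new AtomicInteger();
        // Name the threads so they are easy to spot in thread dumps and logs.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "cf-worker-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        ExecutorService executorService = Executors.newFixedThreadPool(2, factory);
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executorService)
                    .join();
        } finally {
            // An unmanaged pool has to be shut down by the application itself.
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        String defaultThread = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
        System.out.println("Default executor thread: " + defaultThread);
        System.out.println("Custom executor thread:  " + runOnCustomPool());
    }
}
```

The explicit `shutdown()` call is the kind of lifecycle bookkeeping that an unmanaged pool leaves to the application, and which a container-managed work manager is meant to take over.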
For the work manager, I am using the CommonJ framework (commonj-twm.jar), which provides APIs to schedule tasks and to submit work instances for execution. Spring provides a wrapper on top of it in the form of WorkManagerTaskExecutor, which delegates tasks to an underlying target work manager that implements the commonj.work.WorkManager interface.
Let’s first discuss how to configure the task executor bean in Spring. commonjTaskExecutor is defined as a Spring bean of type WorkManagerTaskExecutor. The first property, workManagerName, is the JNDI name of the CommonJ work manager. The second property, resourceRef, indicates whether the JNDI lookup occurs in a Java EE container, in which case the java:comp/env/ prefix is added to the name if it is missing.
<bean id="commonjTaskExecutor" class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor" scope="singleton">
<property name="workManagerName" value="wm/cfworkers" />
<property name="resourceRef" value="true"/>
</bean>
In web.xml, we declare the work manager with a resource reference: the reference name (res-ref-name), i.e. the JNDI name of the resource configured in the container; the resource type (res-type); and res-auth, which states who handles resource authentication (Application or Container).
<resource-ref id="ResRef_5201107264812345748">
<res-ref-name>wm/cfworkers</res-ref-name>
<res-type>commonj.work.WorkManager</res-type>
<res-auth>Application</res-auth>
</resource-ref>
In WebLogic, you can view work managers by navigating to Domain_Name->Work Managers.
In typical Java EE applications, you should configure constraints/policies for the work manager. For our example, I have kept the default configuration.
Now, we are ready to use work manager in the code. For that, just get the work manager bean and use it to execute a runnable task.
// Look up the bean by the id used in the Spring configuration above.
TaskExecutor cfExecutor = (TaskExecutor) SpringApplicationContext.getBean("commonjTaskExecutor");
Runnable r = () -> System.out.println("This is a task");
cfExecutor.execute(r);
Then comes the last part: using the work manager together with CompletableFuture. For this, we need to look closely at the CompletableFuture API. Its main factory methods, supplyAsync and runAsync, have overloads that accept a custom executor and pass it on to internal helpers such as asyncSupplyStage.
When no executor is supplied, the common ForkJoinPool is used by default:
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
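In the JDK 8 sources, `useCommonPool` is true when the common pool's parallelism is greater than one; otherwise each task gets a fresh thread. The sketch below is one way to check which case applies on a given machine. The class name `DefaultPoolProbe` and its helper methods are illustrative only and not part of the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

class DefaultPoolProbe {

    // Mirrors the condition the JDK 8 sources use to pick the default executor.
    static boolean expectsCommonPool() {
        return ForkJoinPool.getCommonPoolParallelism() > 1;
    }

    // Name of the thread that runs a task submitted without an explicit executor.
    static String defaultWorkerName() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("Expects common pool:     " + expectsCommonPool());
        System.out.println("Default worker thread:   " + defaultWorkerName());
    }
}
```

On a typical multi-core machine the worker name should identify a common-pool thread, which is the same pool that parallel streams draw from.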
Let’s focus on the supplyAsync method.
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
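One consequence of the `screenExecutor` call is worth knowing: a `null` executor is rejected immediately with a `NullPointerException` instead of silently falling back to the default pool. A minimal sketch of that behavior follows; the class `NullExecutorCheck` and its method are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class NullExecutorCheck {

    // Returns true if supplyAsync rejects a null executor up front.
    static boolean rejectsNullExecutor() {
        Executor missing = null; // stands in for an executor that was never wired up
        try {
            CompletableFuture.supplyAsync(() -> "never runs", missing);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Null executor rejected: " + rejectsNullExecutor());
    }
}
```

In practice this means a misconfigured executor fails fast at the call site, not later inside the asynchronous task.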
Note that Executor is an interface (part of the java.util.concurrent package), so whatever we pass to the supplyAsync method must implement Executor. The following code illustrates running a computation and retrieving its result using the work manager and CompletableFuture.
public CompletableFuture<String> executeTask() {
    // Look up the WorkManagerTaskExecutor bean by the id used in the Spring configuration.
    TaskExecutor cfExecutor = (TaskExecutor) SpringApplicationContext.getBean("commonjTaskExecutor");
    // Run getData() on a thread managed by the work manager.
    return CompletableFuture.supplyAsync(() -> getData(), cfExecutor);
}

private String getData() {
    return "event complete";
}
And that’s it. Note that I have intentionally cast the bean to TaskExecutor, which extends the Executor interface.
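Because `supplyAsync` only needs an `Executor`, any component that can run a `Runnable` can be adapted to it, which is essentially the role the Spring wrapper plays for the work manager. The sketch below illustrates the idea without Spring or an application server. `ManagedWorkScheduler` and `WorkSchedulerExecutor` are hypothetical stand-ins, not CommonJ or Spring types, and a plain single-thread pool substitutes for the container-managed threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorAdapterDemo {

    // Hypothetical stand-in for a container API that accepts units of work.
    interface ManagedWorkScheduler {
        void schedule(Runnable work);
    }

    // Adapter exposing the stand-in scheduler as a java.util.concurrent.Executor.
    static final class WorkSchedulerExecutor implements Executor {
        private final ManagedWorkScheduler scheduler;

        WorkSchedulerExecutor(ManagedWorkScheduler scheduler) {
            this.scheduler = scheduler;
        }

        @Override
        public void execute(Runnable command) {
            scheduler.schedule(command);
        }
    }

    static String getData() {
        return "event complete";
    }

    // Runs getData() through the adapter, transforms the result with a
    // chained stage, and waits for the final value only at join().
    static String runThroughAdapter() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Executor executor = new WorkSchedulerExecutor(pool::execute);
            return CompletableFuture
                    .supplyAsync(ExecutorAdapterDemo::getData, executor)
                    .thenApply(String::toUpperCase)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runThroughAdapter());
    }
}
```

In real code the final `join()` would usually be replaced by further chained stages such as `thenAccept`, so that the calling thread is never blocked.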
Summary
CompletableFuture in Java 8 is a giant step forward; it’s a powerful feature that enables nonblocking code. However, in Java EE applications it should be used cautiously, with a properly configured work manager as its executor.
You can find my code at my GitHub.