Handling Virtual Threads
Learn different strategies for working with virtual threads, a feature that dramatically simplifies programming a thread-per-task model and makes high-throughput concurrent applications easier to write and maintain.
Virtual threads were released as a preview feature in September 2022 as a part of Java 19. They are lightweight versions of platform threads. Unlike legacy platform threads, the memory footprint of a virtual thread is minute. For an in-depth introduction to virtual threads, check out the following article.
Virtual threads support the creation of a thread per unit of work model, no matter how many tasks we have to process. Instead of reusing legacy platform threads via thread pools, we can have a virtual thread for each task. Following the programming flow for each task on a per-thread basis simplifies the coding and eases the maintenance. In particular, virtual threads shine with I/O requests to support concurrent high-throughput I/O programming.
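To make the thread-per-task idea concrete, here is a minimal sketch (not part of the article's sample code) that starts one virtual thread per submitted task; the task count and the printed message are placeholders:

import java.util.concurrent.Executors;

public class VirtualThreadPerTaskDemo {
    public static void main(String[] args) {
        // One virtual thread per submitted task; no pooling or reuse is needed.
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 10_000; i++) {
                int taskId = i;
                executor.submit(() -> System.out.println("Handling task " + taskId));
            }
        } // close() waits for the submitted tasks to finish
    }
}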
CPU-bound calculations, on the other hand, get optimal CPU utilization from platform threads and thread pools. A thread pool lets us parallelize CPU-intensive tasks across the machine's cores, maximizing speed and resource utilization.
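As a point of reference, a typical setup for CPU-bound work is a fixed pool of platform threads sized to the available cores; this is an illustrative sketch, not the article's code, and the computation inside is a stand-in:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CpuBoundPoolDemo {
    public static void main(String[] args) throws Exception {
        // Size the pool to the number of cores to keep them busy without oversubscribing.
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService cpuPool = Executors.newFixedThreadPool(cores);
        try {
            Future<Long> result = cpuPool.submit(() -> {
                long sum = 0;
                for (long i = 0; i < 1_000_000; i++) {
                    sum += i * i; // stand-in for a heavy computation
                }
                return sum;
            });
            System.out.println("Result: " + result.get());
        } finally {
            cpuPool.shutdown();
        }
    }
}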
So we have virtual threads at our disposal to support I/O-bound requests, and thread pools with their associated platform threads to handle CPU-bound tasks. In this article, we present different strategies to follow when a unit of work requires both I/O tasks and resource-expensive, CPU-intensive calculations. We explore some of the options at our disposal and evaluate the tradeoffs of the different programming choices.
Problem Overview
Let's consider a simple example of a mission-critical application that monitors different machinery in power plants and quickly determines if they are functioning correctly. We assume that the machines have sensors connected to the network that provide readings on temperature, pressure, and so on. The gist of the problem is that we have to connect to these devices, get their data readings, and quickly do a computationally intense data analysis to report back on the system's state.
Suppose we have a batch process that is continuously running to check the state of the different machines. To simplify the problem, let's assume that the process reads data continuously from a file where it gets the locations and the time intervals it has to monitor. A sample of the data can look like this:
| url | id | startTime | endTime |
|---|---|---|---|
| https://pw1/section/sensors | 756a9c | 12/21/2022 11:00 | 12/21/2022 12:00 |
| https://pw1/section2/sensors | 647d5m | 12/21/2022 11:00 | 12/21/2022 12:00 |
| https://pw2/section/sensors | 948k4l | 12/21/2022 8:00 | 12/21/2022 12:00 |
| https://pw2/section2/sensors | 938r8c | 12/21/2022 9:00 | 12/21/2022 12:00 |
This example is a dramatically simplified, generic version of the problem, but it generalizes easily. Our task is to show how we would write this batch process using virtual threads, and to highlight the new programming style and its advantages over the legacy approach.
Programming With Virtual Threads
To begin our batch process, we must read the entries of the table and place them in a list of entries (List<InputEntry>), where InputEntry is defined as a Java record:
record InputEntry(String url, String id, String startTime, String endTime) {}
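A possible way to load these entries, assuming a pipe-separated input file with one row per machine, is sketched below; the file format and the helper are illustrative, not the article's code, and it relies on the InputEntry record defined above:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class InputReader {
    // Parses each non-header row of a pipe-separated file into an InputEntry record.
    static List<InputEntry> readEntries(Path file) throws IOException {
        try (var lines = Files.lines(file)) {
            return lines
                    .skip(1) // skip the header row
                    .map(line -> line.split("\\|"))
                    .map(cols -> new InputEntry(cols[0].trim(), cols[1].trim(),
                                                cols[2].trim(), cols[3].trim()))
                    .toList();
        }
    }
}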
Next, we have to go over the list and analyze the sensor reading for each machine:
public static void processData(List<InputEntry> inputEntries) {
    System.out.println("processSensorReadings()");
    ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    CompletionService<String> cService = new ExecutorCompletionService<>(executor);

    for (InputEntry inputEntry : inputEntries) {
        cService.submit(() -> processSensorData(inputEntry));
    }

    int processed = 0;
    while (processed < inputEntries.size()) {
        processed++;
        try {
            Future<String> resultFuture = cService.take();
            System.out.println("Handle status:" + resultFuture.get());
        } catch (ExecutionException | InterruptedException e) {
            System.out.println("Failed to process:" + e.getMessage());
        }
    }
}
We use Executors.newVirtualThreadPerTaskExecutor(), a new executor that starts a virtual thread for each task. We wrap this executor in a CompletionService, which allows us to take the tasks run by the virtual threads in the order that they complete, as we do in the while loop at the end of the method. For each entry, we submit processSensorData() to the CompletionService, which, as we will see, performs an I/O request plus, possibly, the data analysis.
Since virtual threads are a preview feature, we need to run Java 19 with the --enable-preview VM argument.
At this stage, we notice that for each InputEntry, we are creating a virtual thread. This contrasts with how we would have programmed with legacy threads, where thread pools are used to reuse an expensive resource. The advantage of a virtual thread per unit of work is evident: it simplifies the code logic, and it is much easier to maintain and troubleshoot problems, for example, through thread dumps or stack traces.
Fetching and Processing the Data
We need to get the sensor data to analyze it, so we do the following:
public static String processSensorData(InputEntry inputEntry) throws IOException, InterruptedException {
    DoubleStream data = fetchSensorData(inputEntry);
    return "ID: " + inputEntry.id() + ": " + analyzeSensorData(data);
}

private static DoubleStream fetchSensorData(InputEntry inputEntry) throws MalformedURLException, InterruptedException {
    URL pwUrl = new URL(inputEntry.url() + "/startTime/endTime");
    // In a real application open a secure url stream and fetch the data
    // For this example we return some random data and simulate network latencies
    Thread.sleep((long) (Math.random() * 100));
    DoubleStream data = DoubleStream.generate(() -> new Random().nextDouble()).limit(100);
    return data;
}
In this simple example, we only fetch data from one sensor, but in a more realistic application, we might need to process several sensors with multiple I/O operations on each virtual thread.
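For instance, a single virtual thread could perform several blocking calls back to back while staying cheap to park; the helper methods in this sketch are hypothetical placeholders for real network requests, not part of the article's sample code:

// Hypothetical sketch: one virtual thread running several blocking I/O calls in sequence.
public static String processMachine(InputEntry inputEntry) throws InterruptedException {
    double temperature = fetchTemperature(inputEntry); // blocking call #1
    double pressure = fetchPressure(inputEntry);       // blocking call #2
    return inputEntry.id() + ": temp=" + temperature + ", pressure=" + pressure;
}

private static double fetchTemperature(InputEntry inputEntry) throws InterruptedException {
    Thread.sleep((long) (Math.random() * 100)); // simulate network latency
    return Math.random() * 100;
}

private static double fetchPressure(InputEntry inputEntry) throws InterruptedException {
    Thread.sleep((long) (Math.random() * 100)); // simulate network latency
    return Math.random() * 10;
}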
At this point, the process is clear — our virtual threads are fetching the data and running in parallel. The question we have next is how we are going to analyze the data. Let's look at the different strategies we can follow.
Analyzing the Data
We assume that the data analysis we have to perform is CPU-intensive. Ideally, we would like to perform this calculation in the same thread to maintain the thread-per-unit-of-work paradigm. But if the analysis is time-sensitive, there might be better approaches. We know that for a CPU-bound workload, it is best to use thread pools to get optimal performance.
In this case, it might be best that the virtual thread delegates the work to a thread pool:
public static int analyzeSensorData(DoubleStream data) {
    double resultMean = 0;
    // Uses ForkJoinPool to improve performance
    // In a real application do proper data analysis
    resultMean = data.parallel().average().getAsDouble();
    return determineStatusCode(resultMean);
}

private static int determineStatusCode(double result) {
    // In a real application determine code based on result parameters
    if (result > 0.49) {
        return 719; // Made up error code
    } else {
        return 0;
    }
}
For this example, we use Java Streams, which, under the covers, utilize the ForkJoinPool to do the calculation. By calling data.parallel(), we signal the Stream framework to parallelize the calculation. Here we simply find the average, but the analysis will be much more involved in an actual application.
We use the common ForkJoinPool provided by Java, but in a more realistic scenario we would define our own. We would also have to determine the pool size that maximizes CPU utilization. For the common pool, you can set the system property:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
where we assume that 10 is the number of concurrent threads that optimizes our system.
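For completeness, a dedicated ForkJoinPool can also be constructed with an explicit parallelism level. The sketch below is illustrative only; note that whether a parallel stream started inside a pool task actually runs in that pool is an implementation detail, not a documented guarantee:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.DoubleStream;

public class CustomPoolExample {
    // A dedicated ForkJoinPool with an explicit parallelism level (10 is just an example value).
    private static final ForkJoinPool CUSTOM_POOL = new ForkJoinPool(10);

    public static double averageOnCustomPool(double[] values)
            throws InterruptedException, ExecutionException {
        // The parallel stream started inside this task typically runs on CUSTOM_POOL's
        // workers rather than the common pool (an implementation detail, not a guarantee).
        return CUSTOM_POOL.submit(
                () -> DoubleStream.of(values).parallel().average().orElse(0)).get();
    }
}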
We can see that, by invoking the thread pool from the virtual thread, we optimize the data analysis processing. As the virtual threads complete their I/O tasks, they utilize the Java Stream framework to perform (or queue) the calculations in the ForkJoinPool.
This is just one option; we could have used other thread pools to do the calculations, and the provided sample code shows further examples.
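As one such alternative (not the article's code), a virtual thread could hand the calculation to a dedicated pool of platform threads and block until the result is ready; the pool size and the status codes below are assumptions mirroring the example above:

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.DoubleStream;

public class DedicatedCpuPool {
    // A fixed pool of platform threads sized to the cores (an assumption to tune).
    private static final ExecutorService CPU_POOL =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // Called from a virtual thread: the blocking get() parks the virtual thread cheaply
    // while a platform thread performs the CPU-bound work.
    public static int analyzeOnDedicatedPool(DoubleStream data)
            throws InterruptedException, ExecutionException {
        double[] values = data.toArray();
        return CPU_POOL.submit(() -> {
            double mean = Arrays.stream(values).average().orElse(0);
            return mean > 0.49 ? 719 : 0; // same made-up status codes as above
        }).get();
    }
}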
Alternatively, if we need to throttle the utilization of a scarce resource by the virtual threads, we can use semaphores, as we show in the following code:
public static Semaphore semaphoreService = new Semaphore(NUM_PERMITS);

public static String performScarceResourceRequest() throws InterruptedException, IOException {
    semaphoreService.acquire();
    try {
        // Perform the expensive request
        // For this example we return some data and simulate network latencies
        Thread.sleep((long) (Math.random() * 100));
        return "Requested data";
    } finally {
        semaphoreService.release();
    }
}
We fine-tune the number of permits (NUM_PERMITS) that we set when creating the semaphore to optimize access to the resource. A virtual thread has to acquire a permit from the semaphore before it can run the request.
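For illustration, a hypothetical caller might declare the permit count and fan the requests out over virtual threads; NUM_PERMITS here is just an example value to be tuned for the actual resource, and the method below is not part of the article's sample code:

// Illustrative only: the permit count is a tuning knob, not a value from the article.
private static final int NUM_PERMITS = 10;

public static void runThrottledRequests(int numberOfRequests) {
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        for (int i = 0; i < numberOfRequests; i++) {
            executor.submit(() -> performScarceResourceRequest());
        }
    } // waits for all tasks; at most NUM_PERMITS requests run concurrently
}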
Finally, we cover the cases where the virtual thread performs a task that requires other sub-tasks to be completed. The Java 19 release also has a framework that supports structured concurrency. This project is at an incubator stage and provides powerful support for running related tasks with virtual threads. We will cover a simple case where we have to perform some validations before we analyze the data from the sensors:
public static int validateAndAnalyzeSensorData(DoubleStream data) throws IOException, InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> validatedData = scope.fork(() -> validateData(data));
        Future<String> checkedEnvironment = scope.fork(() -> checkEnvironment(data));
        scope.join();
        scope.throwIfFailed(e -> new IOException(e));
        if (validatedData.get().equals("ok") && checkedEnvironment.get().equals("ok")) {
            return analyzeSensorData(data);
        } else {
            return -1;
        }
    }
}

public static String validateData(DoubleStream data) throws IOException, InterruptedException {
    // In a real application open a secure url stream and fetch the data
    Thread.sleep((long) (Math.random() * 100));
    return "ok";
}

public static String checkEnvironment(DoubleStream data) throws IOException, InterruptedException {
    // In a real application open a secure url stream and fetch the data
    Thread.sleep((long) (Math.random() * 100));
    return "ok";
}
We use StructuredTaskScope to coordinate running the tasks concurrently. StructuredTaskScope supports the try-with-resources statement, which ensures that the scope closes at the end of the try block.
We run the validation and the environment check by invoking scope.fork(), which takes a Callable as a parameter, spawns a new thread, and returns a Future. The task scope's ThreadFactory creates the new threads, so they are virtual threads.
We then join the two tasks with scope.join() and invoke scope.throwIfFailed() to propagate any errors. At that point, we know that the two tasks have completed successfully, so we can proceed with the analysis.
Since structured concurrency is in incubator mode, we need to run the VM with the argument --add-modules=jdk.incubator.concurrent.
The alternative approach would be to do the two sub-tasks serially in the parent's virtual thread. This might be desirable if the tasks are intimately related and we want to keep the thread-per-unit-of-work paradigm. However, it will not be as performant as the previous approach.
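A rough sketch of that serial alternative (not the article's code) simply calls the two checks one after the other on the same virtual thread:

// Sketch: run the validation steps sequentially on the parent's virtual thread.
public static int validateAndAnalyzeSequentially(DoubleStream data)
        throws IOException, InterruptedException {
    if (validateData(data).equals("ok") && checkEnvironment(data).equals("ok")) {
        return analyzeSensorData(data);
    }
    return -1;
}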
Conclusions
Virtual threads promise to revolutionize how we accomplish concurrent I/O programming. This tutorial has covered different approaches to preserving the thread-per-unit-of-work model when tasks combine I/O with CPU-bound workloads.
You can find the source code used in this article on GitHub.