Concurrency: Java Futures and Kotlin Coroutines

Java as a language makes concurrent code look unpleasant. Maybe Kotlin can help. A further step, although an experimental one, would be to use coroutines instead of Futures.


A long time ago, running code concurrently in Java meant starting new threads manually. Not only was this hard to write, it also made it easy to introduce bugs that were hard to find. Testing, reading and maintaining such code was no walk in the park either. Since then, and with a little incentive coming from multi-core machines, the Java API has evolved to make developing concurrent code easier. Meanwhile, alternative JVM languages have their own take on helping developers write such code. In this post, I’ll compare how it’s implemented in Java and Kotlin.

To keep the post focused, I deliberately left out performance to write about code readability.

About the Use Case

The use case is not very original. We need to call different web services. The naive solution would be to call them sequentially, one after the other, and collect the result of each of them. In that case, the overall call time would be the sum of the call time of each service. An easy improvement is to call them in parallel, and wait for the last one to finish. The overall call time then drops from the sum of the individual call times to roughly that of the slowest call. For the more mathematically inclined: provided each call takes a bounded time and enough threads are available, it goes from O(n) to O(1) in the number of services.
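As a rough sketch of that reasoning, assume n independent calls, at least n available threads, and let t_i be the duration of call i (a symbol introduced here for illustration, not taken from the original code):

```latex
T_{\text{sequential}} = \sum_{i=1}^{n} t_i
\qquad \text{versus} \qquad
T_{\text{parallel}} \approx \max_{1 \le i \le n} t_i
```

For example, three calls taking 1, 2 and 3 seconds would need about 6 seconds sequentially, but only about 3 seconds in parallel, ignoring scheduling overhead.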

To simulate calling a web service with a delay, let’s use the following code (in Kotlin because this is so much less verbose):

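import java.security.SecureRandom

class DummyService(private val name: String) {

    private val random = SecureRandom()

    // Each read of this property simulates one web service call
    val content: ContentDuration
        get() {
            // Random delay in milliseconds, between 0 and 4999
            val duration = random.nextInt(5000)
            Thread.sleep(duration.toLong())
            return ContentDuration(name, duration)
        }
}

// Result of a simulated call: the service name and the call duration in milliseconds
data class ContentDuration(val content: String, val duration: Int)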

The Java Future API

Java offers a whole type hierarchy to handle concurrent calls. It’s based on the following interfaces:

Callable: A Callable is a "task that returns a result". Viewed differently, it's similar to a function that takes no parameter and returns a result.

Future: A Future is "the result of an asynchronous computation". Also, "The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready". In other words, it represents a wrapper around a value, where this value is the outcome of a calculation.

ExecutorService: An ExecutorService "provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks". It is the entry point into concurrent handling code in Java. Implementations of this interface, as well as more specialized ones, can be obtained through static methods in the Executors class.
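To see how the three types fit together, here is a minimal sketch that submits a single task and waits for its result. The class name FutureBasics, the method runOnce() and the choice of a single-thread executor are assumptions made for illustration; they are not part of the original post.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class, for illustration only
class FutureBasics {

    // Submits one task and blocks until its result is available
    static String runOnce() {
        // A Callable is a no-argument function that returns a result
        Callable<String> task = () -> "Service A";

        // Any ExecutorService would do; a single-thread one keeps the sketch small
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // submit() returns immediately with a Future wrapping the pending result
            Future<String> future = executor.submit(task);
            // get() blocks until the computation has completed
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // The task itself failed; expose the underlying cause
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown(); // let the worker thread terminate
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

The two catch blocks are the price of Future.get(): it declares both InterruptedException and ExecutionException as checked exceptions.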

This is summarized in the following class diagram:

Calling our services using the concurrent package is a two-step process.

Creating a Collection of Callables

First, there needs to be a collection of Callable to pass to the executor service. This is how it might go:

  1. Start from a stream of service names.
  2. For each service name, create a new dummy service initialized with the string.
  3. For every service, return the service’s getContent() method reference as a Callable. This works because the method signature matches Callable.call() and Callable is a functional interface.

This is the preparation phase. It translates into the following code:

List<Callable<ContentDuration>> callables = Stream.of("Service A", "Service B", "Service C")
      .map(DummyService::new)
      .map(service -> (Callable<ContentDuration>) service::getContent)
      .collect(Collectors.toList());
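The cast is there because a method reference has no type of its own: it needs a target type, and map() cannot infer one by itself. As a possible alternative, an explicit type argument on map() provides that target type. The sketch below uses a hypothetical NamedService class as a delay-free stand-in for DummyService, with String instead of ContentDuration to stay self-contained.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical class, for illustration only
class CallableTargetTyping {

    // Stand-in for the article's DummyService, without the delay
    static class NamedService {
        private final String name;
        NamedService(String name) { this.name = name; }
        String getContent() { return name; }
    }

    static List<Callable<String>> callables() {
        return Stream.of("Service A", "Service B", "Service C")
                .map(NamedService::new)
                // The explicit type argument gives the method reference its target type,
                // so no cast is needed inside the lambda
                .<Callable<String>>map(service -> service::getContent)
                .collect(Collectors.toList());
    }

    // Calls the first Callable directly, without any executor
    static String firstContent() {
        try {
            return callables().get(0).call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstContent());
    }
}
```

Whether the type witness reads better than the cast is a matter of taste; either way, the target type has to be spelled out somewhere.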

Processing the Callables

Once the list has been prepared, it’s time for the ExecutorService to process it, a.k.a. the “real work”.

  1. Create a new executor service (any will do).
  2. Pass the list of Callable to the executor service, and stream the resulting list of Future.
  3. For every future, either return the result or handle the exception.

The following snippet is a possible implementation:

ExecutorService executor = Executors.newWorkStealingPool();
List<ContentDuration> results = executor.invokeAll(callables).stream()
    .map(future -> {
         try { return future.get(); } 
         catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); }
     }).collect(Collectors.toList());
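Note that invokeAll() itself declares InterruptedException, so the snippet has to live in a method that handles or declares it. The following is a self-contained sketch of both steps under a few assumptions: the class InvokeAllSketch and the helpers delayed() and unwrap() are hypothetical, tasks return a plain String, and the delay is fixed at 100 ms rather than random.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical class, for illustration only
class InvokeAllSketch {

    // Hypothetical task: sleeps for the given time, then returns its own name
    static Callable<String> delayed(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    // Converts the checked exceptions of Future.get() into an unchecked one
    static String unwrap(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static List<String> callAll() {
        // Step 1: prepare the collection of Callables
        List<Callable<String>> callables = Stream.of("Service A", "Service B", "Service C")
                .map(name -> delayed(name, 100))
                .collect(Collectors.toList());

        // Step 2: let the executor service process them
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            // invokeAll() waits for all tasks and returns the futures in task order
            return executor.invokeAll(callables).stream()
                    .map(InvokeAllSketch::unwrap)
                    .collect(Collectors.toList());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAll());
    }
}
```

Because invokeAll() returns the futures in the same order as the submitted tasks, the results line up with the service names regardless of which call finishes first.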

The Future API, but in Kotlin

Let’s face it, while Java makes it possible to write concurrent code, reading and maintaining it is not that easy, mainly due to:

  • Going back and forth between collections and streams
  • Handling checked exceptions in lambdas
  • Casting explicitly

Just porting the above code to Kotlin removes those limitations and makes it more straightforward:

val callables: List<Callable<ContentDuration>> = arrayOf("Service A", "Service B", "Service C")
    .map { DummyService(it) }
    .map { Callable<ContentDuration> { it.content } }

val executor = Executors.newWorkStealingPool()
val results = executor.invokeAll(callables).map { it.get() }

Kotlin Coroutines

With version 1.1 of Kotlin comes a new experimental feature called coroutines.

"Basically, coroutines are computations that can be suspended without blocking a thread. Blocking threads is often expensive, especially under high load […]. Coroutine suspension is almost free, on the other hand. No context switch or any other involvement of the OS is required."

The leading design principle behind coroutines is that they must feel like sequential code but run like concurrent code. They are based on the following class diagram:

Nothing beats the code itself though. Let’s implement the same as above, but with coroutines in Kotlin instead of Java futures.

As a pre-step, let’s just extend the service to ease further processing by adding a new computed property wrapped around content, of type Deferred:

val DummyService.asyncContent: Deferred<ContentDuration>
    get() = async(CommonPool) { content }


This is standard Kotlin extension property code, but notice the CommonPool parameter. This is the magic that makes the code run concurrently. It’s an object declaration (i.e. a singleton) that uses a multi-fallback algorithm to get an ExecutorService instance.

Now, onto the code flow proper:

  1. Coroutines are handled inside a block. Declare a variable list outside the block to be assigned inside it.
  2. Open the synchronization block.
  3. Create the array of service names.
  4. For each name, create a service and return it.
  5. For each service, get its async content (declared above) and return it.
  6. For each deferred, get the result and return it.

// The variable must be initialized here, otherwise the compiler
// complains that it cannot be used after the block
var results: List<ContentDuration>? = null
runBlocking {
    results = arrayOf("Service A", "Service B", "Service C")
        .map { DummyService(it) }
        .map { it.asyncContent }
        .map { it.await() }
}

Takeaways

The Future API is less of a problem than the Java language itself. As soon as the code is translated into Kotlin, readability improves a lot. Yet, having to create a collection to pass to the executor service breaks the nice functional pipeline.

For coroutines, the only compromise is to move from a val to a var to get the final results (or to add the results to a mutable list). Also, remember that coroutines are still experimental. Despite all of that, the code does look sequential, and is thus more readable, while it behaves in parallel.

The complete source code for this post can be found on GitHub in Maven format.

Published at DZone with permission of Nicolas Fränkel, DZone MVB. See the original article here.
