Streaming Java CompletableFutures in Completion Order
Want to see what happens when you combine Stream and CompletableFutures in Java?
Java 8 brought us tools like CompletableFuture and the Stream API, so let’s try to combine them both and create a Stream that returns values from a collection of CompletableFutures as they arrive.
This approach was also employed when developing 1.0.0 of parallel-collectors.
Streaming CompletableFutures
Essentially, what we’re trying to do is implement a solution that allows us to convert a collection of futures into a stream of the values returned by those futures:
Collection<CompletableFuture<T>> -> Stream<T>
In the world of Java, that could be achieved by using, for example, a static method:
public static <T> Stream<T> completionOrder(Collection<CompletableFuture<T>> futures) {
// ...
}
To create a custom Stream, one needs to implement a custom java.util.Spliterator:
final class CompletionOrderSpliterator<T>
implements Spliterator<T> { ... }
And now, we can finish up with the implementation of our static method:
public static <T> Stream<T> completionOrder(Collection<CompletableFuture<T>> futures) {
return StreamSupport.stream(
new CompletionOrderSpliterator<>(futures), false);
}
That’s the easy part. Now, let’s implement the CompletionOrderSpliterator itself.
Implementing CompletionOrderSpliterator
To implement our Spliterator, we’ll need to provide custom implementations of the following methods:
final class CompletionOrderSpliterator<T> implements Spliterator<T> {
CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
// TODO
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
// TODO
}
@Override
public Spliterator<T> trySplit() {
// TODO
}
@Override
public long estimateSize() {
// TODO
}
@Override
public int characteristics() {
// TODO
}
}
Naturally, we need a proper constructor as well.
The most natural way of approaching the problem would involve making a working copy of the source collection, waiting for any future to complete, removing it from the collection, and feeding it to the Spliterator itself.
Waiting for any future to complete can be quickly done using CompletableFuture#anyOf, and it handles exception propagation correctly out of the box.
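As a quick illustration (a self-contained sketch of mine, not code from this article), the future returned by anyOf completes exceptionally as soon as the first future to finish fails:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AnyOfExceptionDemo {
    public static void main(String[] args) {
        CompletableFuture<String> failing = new CompletableFuture<>();
        CompletableFuture<String> other = new CompletableFuture<>();
        CompletableFuture<Object> any = CompletableFuture.anyOf(failing, other);
        failing.completeExceptionally(new IllegalStateException("boom")); // first completion is exceptional...
        try {
            any.join();
        } catch (CompletionException e) {
            // ...and anyOf propagates it to its own result
            System.out.println("propagated: " + e.getCause()); // java.lang.IllegalStateException: boom
        }
    }
}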
However, there’s a slight complication.
If you look at the signature of CompletableFuture#anyOf, you will see that it’s not very practical: it accepts multiple CompletableFuture<?> instances and returns a single CompletableFuture<Object>. But this is not the main issue here, just a slight inconvenience.
The real problem is that the CompletableFuture<Object> returned by the method is not the future that completed first, but a new CompletableFuture instance that completes when any future completes.
This makes the whole idea of waiting for a future and then removing it from the list of remaining futures a bit complicated. We can’t rely on reference equality, so we can either do a linear scan after each signal from CompletableFuture#anyOf, or try to come up with something better.
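To make the problem concrete, here’s a tiny sketch (mine, not part of the original implementation) showing that anyOf hands back a brand-new future, so reference equality can’t tell us which input actually completed:
import java.util.concurrent.CompletableFuture;

class AnyOfIdentityDemo {
    public static void main(String[] args) {
        CompletableFuture<Integer> completed = CompletableFuture.completedFuture(42);
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        CompletableFuture<Object> any = CompletableFuture.anyOf(completed, pending);
        System.out.println(any == completed); // false: a fresh instance, not the input that completed
        System.out.println(any.join());       // 42: the value is forwarded, the identity is not
    }
}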
The naive solution could look like:
private T takeNextCompleted() {
anyOf(futureQueue.toArray(new CompletableFuture[0])).join();
CompletableFuture<T> next = null;
for (CompletableFuture<T> future : futureQueue) {
if (future.isDone()) {
next = future;
break;
}
}
futureQueue.remove(next);
return next.join();
}
I’m doing a linear scan to find the first completed future and then removing it from the working copy. If you want to know why I’m passing 0 to the CompletableFuture[] constructor even though the size is known, check this article.
From a pragmatic point of view, this should be good enough: because of hardware thread-count limitations, hardly anyone would iterate over a collection of more than 10-20 thousand futures (the actual number can vary a lot depending on multiple factors, for example, stack size). It is absolutely possible, though, since CompletableFutures are not bound to threads and millions of them can be completed by a single thread. However, that might change once Project Loom goes live.
Still, iterating over 20,000 futures would mean visiting anywhere between 20,000 nodes in the optimistic case (the first future scanned is always the completed one) and roughly 200,000,000 nodes in the pessimistic case (about n·(n+1)/2 visits).
What could we do about it if we can’t rely on referential equality or the hashcodes of CompletableFutures?
We could assign our own ids to them, store them in a map along with the matching futures, and then make the futures identify themselves by returning their index alongside the actual value as a pair.
So, let’s store our futures in a map:
private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
Now, we could manually assign ids from a monotonically increasing sequence, and make futures return them as well:
private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(Collection<CompletableFuture<T>> futures) {
Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map
= new HashMap<>(futures.size(), 1); // presize the HashMap: initial capacity = number of futures, load factor = 1, so no rehashing is expected
int seq = 0;
for (CompletableFuture<T> future : futures) {
int index = seq++;
map.put(
index,
future.thenApply(
value -> new AbstractMap.SimpleEntry<>(index, value)));
}
return map;
}
And now, we can efficiently find and process the next completed future by waiting for it, reading its sequence number, and then using that number to remove the future from the map of remaining ones:
private T nextCompleted() {
return anyOf(indexedFutures.values()
.toArray(new CompletableFuture[0]))
.thenApply(result -> ((Map.Entry<Integer, T>) result))
.thenApply(result -> {
indexedFutures.remove(result.getKey());
return result.getValue();
}).join();
}
The implementation of tryAdvance() becomes trivial:
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (!indexedFutures.isEmpty()) {
action.accept(nextCompleted());
return true;
} else {
return false;
}
}
The hardest part is behind us; now we just need to implement the three remaining methods:
@Override
public Spliterator<T> trySplit() {
return null; // because splitting is not allowed
}
@Override
public long estimateSize() {
return indexedFutures.size(); // because we know the size
}
@Override
public int characteristics() {
return
SIZED // because we know the size upfront
| IMMUTABLE // because the source can't be structurally modified
| NONNULL; // because nulls in source are not accepted
}
And here we are.
Working Example
We can quickly validate that it works as expected by introducing a random processing delay while streaming a sorted sequence:
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<CompletableFuture<Integer>> futures = Stream
.iterate(0, i -> i + 1)
.limit(100)
.map(i -> CompletableFuture.supplyAsync(
withRandomDelay(i), executorService))
.collect(Collectors.toList());
completionOrder(futures)
.forEach(System.out::println);
executorService.shutdown(); // shut the pool down so the JVM can exit once all tasks are done
}
private static Supplier<Integer> withRandomDelay(Integer i) {
return () -> {
try {
Thread.sleep(ThreadLocalRandom.current()
.nextInt(10000));
} catch (InterruptedException e) {
// ignore shamelessly, don't do this on production
}
return i;
};
}
And you can see that the values are returned out of their original order:
6
5
2
4
1
11
8
12
3
Streaming Futures in Original Order
What if we want different semantics and simply maintain the original order?
Luckily, that can be achieved easily, without any special infrastructure:
public static <T> Stream<T> originalOrder(
Collection<CompletableFuture<T>> futures) {
return futures.stream().map(CompletableFuture::join);
}
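Assuming the same futures list as in the working example above, usage looks just like before, except that the values now come back as 0, 1, 2, … in encounter order, regardless of which future finishes first:
originalOrder(futures)
    .forEach(System.out::println); // prints 0, 1, 2, ... 99
The trade-off is that this variant blocks on each future in encounter order, so a slow future at the head of the collection delays everything behind it, even if later futures have already completed.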
A Complete Example
package com.pivovarit.collectors;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import static java.util.concurrent.CompletableFuture.anyOf;
/**
* @author Grzegorz Piwowarek
*/
final class CompletionOrderSpliterator<T> implements Spliterator<T> {
private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
indexedFutures = toIndexedFutures(futures);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (!indexedFutures.isEmpty()) {
action.accept(nextCompleted());
return true;
} else {
return false;
}
}
private T nextCompleted() {
return anyOf(indexedFutures.values().toArray(new CompletableFuture[0]))
.thenApply(result -> ((Map.Entry<Integer, T>) result))
.thenApply(result -> {
indexedFutures.remove(result.getKey());
return result.getValue();
}).join();
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return indexedFutures.size();
}
@Override
public int characteristics() {
return SIZED | IMMUTABLE | NONNULL;
}
private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(Collection<CompletableFuture<T>> futures) {
Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map = new HashMap<>(futures.size(), 1);
int counter = 0;
for (CompletableFuture<T> f : futures) {
int index = counter++;
map.put(index, f.thenApply(value -> new AbstractMap.SimpleEntry<>(index, value)));
}
return map;
}
}
The complete working example can be found on GitHub as well.
Do you have an idea of how to make it better? Don’t hesitate to let me know!