Think Twice Before Using Java 8 Parallel Streams
If you listen to people from Oracle talking about the design choices behind Java 8, you will often hear that parallelism was the main motivation. Parallelization was the driving force behind lambdas, the stream API, and other features. Let's take a look at an example of the stream API.
// Assumes static imports of IntStream.range, LongStream.rangeClosed, and Math.sqrt
private long countPrimes(int max) {
    return range(1, max).parallel().filter(this::isPrime).count();
}

private boolean isPrime(long n) {
    return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
Here, we have the method countPrimes, which counts the number of prime numbers between 1 and our max. A stream of numbers is created by the range method. The stream is then switched to parallel mode; numbers that are not primes are filtered out, and the remaining numbers are counted.
You can see that the stream API allows us to describe the problem in a neat and compact way. Moreover, parallelization is just a matter of calling the parallel() method. When we do that, the stream is split into multiple chunks, each chunk is processed independently, and the results are summarized at the end. Since our implementation of the isPrime method is extremely inefficient and CPU-intensive, we can take advantage of parallelization and utilize all available CPU cores.
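To make the example above concrete, here is a self-contained, runnable version of the same idea (the class name and the use of LongStream throughout are my own choices for a standalone sketch; the original uses instance methods and static imports):

```java
import java.util.stream.LongStream;

public class PrimeCounter {

    // Counts primes in [1, max) using a parallel stream on the common pool.
    static long countPrimes(long max) {
        return LongStream.range(1, max)
                .parallel()
                .filter(PrimeCounter::isPrime)
                .count();
    }

    // Trial division up to sqrt(n); deliberately simple and CPU-intensive.
    static boolean isPrime(long n) {
        return n > 1 && LongStream.rangeClosed(2, (long) Math.sqrt(n))
                .noneMatch(divisor -> n % divisor == 0);
    }

    public static void main(String[] args) {
        System.out.println(countPrimes(100)); // prints 25
    }
}
```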
Let's take a look at another example:
private List<StockInfo> getStockInfo(Stream<String> symbols) {
    return symbols.parallel()
            .map(this::getStockInfo) // per-symbol overload; slow network operation
            .collect(toList());
}
We have a list of stock symbols as input, and we have to call a slow network operation to get some details about each stock. Here, we are not dealing with a CPU-intensive operation, but we can take advantage of parallelization, too: it's a good idea to execute multiple network requests in parallel. Again, a nice task for parallel streams, do you agree?
If you do, please look at the previous example again. There is a big error. Do you see it? The problem is that all parallel streams use the common fork-join thread pool, and if you submit a long-running task, you effectively block all threads in the pool. Consequently, you block all other tasks that are using parallel streams. Imagine a servlet environment, where one request calls getStockInfo() and another one calls countPrimes(). One will block the other, even though each of them requires different resources. What's worse, you cannot specify the thread pool for parallel streams; the whole class loader has to use the same one.
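(As an aside: there is a commonly cited workaround in which a parallel stream started from inside a task submitted to your own ForkJoinPool runs on that pool rather than the common one. Be warned that this relies on unspecified implementation behavior, not on any documented guarantee. A minimal sketch, with class name and bounds of my own choosing:)

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class CustomPoolTrick {

    public static void main(String[] args) throws Exception {
        // Undocumented trick: a parallel stream started inside a
        // ForkJoinPool task executes on that pool, not the common pool.
        ForkJoinPool pool = new ForkJoinPool(4);
        long count = pool.submit(() ->
                LongStream.range(1, 1_000)
                        .parallel()
                        .filter(CustomPoolTrick::isPrime)
                        .count()
        ).get();
        pool.shutdown();
        System.out.println(count); // prints 168
    }

    static boolean isPrime(long n) {
        return n > 1 && LongStream.rangeClosed(2, (long) Math.sqrt(n))
                .noneMatch(d -> n % d == 0);
    }
}
```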
Let's illustrate it in the following example:
private void run() throws InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();
    // Simulating multiple threads in the system:
    // if one of them is executing a long-running task,
    // some of the other threads/tasks are waiting
    // for it to finish.
    es.execute(() -> countPrimes(MAX, 1000)); // incorrect task
    es.execute(() -> countPrimes(MAX, 0));
    es.execute(() -> countPrimes(MAX, 0));
    es.execute(() -> countPrimes(MAX, 0));
    es.execute(() -> countPrimes(MAX, 0));
    es.execute(() -> countPrimes(MAX, 0));
    es.shutdown();
    es.awaitTermination(60, TimeUnit.SECONDS);
}

private void countPrimes(int max, int delay) {
    System.out.println(
        range(1, max).parallel()
            .filter(this::isPrime)
            .peek(i -> sleep(delay))
            .count()
    );
}
Here, we simulate six tasks in the system. All of them are performing a CPU-intensive job; the first one is "broken" and sleeps for a second every time it finds a prime number. This is just an artificial example; you can imagine a thread that is stuck or performs a blocking operation instead.
The question is: what will happen when we execute this code? We have six tasks; one of them will take an extremely long time to complete, while the rest should finish much sooner. Not surprisingly, every time you execute the code, you get a different result. Sometimes all the healthy tasks finish; other times, a few of them are stuck behind the slow one. Do you want such behavior in a production system, with one broken task taking down the rest of the application? I guess not.
There are only two options for making sure that such a thing never happens. The first is to ensure that all tasks submitted to the common fork-join pool will not get stuck and will finish in a reasonable time. That is easier said than done, especially in complex applications. The other option is to not use parallel streams and wait until Oracle allows us to specify the thread pool to be used for parallel streams.
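For I/O-bound work like the stock-lookup example, not using parallel streams does not mean giving up concurrency. A sketch of one alternative, using a dedicated thread pool with CompletableFuture so slow network calls never touch the common fork-join pool (the method name mapConcurrently and the stand-in lookup function are mine, not from the original article):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DedicatedPoolExample {

    // Runs a slow per-element operation on a dedicated pool instead of
    // the common fork-join pool; result order matches input order.
    static <T, R> List<R> mapConcurrently(Stream<T> items,
                                          Function<T, R> slowOp,
                                          int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Kick off all operations first, then join them in order.
            List<CompletableFuture<R>> futures = items
                    .map(item -> CompletableFuture.supplyAsync(
                            () -> slowOp.apply(item), pool))
                    .collect(Collectors.toList());
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for the slow network call from the article.
        Function<String, String> fakeLookup = symbol -> {
            try { Thread.sleep(100); } catch (InterruptedException e) { }
            return symbol + ": 42.0";
        };
        System.out.println(
                mapConcurrently(Stream.of("GOOG", "AAPL", "MSFT"), fakeLookup, 3));
    }
}
```

With this approach, a slow or stuck lookup can only exhaust the dedicated pool, leaving the common pool (and everyone else's parallel streams) untouched.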
Originally published on 8/17/15
If you enjoyed this article and want to learn more about Java Streams, check out this collection of tutorials and articles on all things Java Streams.