What's Wrong in Java 8, Part III: Streams and Parallel Streams
Join the DZone community and get the full member experience.
Join For FreeWhen the first early access versions of Java 8 were made available, what seemed the most important (r)evolution were lambdas. This is now changing and many developers seem to think now that streams are the most valuable Java 8 feature. And this is because they believe that by changing a single word in their programs (replacing stream
with parallelStream
) they will make these programs work in parallel. Many Java 8 evangelists have demonstrated amazing examples of this. Is there something wrong with this? No. Not something. Many things:
- Running in parallel may or may not be a benefit. It depends what you are using this feature for.
- Java 8 parallel streams may make your programs run faster. Or not. Or even slower.
- Thinking about streams as a way to achieve parallel processing at low cost will prevent developers to understand what is really happening. Streams are not directly linked to parallel processing.
- Most of the above problems are based upon a misunderstanding: parallel processing is not the same thing as concurrent processing. And most examples shown about “automatic parallelization” with Java 8 are in fact examples of concurrent processing.
- Thinking about map, filter and other operations as “internal iteration” is a complete nonsense (although this is not a problem with Java 8, but with the way we use it).
So, what are streams
According to Wikipedia:
“a stream is a potentially infinite analog of a list, given by the inductive definition:
data Stream a = Cons a (Stream a)
Generating and computing with streams requires lazy evaluation, either implicitly in a lazily evaluated language or by creating and forcing thunks in an eager language.”
One most important think to notice is that Java is what Wikipedia calls an “eager” language, which means Java is mostly strict (as opposed to lazy) in evaluating things. For example, if you create a List
in Java, all elements are evaluated when the list is created. This may surprise you, since you may create an empty list and add elements after. This is only because either the list is mutable (and you are replacing a null reference with a reference to something) or you are creating a new list from the old one appended with the new element.
Lists are created from something producing its elements. For example:
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Here the producer is an array, and all elements of the array are strictly evaluated.
It is also possible to create a list in a recursive way, for example the list starting with 1 and where all elements are equals to 1 plus the previous element and smaller than 6. In Java < 8, this translates into:
List<Integer> list = new ArrayList<Integer>();
for(int i = 0; i < 6; i++) {
list.add(i);
}
One may argue that the for
loop is one of the rare example of lazy evaluation in Java, but the result is a list in which all elements are evaluated.
What happens if we want to apply a function to all elements of this list? We may do this in a loop. For example, if with want to increase all elements by 2, we may do this:
for(int i = 0; i < list.size(); i++) {
list.set(i, list.get(i) * 2);
}
However, this does not allow using an operation that changes the type of the elements, for example increasing all elements by 10%. The following solution solves this problem:
List<Double> list2 = new ArrayList<Double>();
for(int i = 0; i < list.size(); i++) {
list2.add(list.get(i) * 1.2);
}
This form allows the use of a the Java 5 for each
syntax:
List<Double> list2 = new ArrayList<>();
for(Integer i : list) {
list2.add(i * 1.2);
}
or the Java 8 syntax:
List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2));
So far, so good. But what if we want to increase the value by 10% and then divide it by 3? The trivial answer would be to do:
List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2));
List<Double> list3 = new ArrayList<>();
list2.forEach(x -> list3.add(x / 3));
This is far from optimal because we are iterating twice on the list. A much better solution is:
List<Double> list2 = new ArrayList<>();
for(Integer i : list) {
list2.add(i * 1.2 / 3);
}
Let aside the auto boxing/unboxing problem for now. In Java 8, this can be written as:
List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2 / 3));
But wait... This is only possible because we see the internals of the Consumer
bound to the list, so we are able to manually compose the operations. If we had:
List<Double> list2 = new ArrayList<>();
list.forEach(consumer1);
List<Double> list3 = new ArrayList<>();
list2.forEach(consumer2);
How could we know how to compose them? No way. In Java 8, the Consumer
interface has a default method andThen
. We could be tempted to compose the consumers this way:
list.forEach(consumer1.andThen(consumer2));
but this will result in an error, because andThen
is defined as:
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
This means that we can't use andThen
to compose consumers of different types.
In fact, we have it all wrong since the beginning. What we need is to bind the list to a function in order to get a new list, such as:
Function<Integer, Double> function1 = x -> x * 1.2;
Function<Double, Double> function2 = x -> x / 3;
list.bind(function1).bind(function2);
where the bind
method would be defined in a special FList
class like:
public class FList<T> {
final List<T> list;
public FList(List<T> list) {
this.list = list;
}
public <U> FList<U> bind(Function<T, U> f) {
List<U> newList = new ArrayList<U>();
for (T t : list) {
newList.add(f.apply(t));
}
return new FList<U>(newList);
}
}
and we would use it as in the following example:
new Flist<>(list).bind(function1).bind(function2);
The only trouble we have then is that binding twice would require iterating twice on the list. This is because bind
is evaluated strictly. What we would need is a lazy evaluation, so that we could iterate only once.
The problem here is that the bind
method is not a real binding. It is in reality a composition of a real binding and a reduce. "Reducing" is applying an operation to each element of the list, resulting in the combination of this element and the result of the same operation applied to the previous element. As there is no previous element when we start from the first element, we start with an initial value. For example, applying (x) -> r + x, where r is the result of the operation on the previous element, or 0 for the first element, gives the sum of all elements of the list. Applying () -> r + 1 to each element, starting with r = 0 gives the length of the list. (This may not be the more efficient way to get the length of the list, but it is totally functional!)
Here, the operation is add(element)
and the initial value is an empty list. And this occurs only because the function application is strictly evaluated.
What Java 8 streams give us is the same, but lazily evaluated, which means that when binding a function to a stream, no iteration is involved!
Binding a Function<T, U>
to a Stream<T>
gives us a Stream<U>
with no iteration occurring. The resulting Stream
is not evaluated, and this does not depend upon the fact that the initial stream was built with evaluated or non evaluated data.
In functional languages, binding a Function<T, U>
to a Stream<T>
is itself a function. In Java 8, it is a method, which means it's arguments are strictly evaluated, but this has nothing to do with the evaluation of the resulting stream. To understand what is happening, we can imagine that the functions to bind are stored somewhere and they become part of the data producer for the new (non evaluated) resulting stream.
In Java 8, the method binding a function T -> U
to a Stream<T>
, resulting in a Stream<U>
is called map
. The function binding a function T -> Stream<U>
to a Stream<T>
, resulting in a Stream<U>
is called flatMap
.
Where is flatten?
Most functional languages also offer a flatten
function converting a Stream<Stream<U>>
into a Stream<U>
, but this is missing in Java 8 streams. It may not look like a big trouble since it is so easy to define a method for doing this. For example, given the following function:
Function<Integer, Stream<Integer>> f = x -> Stream.iterate(1, y -> y + 1).limit(x);
Stream<Integer> stream = Stream.iterate(1, x -> x + 1);
Stream<Integer> stream2 = stream.limit(5).flatMap(f);
System.out.println(stream2.collect(toList()))
to produce:
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5]
Using map
instead of flatMap
:
Stream<Integer> stream = Stream.iterate(1, x -> x + 1);
Stream<Integer> stream2 = stream.limit(5).map(f);
System.out.println(stream2.collect(toList()))
will produce a stream of streams:
[java.util.stream.SliceOps$1@12133b1, java.util.stream.SliceOps$1@ea2f77,
java.util.stream.SliceOps$1@1c7353a, java.util.stream.SliceOps$1@1a9515,
java.util.stream.SliceOps$1@f49f1c]
Converting this stream of streams of integers to a stream of integers is very straightforward using the functional paradigm: one just need to flatMap the identity function to it:
System.out.println(stream2.flatMap(x -> x).collect(toList()));
It is however strange that a flatten
method has not been added to the stream, knowing the strong relation that ties map
, flatMap
, unit
and flatten
, where unit
is the function from T
to Stream<T>
, represented by the method:
Stream<T> Stream.of(T... t)
When are stream evaluated?
Streams are evaluated when we apply to them some specific operations called terminal operation. This may be done only once. Once a terminal operation is applied to a stream, is is no longer usable. Terminal operations are:
forEach
forEachOrdered
toArray
reduce
collect
min
max
count
anyMatch
allMatch
noneMatch
findFirst
findAny
iterator
spliterator
Some of these methods are short circuiting. For example, findFirst
will return as soon as the first element will be found.
Non terminal operations are called intermediate and can be stateful (if evaluation of an element depends upon the evaluation of the previous) or stateless. Intermediate operations are:
filter
map
mapTo...
(Int
,Long
orDouble
)flatMap
flatMapTo...
(Int
,Long
orDouble
)distinct
sorted
peek
limit
skip
sequential
parallel
unordered
onClose
Several intermediate operations may be applied to a stream, but only one terminal operation may be use.
So what about parallel processing?
One most advertised functionality of streams is that they allow automatic parallelization of processing. And one can find the amazing demonstrations on the web, mainly based of the same example of a program contacting a server to get the values corresponding to a list of stocks and finding the highest one not exceeding a given limit value. Such an example may show an increase of speed of 400 % and more.
But this example as little to do with parallel processing. It is an example of concurrent processing, which means that the increase of speed will be observed also on a single processor computer. This is because the main part of each “parallel” task is waiting. Parallel processing is about running at the same time tasks that do no wait, such as intensive calculations.
Automatic parallelization will generally not give the expected result for at least two reasons:
- The increase of speed is highly dependent upon the kind of task and the parallelization strategy. And over all things, the best strategy is dependent upon the type of task.
- The increase of speed in highly dependent upon the environment. In some environments, it is easy to obtain a decrease of speed by parallelizing.
Whatever the kind of tasks to parallelize, the strategy applied by parallel streams will be the same, unless you devise this strategy yourself, which will remove much of the interest of parallel streams. Parallelization requires:
- A pool of threads to execute the subtasks,
- Dividing the initial task into subtasks,
- Distributing subtasks to threads,
- Collating the results.
Without entering the details, all this implies some overhead. It will show amazing results when:
- Some tasks imply blocking for a long time, such as accessing a remote service, or
- There are not many threads running at the same time, and in particular no other parallel stream.
If all subtasks imply intense calculation, the potential gain is limited by the number of available processors. Java 8 will by default use as many threads as they are processors on the computer, so, for intensive tasks, the result is highly dependent upon what other threads may be doing at the same time. Of course, if each subtask is essentially waiting, the gain may appear to be huge.
The worst case is if the application runs in a server or a container alongside other applications, and subtasks do not imply waiting. In such a case, (for example running in a J2EE server), parallel streams will often be slower that serial ones. Imagine a server serving hundreds of requests each second. There are great chances that several streams might be evaluated at the same time, so the work is already parallelized. A new layer of parallelization at the business level will most probably make things slower.
Worst: there are great chances that the business applications will see a speed increase in the development environment and a decrease in production. And that is the worst possible situation.
Edit: for a better understanding of why parallel streams in Java 8 (and the Fork/Join pool in Java 7) are broken, refer to these excellent articles by Edward Harned:
What streams are good for
Stream are a useful tool because they allow lazy evaluation. This is very important in several aspect:
- They allow functional programming style using bindings.
- They allow for better performance by removing iteration. Iteration occurs with evaluation. With streams, we can bind dozens of functions without iterating.
- They allow easy parallelization for task including long waits.
- Streams may be infinite (since they are lazy). Functions may be bound to infinite streams without problem. Upon evaluation, there must be some way to make them finite. This is often done through a short circuiting operation.
What streams are not good for
Streams should be used with high caution when processing intensive computation tasks. In particular, by default, all streams will use the same ForkJoinPool
, configured to use as many threads as there are cores in the computer on which the program is running.
If evaluation of one parallel stream results in a very long running task, this may be split into as many long running sub-tasks that will be distributed to each thread in the pool. From there, no other parallel stream can be processed because all threads will be occupied. So, for computation intensive stream evaluation, one should always use a specific ForkJoinPool
in order not to block other streams.
To do this, one may create a Callable
from the stream and submit it to the pool:
List<SomeClass> list = // A list of objects
Stream<SomeClass> stream = list.parallelStream().map(this::veryLongProcessing);
Callable<List<Integer>> task = () -> stream.collect(toList());
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
List<SomeClass> newList = forkJoinPool.submit(task).get()
This way, other parallel streams (using their own ForkJoinPool
) will not be blocked by this one. In other words, we would need a pool of ForkJoinPool
in order to avoid this problem.
If a program is to be run inside a container, one must be very careful when using parallel streams. Never use the default pool in such a situation unless you know for sure that the container can handle it. In a Java EE container, do not use parallel streams.
Previous articles
Opinions expressed by DZone contributors are their own.
Comments