Anatomy of Sequential Data Processing With Java Streams
Functional Programming History
The history of functional programming can be traced back to lambda calculus, a mathematical formalism invented by Alonzo Church in the 1930s. In some ways, lambda calculus is the first programming language, and some of its principles can be found in modern functional languages and even in Java. The first principle is that the main objects in this kind of language are functions, in the mathematical sense of the word: something that takes an input and returns an output.
A function always returns the same output for the same input. This property goes by the name of statelessness: the function has no memory of previous inputs.
This property is so important that it has been given different names, such as purity (pure functions) or referential transparency. Its consequence is that there are no variables, because variables are used to preserve the memory of past events. Therefore, there are also no assignments and no traditional loops, because loops are based on some variable repeatedly changing its value.
How Can We Program With No Loops?
Let’s see a simple example. Consider the two Java methods sketched below:
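Here is a minimal sketch of the two methods being compared; the name arraySumRec for the recursive version is an assumption, since only the extra start parameter is described.

static int arraySum(int[] arr) {                  // iterative version
    int sum = 0;
    for (int i = 0; i < arr.length; i++)
        sum += arr[i];
    return sum;
}

static int arraySumRec(int[] arr, int start) {    // recursive version
    if (start >= arr.length)
        return 0;                                 // base case: nothing left to sum
    else
        return arr[start] + arraySumRec(arr, start + 1);
}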
The first method, arraySum(), takes an array of integers as a parameter, sums all the values, and returns the sum. Now, how can we do the same thing without a loop? We can use recursion.
In recursion, the method calls itself with slightly different parameters. The recursive method has an extra parameter called ‘start’, and it returns the sum of the elements of the array whose index is greater than or equal to start. In the else block, it returns the element at index start plus the result of calling the same method with start increased by one. This obtains the same effect as the more natural iterative method, and it does so without any variables.
Just because we can replace iteration with recursion does not mean that we should. In particular, we should not do it in Java, because Java is not optimized for recursion. The iterative version requires only constant space to perform the sum, and that constant space is taken by the two variables “sum” and “i”.
The recursive version, on the other hand, requires linear extra space, that is, an amount of memory proportional to the size of the input array, because every recursive call takes some space on the stack. If the input array is too large, the iterative version will work fine, while the recursive version will run into a stack overflow.
Streams in Java
Streams are probably the most exciting new feature of Java 8. They represent a powerful new framework for sequential data processing. Let’s start by comparing streams with collections and with iterators.
A stream is a sequence of objects supporting a special type of iteration called internal iteration. Before discussing internal iteration, let’s recall the main features of the other standard representations for sequences of objects: first lists, then iterators, and finally the new concept of streams. With a List we can add objects, remove objects, search for an object, and also scan and iterate. With an Iterator we can iterate through a list; an iterator has very limited functionality, namely returning the next object in the sequence and possibly removing the last object that was returned.
Streams, on the other hand, from an abstract point of view support only one special operation: internal iteration. In a few words, internal iteration means applying some operation to every object of the sequence. This may seem like a single operation, but since you can apply a wide variety of different operations to the elements, you get a wide set of practical operations.
Internal Iteration Overview in Streams
Now, to understand internal iteration, let’s start with a very simple example: Suppose we want to print every element in a sequence of objects.
List<Integer> l = …;
for (Integer n : l)
    System.out.println(n);
In this example, our sequence of objects is represented by a simple List of Integers. How do we print every integer in the list? We can use an enhanced for loop and print every integer n in the list.
Suppose instead that our sequence is encoded as an iterator. We can use a while loop to get every integer in the sequence and print it, as below:
Iterator<Integer> i = …;
while (i.hasNext())
    System.out.println(i.next());
Finally, let’s move to streams. Stream is an interface, and we can print every object in a sequence as shown below. This is quite different from the previous two cases: there are no loops.
Stream<Integer> s = …;
s.forEach(System.out::println);
Instead, we call the forEach() method of the stream object and pass it, using the new method-reference syntax, a reference to the println method of the System.out object. There is no explicit loop and no variable referring to the generic element of the sequence. This is the key idea of internal iteration: we pass some functional object to the forEach() method, and forEach() takes care of the iteration.
It is internal because the responsibility for the iteration lies within the collection, or in this case within the stream. All of this is based on the Stream interface in the java.util.stream package.
Stream Operations Anatomy
Stream is a generic interface with more than 30 instance methods and a couple of static methods. To summarize, internal iteration consists of repeatedly applying operations to all elements of a sequence, in a way that shifts responsibility from the client (the user) to the stream library.
Now, what kinds of operations can be applied to the elements of a stream? There are three kinds:
Build Ops
Intermediate Ops
Terminal Ops
First, there are build operations, which create a stream from some data source. Next, there are intermediate operations, which convert one stream into another, possibly a stream of a different type. Finally, there are terminal operations, which convert the stream into something else; some terminal operations return void because, rather than returning a value, they have a side effect that changes the state of something else. These operations are meant to be chained in a specific order.
Such a sequence of operations is called a pipeline. A pipeline must start with a build operation, continue with zero or more intermediate operations, and end with exactly one terminal operation, which may or may not return a result.
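As a minimal sketch of that order, here is a tiny pipeline with each kind of operation labelled:

Stream.of(3, 1, 2)                      // build operation: creates the stream
      .sorted()                         // intermediate operation: returns a new stream
      .forEach(System.out::println);    // terminal operation: ends the pipeline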
Problem Statement: Let’s take an example of stream processing. Say we have an Employee class characterized by a String name and an Integer salary. We want to print the names of the employees whose salary is above 20,000, and we want these names to appear alphabetically sorted.
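For reference, a minimal Employee class along these lines is enough for the pipeline below; the constructor is an assumption, while the getters match the methods used in the solution.

public class Employee {
    private final String name;
    private final Integer salary;

    public Employee(String name, Integer salary) {   // assumed constructor
        this.name = name;
        this.salary = salary;
    }

    public String getName() { return name; }
    public Integer getSalary() { return salary; }
}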
Solution: How can we achieve that using streams? Suppose we have an array of employees. First, we convert the array into a stream with the static stream() method of the Arrays class. Next, we use filter() to select the employees with a high salary, then map() to extract the names of the selected employees, then sorted() to sort the names, and finally forEach() to print out the strings.
Arrays.stream(emps).filter(e -> e.getSalary()>20000)
.map(Employee::getName)
.sorted()
.forEach(System.out::println);
Here, stream() is the build operation; filter(), map(), and sorted() are intermediate operations; and forEach() is the terminal operation.
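As a usage sketch, with a few made-up employees the pipeline prints the qualifying names in alphabetical order:

Employee[] emps = {
    new Employee("Carol", 30000),     // sample data, purely for illustration
    new Employee("Alice", 25000),
    new Employee("Bob", 15000)
};

Arrays.stream(emps)
      .filter(e -> e.getSalary() > 20000)
      .map(Employee::getName)
      .sorted()
      .forEach(System.out::println);  // prints Alice, then Carol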
Stream Options
Finally, you should be aware that streams come with a couple of options. The main options are the following:
- Ordered or Unordered
- Sequential or Parallel
The objects in a stream may or may not come in a fixed order. If they do, this order is called the encounter order, because it is the order in which the objects will be encountered along the stream. If you are dealing with an ordered stream, the operations you perform on it are executed according to the encounter order.
On the other hand, if you are dealing with an unordered stream, operations may be performed in any order, with no specific guarantee. Next, if you have a sequential stream, the operations are performed on one object at a time. Conversely, with a parallel stream, operations may be performed on several objects in parallel. This is one of the strongest points of streams: seamless parallelization, as the sketch below shows.
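As a minimal sketch (the numbers are made up for illustration), the same reduction can run sequentially or in parallel simply by choosing which stream you start from:

List<Integer> nums = Arrays.asList(3, 1, 4, 1, 5, 9);
int seqSum = nums.stream().mapToInt(Integer::intValue).sum();          // one element at a time
int parSum = nums.parallelStream().mapToInt(Integer::intValue).sum();  // possibly several at once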
Creating a Stream
Now, we are going to see three different ways to create a stream:
A static sequence of objects
A collection
A Computation
Static Sequence of Objects
In this case, we can use the following static method from the Stream interface. The method is called of() and it is a so-called variadic method, meaning that it accepts an arbitrary number of arguments.
public static <T> Stream<T> of(T... values)
Example:
Stream<Integer> fib = Stream.of(1,1,2,3);
In this example, we have created a stream of four integers, which are the first four Fibonacci numbers. Now, recall that variadic methods can also accept an array rather than a list of comma-separated arguments, so the of() method can be used to turn an array into a stream.
Employee[] emps = …;
Stream<Employee> empStream = Stream.of(emps);
In the above example, we pass the employee array to the of() method and get a stream of employees. This method only works for very simple cases; the Arrays class has more specific methods for turning an array into a stream.
public static <T> Stream<T> stream(T[] array)
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive)
The second overload allows us to convert only a slice of an array, from a starting index (inclusive) to an ending index (exclusive). As we might expect, streams created from arrays are ordered, and they are sequential by default. There are also ways to change the characteristics of a stream after it has been built: a stream can be converted from ordered to unordered and from sequential to parallel, and vice versa.
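As a sketch of these conversions (reusing the emps array from above), the parallel(), sequential(), and unordered() methods each return a stream with the requested characteristic:

Stream<Employee> s = Arrays.stream(emps);   // ordered and sequential by default
Stream<Employee> p = s.parallel();          // now parallel
Stream<Employee> u = p.unordered();         // and unordered
// sequential() would switch execution back to sequential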
Streams From Collection
We might have some data in a standard collection, which may be a Set, a List, or a Queue. In this case, we can use the two methods below from the Collection interface.
Stream<T> stream()
Stream<T> parallelStream()
Let’s say we have a collection of employees; we can call the stream() method and get a stream of the same element type.
Collection<Employee> emps = …
Stream<Employee> empStream = emps.stream();
Now, what are the characteristics of this stream? If you start from an ordered collection such as a List, the resulting stream will be ordered; if you start from an unordered collection such as a HashSet, you will get an unordered stream. But if you have a sorted set, you will get an ordered stream.
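For instance, one way to check this is to inspect the ORDERED characteristic of the stream’s spliterator (a small sketch, assuming the usual java.util imports):

List<Integer> list = Arrays.asList(1, 2, 3);
Set<Integer> set = new HashSet<>(list);

boolean listOrdered = list.stream().spliterator()
        .hasCharacteristics(Spliterator.ORDERED);   // true: a List has an encounter order
boolean setOrdered = set.stream().spliterator()
        .hasCharacteristics(Spliterator.ORDERED);   // false: a HashSet does not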
Streams From Computation
We can also get streams by dynamically generating every single object on the fly. There are two ways to do it: we can generate each element of the sequence separately, using a Supplier, which is a functional interface; or we can generate each element of the sequence from the previous one via a UnaryOperator, which is another functional interface.
Note that streams generated in this way are potentially infinite, because in principle we could keep generating more and more elements without any bound. Since we can only ever process a finite number of them, we have to insert some operation that limits the length of the stream; otherwise, we will end up with an infinite loop.
Now, how can we generate each element separately? We can use the static generate() method of Stream and pass a Supplier to it. A Supplier is an object with a single method that takes no arguments and returns an object of type T. The result is an infinite unordered stream.
Example:
Random ran = new Random();
Stream<Integer> randoms = Stream.generate(ran::nextInt);
Here, I have defined a stream of random integers by instantiating the Random class and then calling Stream.generate(); as the supplier of integers, I’ve passed a method reference to the nextInt() method of the Random object. This method fits the Supplier signature because it takes no arguments and returns an integer. In this way, I have built a potentially infinite stream of random integers.
Now, let’s generate a stream using a UnaryOperator. In this case, we use the static iterate() method. It takes two arguments: the first is the first object in the sequence, and the second is a function that, given the previous element of the sequence, builds and returns the next element. Using this technique, we get an ordered infinite stream. For example, let’s build a stream of strings containing longer and longer runs of the letter “a”.
Stream<String> sm = Stream.iterate("a", s -> s + "a");
In this example, the first element I pass is the string “a”, and as the UnaryOperator I pass a function that takes a string and appends an extra “a” to it, so we get a stream of ever longer runs of “a”.
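Because this stream is infinite, any pipeline that uses it needs a length-limiting operation; for example:

sm.limit(4).forEach(System.out::println);   // prints: a, aa, aaa, aaaa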
Understanding Lazy Evaluation in Streams
In programming languages, lazy means as late as possible, or on demand; its opposite is eager, which means as soon as possible.
Suppose we have two sets of employees, s1 and s2, as below. We can add all elements of s1 to s2 using the addAll() method.
Set<Employee> s1 = …
Set<Employee> s2 = …
s2.addAll(s1);
Here, the addAll() method adds all elements of s1 to s2. This is called eager evaluation, because the effect is executed immediately, as soon as possible. Compare this with the earlier example of the stream of random integers.
Random ran = new Random();
Stream<Integer> randoms = Stream.generate(ran::nextInt);
Here, the call to generate() defines a possibly infinite stream of random integers. Naturally, when we call generate(), no integer is generated yet; that is the only option, because it certainly cannot materialize an infinite stream.
The only option is to not generate any integers until they are needed. This is called lazy evaluation: the integers are generated only when they are needed. And when are they going to be needed? The rule is that elements are created and processed only when required by the terminal operation, i.e., elements are pulled from the end, not pushed from the start. In other words, a stream pipeline must always end with a terminal operation, and it is that operation that controls the flow of objects along the stream.
Let’s check this with a practical example. We define a Supplier of integers that generates a random number, prints it out, and returns it. We use this supplier to define a potentially infinite stream of integers, and then we process the stream with a few operations:
final Random ran = new Random();
Supplier<Integer> sup = () -> {
    Integer res = ran.nextInt();
    System.out.println(res);   // the supplier prints each value it generates
    return res;
};
Stream<Integer> random = Stream.generate(sup);
random.filter(n -> n >= 0).limit(3).forEach(System.out::println);
Here, we first apply a filter that selects only those elements satisfying the predicate, which says the number must be greater than or equal to zero. Next, we apply the limit operation, which picks just the first three elements from the stream, and finally the terminal operation is forEach(), which prints the selected elements.
Now, let’s consider a very important rule of streams: they can only be traversed once. Here is what happens if we try to traverse a stream twice. Consider the stream we saw earlier, a stream of the first four Fibonacci numbers.
Stream<Integer> fib = Stream.of(1,1,2,3);
fib.forEach(System.out::println); //output: 1 1 2 3
fib.forEach(System.out::println); // output: java.lang.IllegalStateException
As you can see in the above example, the first forEach() call prints the numbers just fine. But on the next line, when we try to print them again with forEach(), we get an exception (“stream has already been operated upon or closed”). This is because the stream was already used in another pipeline, so it is useless now.
If we split a stream pipeline in the wrong way, we get the same error.
Stream<Integer> fib = Stream.of(1,1,2,3);
fib.limit(2);
fib.forEach(System.out::println); // output: java.lang.IllegalStateException
Here, we apply the limit operation, which limits the length of the stream to two elements, and after that we try to print the same stream, which does not work: we get the same error as in the previous example. The point is that intermediate operations such as limit() can be chained because they return a new stream, and on that new stream we can apply further intermediate operations or a terminal operation. In this case, we are discarding the return value of limit(), which is the new stream, and applying the forEach() terminal operation to the old stream, which is incorrect.
If for some reason we need to split the pipeline into multiple statements, the right way is to store the return value of the limit operation in another Stream<Integer> reference and then continue processing that new stream.
Stream<Integer> fib = Stream.of(1,1,2,3);
Stream<Integer> fibTwo = fib.limit(2);
fibTwo.forEach(System.out::println);
The above program gives us the correct output, 1 1, without any error.
Conclusion
In this article, the basics of data processing with streams have been covered in detail, along with a conceptual comparison between streams, collections, and iterators. Streams can also be considered monads, thanks to the flatMap operation and other stream operations. Advanced stream concepts will be covered in upcoming posts.
Reference
https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html