Optimization Techniques Using Java Streams
In this post, we'll focus on how to handle the post-processing of the data using Java Streams. We'll also see very powerful techniques to optimize and order the results.
The incredible amounts of data collected by digital technologies have created an explosion of big data analysis. It allows companies, governments, and other organizations to find patterns and predict future behavior. For example, it can aid in sales projections, marketing campaigns, solving and preventing crime, etc. The possibilities are varied and limitless.
Similarly, AI applications, and neural networks in particular, use big datasets. When working on a complex problem modeled by a neural network, it is very important to collect large amounts of data that are properly formatted and labeled. We divide this data into training and testing datasets that we use to get the values of the weight matrices of the model.
The point of the previous examples is to bring home the need to handle large amounts of data that we format, label, and optimize for these applications. Generally, this data manipulation is required both before and after the data has been processed. Because of the large volumes involved, it is difficult to format, process, and store this data efficiently.
In this article, we are going to focus on how to handle the post-processing of the data using Java Streams. We are going to see very powerful techniques to optimize and order the results. The focus will be on providing efficient mechanisms to process the data, easily parallelize the computations, and quickly swap implementations using functional interfaces.
Optimization Techniques
Let’s consider a simple example where we have a list of events that we want to process. We are not concerned about how we obtained this list: it could come from a batch process, analytic tools, or any other mechanism that selects for us a list of candidate events that we want to optimize, order, and categorize.
The events that we are going to consider are very simple and have a very generic structure. This is for demonstration purposes only. We could also have considered instead having other entities like customers, housing, reports, etc. The analysis that will follow will apply to a wide range of cases.
We define an Event with the following implementation:
public static record Event(String location, int price, String eventType, int eventRankId) {}
Notice that we use a Java record to define the Event. A record is a concise way to define an immutable data class. This entity has four fields: location, price, eventType, and eventRankId. The eventRankId is a primary key that uniquely identifies an event.
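As a quick illustration of what the record declaration provides, the sketch below creates two events and reads their fields through the generated accessors. The class name RecordDemo and the sample values are made up for this example.

```java
final class RecordDemo {
    // Same shape as the Event record defined in the article.
    record Event(String location, int price, String eventType, int eventRankId) {}

    public static void main(String[] args) {
        // Sample values are invented for illustration.
        Event concert = new Event("Brooklyn", 40, "concert", 1);
        Event sameConcert = new Event("Brooklyn", 40, "concert", 1);

        // Accessors are named after the components, without a "get" prefix.
        System.out.println(concert.location() + " costs " + concert.price());

        // equals(), hashCode() and toString() are generated from the components.
        System.out.println(concert.equals(sameConcert));
        System.out.println(concert);
    }
}
```

The generated toString() is what produces the Event[location=..., ...] format seen in the test output later in the article.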
To start, let’s solve a simple problem: suppose that we want to select the 10 events that are closest to a given location. The main requirement is that this operation be parallelizable and performant. At the same time, we want the algorithm that implements this selection to be pluggable, so we can swap it seamlessly when we want to change the implementation.
As an example, let’s write the following code to get the distance between two positions:
public static int evaluateLocationProximity(String location1, String location2) {
    return Math.abs(location1.hashCode() - location2.hashCode());
}
This function takes two locations as parameters and returns a number that represents the distance between them. The method signature and its implementation are not realistic and are here for demonstration purposes only. In a production environment, the function could find the best route between two locations and assign a value based on the travel time, physical proximity, or convenience. The smaller the return value, the closer these two locations are, in the generic sense that we have defined them.
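One caveat with this placeholder is worth keeping in mind: hash codes can be any int, so the subtraction may overflow and Math.abs can then return a negative value. The sketch below is one possible overflow-safe variant, not part of the original code. The class name SafeProximity and the helper distanceBetweenHashes() are hypothetical, and clamping to Integer.MAX_VALUE is an assumption made to keep the int return type.

```java
final class SafeProximity {
    // Hypothetical helper: distance between two hash values, computed in long
    // arithmetic so the subtraction cannot overflow, then clamped to the int range.
    static int distanceBetweenHashes(int h1, int h2) {
        long diff = Math.abs((long) h1 - (long) h2);
        return (int) Math.min(diff, Integer.MAX_VALUE);
    }

    // Variant of the demo function with the same signature as in the article.
    static int evaluateLocationProximity(String location1, String location2) {
        return distanceBetweenHashes(location1.hashCode(), location2.hashCode());
    }

    public static void main(String[] args) {
        // With plain int arithmetic this pair overflows and yields a negative "distance".
        int naive = Math.abs(Integer.MAX_VALUE - (-1));
        int safe = distanceBetweenHashes(Integer.MAX_VALUE, -1);
        System.out.println("naive=" + naive + ", safe=" + safe);
    }
}
```

Clamping makes all very distant pairs compare as equal, which is acceptable for a demonstration but would need revisiting with a real distance measure.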
We need to apply this algorithm to our given list of events. We want to order the events based on the return value from this function.
Let’s review the following implementation using Java streams:
public static List<Event> getNearEvents(int number, List<Event> events, String targetLocation) {
    Map<Integer, Set<Event>> distanceEventsMap = events.stream()
        .collect(Collectors.groupingBy(
            event -> evaluateLocationProximity(event.location(), targetLocation),
            TreeMap::new,
            Collectors.toCollection(() -> new TreeSet<>(
                Comparator.comparingInt(Event::eventRankId)))));
    return distanceEventsMap.values()
        .stream()
        .flatMap(Set::stream)
        .limit(number)
        .collect(Collectors.toList());
}
The function takes the number of events that we want to return, the input list of events, and the target location that the events must be closest to.
In the implementation, we use the static function Collectors::groupingBy. Its first parameter groups the elements using evaluateLocationProximity() as the classifier function. The second parameter, the map factory, generates a TreeMap, which keeps its keys sorted: the keys of the map are the distances between the events and the target location, and the values are the corresponding events. The last parameter is a downstream Collector: we use Collectors::toCollection to get a Collector that accumulates the events that have the same key into a TreeSet ordered by Event::eventRankId.
Finally, from the Map<Integer, Set<Event>> we get the values, flatten the sets of events, and set a limit on the size of the returned list.
The solution to the problem that we posed before, for example finding the 10 best events for a given location such as “Grand Central NYC”, will be:
List<Event> events = //list of initial input list
List<Event> bestEvents = getNearEvents(10, events, "Grand Central NYC");
We get a sub-list with the 10 closest events ordered by distance. If two events have the same distance to the target location, they are ordered by eventRankId.
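To make the ordering concrete, here is a small runnable sketch of the same pipeline on four sample events. Because hash-based distances are hard to follow by eye, it assumes that locations are street numbers written as strings and that the distance is the difference between them. That proximity function, the class name NearEventsDemo, and the sample data are assumptions for illustration only.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class NearEventsDemo {
    record Event(String location, int price, String eventType, int eventRankId) {}

    // Assumption for this sketch: locations are street numbers written as strings,
    // and the "distance" is the absolute difference between the numbers.
    static int evaluateLocationProximity(String location1, String location2) {
        return Math.abs(Integer.parseInt(location1) - Integer.parseInt(location2));
    }

    static List<Event> getNearEvents(int number, List<Event> events, String targetLocation) {
        // Group by distance (sorted keys), keeping each group sorted by rank id.
        Map<Integer, Set<Event>> distanceEventsMap = events.stream()
            .collect(Collectors.groupingBy(
                event -> evaluateLocationProximity(event.location(), targetLocation),
                TreeMap::new,
                Collectors.toCollection(() -> new TreeSet<>(
                    Comparator.comparingInt(Event::eventRankId)))));
        // Flatten the groups in key order and keep the first `number` events.
        return distanceEventsMap.values().stream()
            .flatMap(Set::stream)
            .limit(number)
            .collect(Collectors.toList());
    }

    static List<Event> sampleEvents() {
        return List.of(
            new Event("50", 20, "concert", 4),
            new Event("42", 35, "theater", 3),
            new Event("38", 15, "concert", 2),  // same distance to "40" as rank 3
            new Event("10", 60, "sports", 1));
    }

    public static void main(String[] args) {
        getNearEvents(3, sampleEvents(), "40").forEach(System.out::println);
    }
}
```

For the target "40", the events with rank ids 2 and 3 are both at distance 2 and come first, ordered by rank id, followed by the event with rank id 4 at distance 10.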
Using Functional Interfaces
Sometimes the algorithms that we are using have to be modified or tweaked to react to changes in the market conditions. Functional interfaces are a simple technique that allows us to easily change the implementation. For example, consider the previous example modified with the following implementation:
public static List<Event> getNearEvents(int number, List<Event> events, String targetLocation,
        BiFunction<String, String, Integer> locationProximityEvaluator) {
    Map<Integer, Set<Event>> distanceEventsMap = events.stream()
        .collect(Collectors.groupingBy(
            // The injected evaluator replaces the hard-coded evaluateLocationProximity() call
            event -> locationProximityEvaluator.apply(event.location(), targetLocation),
            TreeMap::new,
            Collectors.toCollection(() -> new TreeSet<>(
                Comparator.comparingInt(Event::eventRankId)))));
    return distanceEventsMap.values()
        .stream()
        .flatMap(Set::stream)
        .limit(number)
        .collect(Collectors.toList());
}
We use the Java BiFunction to pass the implementation of the algorithm.
Invoking this function now is very simple:
getNearEvents(10, events, "New York", (l1, l2) -> Math.abs(l1.hashCode() - l2.hashCode()));
This technique is not only useful to decouple the ordering from the optimization algorithm but also makes it easy to change the implementation. This can be useful during development but also at run time.
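As a sketch of what swapping the implementation can look like, the example below passes two different evaluators to the same method and gets two different orderings. The evaluators BY_NAME_LENGTH and BY_FIRST_LETTER are deliberately artificial and, like the class name PluggableProximityDemo and the sample data, are assumptions made for this illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

final class PluggableProximityDemo {
    record Event(String location, int price, String eventType, int eventRankId) {}

    // Two hypothetical, interchangeable proximity strategies.
    static final BiFunction<String, String, Integer> BY_NAME_LENGTH =
        (l1, l2) -> Math.abs(l1.length() - l2.length());
    static final BiFunction<String, String, Integer> BY_FIRST_LETTER =
        (l1, l2) -> Math.abs(l1.charAt(0) - l2.charAt(0));

    static List<Event> getNearEvents(int number, List<Event> events, String targetLocation,
            BiFunction<String, String, Integer> locationProximityEvaluator) {
        Map<Integer, Set<Event>> distanceEventsMap = events.stream()
            .collect(Collectors.groupingBy(
                event -> locationProximityEvaluator.apply(event.location(), targetLocation),
                TreeMap::new,
                Collectors.toCollection(() -> new TreeSet<>(
                    Comparator.comparingInt(Event::eventRankId)))));
        return distanceEventsMap.values().stream()
            .flatMap(Set::stream)
            .limit(number)
            .collect(Collectors.toList());
    }

    static List<Event> sampleEvents() {
        return List.of(
            new Event("Queens", 20, "concert", 1),
            new Event("Bronx", 35, "theater", 2),
            new Event("Manhattan", 15, "concert", 3));
    }

    public static void main(String[] args) {
        // The strategy can be chosen at run time, for example from configuration.
        System.out.println(getNearEvents(3, sampleEvents(), "Brooklyn", BY_NAME_LENGTH));
        System.out.println(getNearEvents(3, sampleEvents(), "Brooklyn", BY_FIRST_LETTER));
    }
}
```

With the target "Brooklyn", the length-based evaluator ranks Manhattan first, while the first-letter evaluator ranks Bronx first. The method itself is unchanged between the two calls.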
Concurrent Implementation
Using Java streams allows us to easily improve the performance of the function using parallel processing. The most straightforward way to do it is to invoke parallel() on the stream at the start of the pipeline in getNearEvents():
Map<Integer, Set<Event>> distanceEventsMap = events.stream().parallel()
This simple modification causes the framework to subdivide the collection into parts and process them in separate threads. When all the parts are processed, the framework combines the partial results serially into a single TreeMap to give you the result. This is impressive: just adding a call to parallel() on the stream causes the framework to use the multicore resources of your running environment to improve the performance of your application.
However, we can do better and write an implementation that optimizes the code in getNearEvents() by using Collectors::groupingByConcurrent instead of Collectors::groupingBy:
Collectors.groupingByConcurrent(
    event -> evaluateLocationProximity(event.location(), targetLocation),
    ConcurrentSkipListMap::new,
    Collectors.toCollection(() -> new TreeSet<>(
        Comparator.comparingInt(Event::eventRankId)))));
The difference with the previous implementation is that for the second parameter we substitute TreeMap::new with ConcurrentSkipListMap::new. ConcurrentSkipListMap provides a scalable concurrent implementation of a Map that in our case is sorted by the natural ordering of the key (the distance between an event and the target location). The advantage is that groupingByConcurrent accumulates the elements from the different threads directly into this Map instead of building partial maps that have to be combined serially.
Therefore, the following function shows how to accomplish this implementation and at the same time preserve the ordering of the events:
List<Event> getNearEventsConcurrent(int number, List<Event> events, String targetLocation) {
    Map<Integer, Set<Event>> distanceEventsMap = events.stream().parallel()
        .collect(Collectors.groupingByConcurrent(
            event -> evaluateLocationProximity(event.location(), targetLocation),
            ConcurrentSkipListMap::new,
            Collectors.toCollection(() -> new TreeSet<>(
                Comparator.comparingInt(Event::eventRankId)))));
    return distanceEventsMap.values()
        .stream()
        .flatMap(Set::stream)
        .limit(number)
        .collect(Collectors.toList());
}
To summarize, we have reviewed three cases:
- serial processing (getNearEvents())
- using the same method but adding the parallel stream invocation
- getNearEventsConcurrent(), which does the parallel computation using groupingByConcurrent.
Running a quick test with a list of 10 million events, we get the following results for each case:
Starting process serial, chosen events:
[Event[location=mtrddza, price=10, eventType=drihps, eventRankId=5255511] ...
process duration: 16022 milliseconds
Starting process parallel, chosen events:
[Event[location=mtrddza, price=10, eventType=drihps, eventRankId=5255511]...
process duration parallel: 12014 milliseconds
Starting process concurrent, chosen events:
[Event[location=mtrddza, price=10, eventType=drihps, eventRankId=5255511]...
process duration concurrent: 10615 milliseconds
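We see that the parallel implementations are more performant than the serial process. In this run, adding parallel() reduced the duration by about 25% (from 16022 to 12014 milliseconds), and the best result came from groupingByConcurrent(), which took about 34% less time than the serial version (10615 milliseconds), that is, roughly 1.5 times as fast.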
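The test harness used for these measurements is not shown, so the following is only a rough sketch of how such a comparison could be set up. The class NearEventsTiming, the helpers randomEvents() and timed(), the seed, and the list size of one million are all assumptions, and a single wall-clock run like this one is noisy; a benchmarking tool such as JMH would give more reliable numbers.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class NearEventsTiming {
    record Event(String location, int price, String eventType, int eventRankId) {}

    static int evaluateLocationProximity(String l1, String l2) {
        return Math.abs(l1.hashCode() - l2.hashCode());
    }

    // Flattens the groups in key order and keeps the first `number` events.
    static List<Event> firstEvents(Map<Integer, Set<Event>> byDistance, int number) {
        return byDistance.values().stream()
            .flatMap(Set::stream)
            .limit(number)
            .collect(Collectors.toList());
    }

    // Serial or parallel, depending on the stream that is passed in.
    static List<Event> nearEvents(int number, Stream<Event> events, String target) {
        Map<Integer, Set<Event>> map = events.collect(Collectors.groupingBy(
            e -> evaluateLocationProximity(e.location(), target),
            TreeMap::new,
            Collectors.toCollection(() -> new TreeSet<>(
                Comparator.comparingInt(Event::eventRankId)))));
        return firstEvents(map, number);
    }

    static List<Event> nearEventsConcurrent(int number, Stream<Event> events, String target) {
        Map<Integer, Set<Event>> map = events.parallel().collect(Collectors.groupingByConcurrent(
            e -> evaluateLocationProximity(e.location(), target),
            ConcurrentSkipListMap::new,
            Collectors.toCollection(() -> new TreeSet<>(
                Comparator.comparingInt(Event::eventRankId)))));
        return firstEvents(map, number);
    }

    // Hypothetical data generator: random locations, prices and types, unique rank ids.
    static List<Event> randomEvents(int count, long seed) {
        Random random = new Random(seed);
        return IntStream.range(0, count)
            .mapToObj(i -> new Event("loc" + random.nextInt(1_000_000),
                random.nextInt(100), "type" + random.nextInt(5), i))
            .collect(Collectors.toList());
    }

    // Runs the task once and prints the elapsed wall-clock time.
    static List<Event> timed(String label, Supplier<List<Event>> task) {
        long start = System.nanoTime();
        List<Event> result = task.get();
        System.out.println(label + ": " + (System.nanoTime() - start) / 1_000_000 + " ms");
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = randomEvents(1_000_000, 42L);
        String target = "Grand Central NYC";
        timed("serial", () -> nearEvents(10, events.stream(), target));
        timed("parallel", () -> nearEvents(10, events.parallelStream(), target));
        timed("concurrent", () -> nearEventsConcurrent(10, events.stream(), target));
    }
}
```

All three variants should return the same list, since the result is fully determined by the sorted keys and the rank id ordering. Only the elapsed time is expected to differ, and it depends on the number of cores and the JVM warm-up state.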
So far, we have reviewed ways to implement several optimization techniques to get the closest events to a given target location. If two events had the same value, we ordered them based on their event rank id. As we are going to see in the next section, using Java Streams, we can do more sophisticated selections where we can find events for, possibly, an unlimited number of conditions.
Optimization Using Multiple Selections
Java streams allow us to do multilevel reductions. We are going to use this powerful technique to order our list of events applying more than one condition. We have seen in the Collectors::groupingBy implementation that you can specify a downstream collector in its third parameter. This allows us to do subgrouping on the original selection.
Let’s consider an example where we want to classify the event by location proximity but in the case of events that have the same value, we want to order them by some other measure that we are going to call affinity. For this simple example we are going to define affinity with the following function:
int evaluateProvidersAffinity(String localEventType, int localPrice, String targetEvtType, int targetPrice) {
    if (localEventType.equalsIgnoreCase(targetEvtType))
        return Math.abs(localPrice - targetPrice);
    else
        return (100 + Math.abs(localPrice - targetPrice));
}
This implementation compares the prices and event types to get a value for the closest match to the target values: the smaller the result, the better the match, and events of a different type get a penalty of 100.
With this in hand, we can now expand our previous implementation of getNearEvents() to show how to do the optimization and ordering for event location and affinity together:
List<Event> getEventsByProximityAndAffinity(int number, List<Event> events,
        String targetLocation, String targetType, int targetPrice) {
    Map<Integer, Map<Integer, Set<Event>>> chosenEvents = events.stream()
        // First level: group by distance to the target location
        .collect(Collectors.groupingBy(
            event -> evaluateLocationProximity(event.location(), targetLocation),
            TreeMap::new,
            // Second level: within each distance, group by affinity
            Collectors.groupingBy(
                event -> evaluateProvidersAffinity(event.eventType(),
                    event.price(), targetType, targetPrice),
                TreeMap::new,
                Collectors.toCollection(() -> new TreeSet<>(
                    Comparator.comparingInt(Event::eventRankId))))));
    return chosenEvents.values()
        .stream()
        .flatMap(map -> map.values()
            .stream()
            .flatMap(Set::stream))
        .limit(number)
        .collect(Collectors.toList());
}
As we did before, the first Collectors::groupingBy takes evaluateLocationProximity() as the classifying function, and we generate a TreeMap to keep the keys sorted. For the downstream Collector, we use a second Collectors::groupingBy with evaluateProvidersAffinity() as the classifying function and the same ordering as before.
The result is a Map<Integer, Map<Integer, Set<Event>>> where the key of the outer map is the 'distance' assigned to each event and its value is another map, whose key is the 'affinity' of the event and whose value is the set of events. Each set therefore contains the events that correspond to a given 'distance' and 'affinity'.
The final part is to return the list of events by doing a couple of flatMap() calls to extract them ordered by 'distance', 'affinity', and 'rankId'.
We could continue this technique to order the events with more attributes.
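The two-level grouping can be followed on a handful of events in the runnable sketch below. As a simplifying assumption, it again treats locations as street numbers so that the distances are easy to check by hand. The class name MultiLevelDemo, that proximity function, and the sample data are hypothetical, while the affinity function is the one defined above.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class MultiLevelDemo {
    record Event(String location, int price, String eventType, int eventRankId) {}

    // Assumption for this sketch: locations are street numbers written as strings.
    static int evaluateLocationProximity(String location1, String location2) {
        return Math.abs(Integer.parseInt(location1) - Integer.parseInt(location2));
    }

    static int evaluateProvidersAffinity(String localEventType, int localPrice,
            String targetEvtType, int targetPrice) {
        if (localEventType.equalsIgnoreCase(targetEvtType))
            return Math.abs(localPrice - targetPrice);
        else
            return 100 + Math.abs(localPrice - targetPrice);
    }

    static List<Event> getEventsByProximityAndAffinity(int number, List<Event> events,
            String targetLocation, String targetType, int targetPrice) {
        // distance -> affinity -> events sorted by rank id
        Map<Integer, Map<Integer, Set<Event>>> chosenEvents = events.stream()
            .collect(Collectors.groupingBy(
                event -> evaluateLocationProximity(event.location(), targetLocation),
                TreeMap::new,
                Collectors.groupingBy(
                    event -> evaluateProvidersAffinity(event.eventType(),
                        event.price(), targetType, targetPrice),
                    TreeMap::new,
                    Collectors.toCollection(() -> new TreeSet<>(
                        Comparator.comparingInt(Event::eventRankId))))));
        return chosenEvents.values().stream()
            .flatMap(map -> map.values().stream().flatMap(Set::stream))
            .limit(number)
            .collect(Collectors.toList());
    }

    static List<Event> sampleEvents() {
        return List.of(
            new Event("42", 30, "concert", 1),  // distance 2, affinity 5
            new Event("38", 25, "theater", 2),  // distance 2, affinity 100
            new Event("38", 20, "concert", 3),  // distance 2, affinity 5
            new Event("41", 90, "sports", 4),   // distance 1, affinity 165
            new Event("60", 25, "concert", 5)); // distance 20, affinity 0
    }

    public static void main(String[] args) {
        getEventsByProximityAndAffinity(4, sampleEvents(), "40", "concert", 25)
            .forEach(System.out::println);
    }
}
```

For the target location "40", type "concert", and price 25, the closest event (rank id 4) comes first even though its affinity is poor, because distance is the outer key. Among the three events at distance 2, the two with affinity 5 precede the one with affinity 100, and ties are broken by rank id.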
Conclusions
We have reviewed powerful techniques, using Java streams, to select ordered elements from a group optimized with multiple algorithms.
With simple modifications, we can parallelize the calculation to utilize the computing resources efficiently to maximize the speed of the implementation.
In a future publication, we will delve into how to scale this application horizontally to tackle huge volumes of data.
Published at DZone with permission of Manu Barriola. See the original article here.