Reactive Programming With Spring WebFlux
In this post, we take a look at how to implement the reactive programming model and see its benefits in action by making use of Spring WebFlux.
Recently, we have been hearing about a coding paradigm that is gaining more and more attention: Reactive Programming. Reactive Programming is a model of coding in which communication mainly happens through non-blocking streams of data. It makes your code "reactive": it reacts to changes instead of being blocked while, for example, waiting for a read from a database or a file to return. In this mode, we react to events as data becomes available. Its principles are based on the Reactive Manifesto.
Backpressure
One important concept is backpressure: a mechanism that ensures producers don't overwhelm consumers. For instance, if a consumer can't handle more than 100 HTTP requests per second and the incoming rate exceeds that, the consumer must be able to slow the flow, or even stop it for a moment, until the situation returns to normal.
This is an important aspect to consider when dealing with an enormous flow of data.
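To make the idea concrete, here is a minimal sketch that uses only the JDK's own java.util.concurrent.Flow API (JDK 9+), so it needs no extra libraries. The class and method names (BackpressureSketch, BatchingSubscriber, run) are made up for this illustration. The subscriber never asks for more than a small batch at a time, which is how a consumer tells a producer how much it can take.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BackpressureSketch {

    /** Consumer that asks for a small batch, processes it, then asks for the next one. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final List<Long> requests = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int batchSize;
        private Flow.Subscription subscription;
        private int leftInBatch;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        private void requestBatch() {
            leftInBatch = batchSize;
            requests.add((long) batchSize);
            subscription.request(batchSize); // the producer may deliver at most this many items
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch();
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--leftInBatch == 0) {
                requestBatch(); // batch consumed: signal demand for the next one
            }
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..items and returns the subscriber once the stream has completed. */
    static BatchingSubscriber run(int items, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= items; i++) {
                publisher.submit(i); // buffers the item; blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete after the buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber subscriber = run(10, 3);
        System.out.println("received=" + subscriber.received + " requests=" + subscriber.requests);
    }
}
```

The producer here is faster than the consumer's batches, but it can only hand over what was requested; the rest waits in the publisher's buffer.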
Specification, Implementations, Libraries, and Spring WebFlux
Reactive Programming, Reactive Streams, and Reactor: What do they have to do with each other? We often see these terms used interchangeably, but they are not exactly the same thing. So, just to get a clearer view of them, let's summarize:
- Reactive Programming: This is the paradigm, the model, on which the other two terms below are built.
- Reactive Streams: This is a specification that defines how an API that follows the Reactive Programming paradigm should work. Since JDK 9, its interfaces are also part of the JDK, in java.util.concurrent.Flow.
- Reactor: This is a Java implementation of the Reactive Streams specification.
Spring WebFlux is Spring's "reaction" to this paradigm for web applications. It is a web framework that supports the reactive programming model, and it is built on Project Reactor, the reactive library chosen by Spring.
WebFlux is not a replacement for Spring MVC — they can actually complement each other, working together on the same solution. We are going to use it in our sample to better understand how everything works.
The Problem and the Solution, Hands On!
Let's travel a little bit into the not-so-distant future. Imagine a highway with heavy traffic, with a flow that can reach something like 5,000 vehicles per hour at the daily peak, and even more on holidays. In this future, every vehicle is required to carry an RFID device that transmits data calculated by the vehicle's computer. At some points along the highway, receivers pick up this information from each vehicle: plate number, weight, speed, color, etc.
That information flows continuously as a data stream to a host run by the highway company, which makes it available through its server via Spring WebFlux endpoints. Anyone interested can connect a WebClient and receive this information in real time, as the vehicles go by.
The Spring WebFlux Server
There are two programming models in WebFlux: annotated controllers (as in Spring MVC) and functional endpoints. We are using the latter, taking advantage of Java 8's lambda expressions.
First, let's define in this server a router to receive the requests of the clients interested in highway vehicle traffic.
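import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Component
public class HighwayRouter {
    // Routes GET /vehicles (JSON stream) to the handler method.
    @Bean
    public RouterFunction<ServerResponse> route(HighwayHandler highwayHandler) {
        return RouterFunctions
                .route(RequestPredicates.GET("/vehicles")
                        .and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)),
                    highwayHandler::vehicleDetected);
    }
}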
Through RouterFunctions, we set up a GET endpoint at "/vehicles" that responds with a stream of JSON objects. The stream comes from the vehicleDetected method of the handler class HighwayHandler.
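import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class HighwayHandler {
    @Autowired
    HighwayTraffic highwayTraffic;

    // Streams every detected vehicle to the client as JSON.
    public Mono<ServerResponse> vehicleDetected(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_STREAM_JSON)
                .body(highwayTraffic.flowTraffic(), Vehicle.class);
    }
}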
The class that handles the requests has to get its information from some source and, since we are dealing with a flow, that source must be capable of generating a data stream. Spring Data also offers reactive repositories, so a source such as a MongoDB database can deliver its data in a non-blocking, reactive way as well. In our sample, we simulate the traffic flow in memory.
At the core of this in-memory simulator is the method that generates the traffic, which is where we create a Flux object. Reactor provides basically two publisher types: Mono, which emits at most one item, and Flux, which emits 0...N items.
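As a quick illustration of the two types, here is a small sketch, assuming reactor-core is on the classpath. The class name MonoAndFluxSketch and its methods are hypothetical; the plate numbers are borrowed from the sample output of this article. The block() calls are there only to pull the values out for printing and would normally be avoided in reactive code.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MonoAndFluxSketch {

    /** A Mono carries at most one value. */
    static String single() {
        Mono<String> plate = Mono.just("FCK 8997");
        return plate.map(String::toLowerCase).block();
    }

    /** A Flux carries zero to many values. */
    static List<String> many() {
        Flux<String> plates = Flux.just("FCK 8997", "EWA 3102", "DSR 1722");
        return plates.map(p -> p.substring(0, 3)).collectList().block();
    }

    /** An empty Mono completes without a value, so block() returns null. */
    static String none() {
        return Mono.<String>empty().block();
    }

    public static void main(String[] args) {
        System.out.println(single()); // fck 8997
        System.out.println(many());   // [FCK, EWA, DSR]
        System.out.println(none());   // null
    }
}
```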
As we have a response with 0...N records (vehicles), we use a Flux object. Let's simulate a hot stream, that is, a live flow of data shared by all subscribers, where each subscriber only sees the vehicles that pass by after it connects. That's different from a cold stream, which starts over for every new subscriber and typically replays a finite collection of data.
public Flux<Vehicle> flowTraffic() {
    LocalDateTime startTime = LocalDateTime.now();
    return Flux.<Vehicle>create(fluxSink -> {
        boolean inFrameTime = true;
        int index = 1;
        // Stop after 30,000 vehicles, after 30 seconds, or when the subscriber cancels.
        while (index <= 30000 && inFrameTime && !fluxSink.isCancelled()) {
            fluxSink.next(HighwayUtilities.simulateTraffic());
            index++;
            long elapsedMillis = startTime.until(LocalDateTime.now(), ChronoUnit.MILLIS);
            if (elapsedMillis > 30000) {
                LOGGER.info("TrafficSimulator finish --> With timer");
                inFrameTime = false;
            }
        }
    }).share(); // multicast the same stream to every subscriber
}
Inside the lambda passed to Flux.create(...), we have a loop in which we read the vehicles. Our in-memory utility method HighwayUtilities.simulateTraffic() simulates a car passing by on the highway and sending its RFID data. So that this does not run indefinitely, we configure two ways to stop it: after 30,000 vehicles have passed or after 30 seconds have elapsed, whichever comes first.
One important thing to notice here is that the consumer receives each vehicle as soon as fluxSink.next(Vehicle) is called, while the loop is still running, not only after it has finished. After create(...), we call share(), which returns a new Flux that multicasts the original to all subscribers. I say "multicasts" because share() connects to the source only when the first subscriber arrives, and cancels the source when the last subscriber disconnects.
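The behavior of share() described above can be observed in isolation. The following is only a sketch, assuming reactor-core 3.4 or later (for the Sinks API, which is newer than the Reactor version this article was written against). ShareSketch and its field names are invented for the illustration, and the hand-fed sink stands in for the highway detector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class ShareSketch {
    final AtomicInteger upstreamSubscriptions = new AtomicInteger();
    final AtomicInteger upstreamCancellations = new AtomicInteger();
    final List<String> first = new ArrayList<>();
    final List<String> second = new ArrayList<>();

    void run() {
        // A source that accepts only ONE subscriber; we push plates into it by hand.
        Sinks.Many<String> detector = Sinks.many().unicast().onBackpressureBuffer();

        Flux<String> shared = detector.asFlux()
                .doOnSubscribe(s -> upstreamSubscriptions.incrementAndGet())
                .doOnCancel(upstreamCancellations::incrementAndGet)
                .share(); // one upstream subscription, multicast to all subscribers

        Disposable a = shared.subscribe(first::add);   // first subscriber connects the source
        detector.tryEmitNext("FCK 8997");

        Disposable b = shared.subscribe(second::add);  // late subscriber: misses "FCK 8997"
        detector.tryEmitNext("EWA 3102");

        a.dispose();
        b.dispose(); // last subscriber gone: share() cancels the source
    }

    public static void main(String[] args) {
        ShareSketch sketch = new ShareSketch();
        sketch.run();
        System.out.println("first=" + sketch.first);   // [FCK 8997, EWA 3102]
        System.out.println("second=" + sketch.second); // [EWA 3102]
        System.out.println("subscriptions=" + sketch.upstreamSubscriptions
                + " cancellations=" + sketch.upstreamCancellations);
    }
}
```

Both subscribers are served by a single subscription to the source, and the second one only sees what was emitted after it joined, which is the hot-stream behavior the simulator relies on.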
The Spring WebFlux Client
Now it is time, as a consumer, to "plug in" our WebClient to the data stream endpoint of the highway operator's server. We are interested in listening to all the vehicles flowing through it. To achieve this, let's use the WebClient that ships with Spring WebFlux. It is a non-blocking, reactive client for executing HTTP requests and consuming reactive streams, and it also supports features like backpressure. Let's create an instance connected to the server:
private WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080").build();
And then we start the connection, subscribing to its endpoint when this method below is called.
public Disposable vehicleDetected() {
    AtomicInteger counter = new AtomicInteger(0);
    return webClient.get()
            .uri("/vehicles")
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            // Deliver downstream signals on a single dedicated thread.
            .publishOn(Schedulers.single())
            .flatMapMany(response -> response.bodyToFlux(Vehicle.class))
            // Pace consumption: at most one vehicle per second.
            .delayElements(Duration.ofMillis(1000))
            .subscribe(
                    s -> System.out.println(counter.incrementAndGet() + " >>>>>>>>>> " + s),
                    err -> System.out.println("Error on Vehicle Stream: " + err),
                    () -> System.out.println("Vehicle stream stopped!"));
}
Some points to check here:
- With publishOn(Schedulers.single()), we ask that the signals flowing through the Flux (onNext, onComplete, and onError) be delivered downstream on the thread supplied by that scheduler.
- With delayElements(...), we deal with backpressure. As the server might stream lots of vehicles each second, we as a client can't afford to process that volume in such a short period, so we use delayElements to limit ourselves to one vehicle per second at most.
- With subscribe(...), at the end of the chain, we subscribe to the endpoint and start receiving and working with the data. At this moment, the server "opens the gate" and lets the data flow.
Notice that this is a non-blocking operation. The main thread does not get stuck after the call to those WebClient functions; the lambda expressions passed to subscribe keep receiving data for as long as the server is still sending it.
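Both points, the pacing done by delayElements and the fact that subscribe returns right away, can be checked without a server. This is a small sketch assuming reactor-core on the classpath; PacingSketch and its methods are hypothetical names, and Flux.range stands in for the vehicle stream.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

class PacingSketch {

    /** Drains `count` elements spaced by `gap` and returns the elapsed milliseconds. */
    static long pacedMillis(int count, Duration gap) {
        long start = System.nanoTime();
        Flux.range(1, count)
                .delayElements(gap) // each element is held back by `gap`
                .blockLast();       // blocking only to measure; avoid in reactive code
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Returns true if subscribe() came back before any element was delivered. */
    static boolean subscribeReturnsImmediately() {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        Disposable subscription = Flux.range(1, 3)
                .delayElements(Duration.ofMillis(200))
                .subscribe(seen::add);
        boolean nothingYet = seen.isEmpty(); // the caller was not blocked
        subscription.dispose();              // stop listening; nothing else will arrive
        return nothingYet;
    }

    public static void main(String[] args) {
        System.out.println("3 elements took about " + pacedMillis(3, Duration.ofMillis(100)) + " ms");
        System.out.println("subscribe returned immediately: " + subscribeReturnsImmediately());
    }
}
```

Three elements delayed by 100 ms each take at least roughly 300 ms to drain, no matter how fast the source could produce them.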
There are more options available for the WebClient. For instance, we can filter the stream to receive only the vehicles with a speed higher than 120 km/h (see the filter call below). Also, since the flow of matching vehicles will usually be smaller, we "relieve" the backpressure a little and delay each element by only 250 milliseconds.
public Disposable vehicleHigherThen120Detected() {
    AtomicInteger counter = new AtomicInteger(0);
    return webClient.get()
            .uri("/vehicles")
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            .flatMapMany(response -> response.bodyToFlux(Vehicle.class))
            // Keep only the vehicles faster than 120 km/h.
            .filter(v -> v.getSpeed() > 120)
            .delayElements(Duration.ofMillis(250))
            .subscribe(
                    s -> System.out.println(counter.incrementAndGet() + " >>>>>>>>>> " + s),
                    err -> System.out.println("Error on Vehicle Stream: " + err),
                    () -> System.out.println("Vehicle stream stopped!"));
}
So, that way, we can catch every vehicle that is above the normal speed at this point of the highway. The point is filtering what you really need to process while receiving the continuous flow of the data stream.
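The filtering step itself is independent of HTTP and can be tried on an in-memory Flux. Below is a minimal sketch assuming reactor-core on the classpath; SpeedFilterSketch and its pared-down Vehicle class are hypothetical stand-ins for the sample application's classes, and the three vehicles are taken from the sample output of this article.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class SpeedFilterSketch {

    /** Pared-down, hypothetical stand-in for the sample application's Vehicle class. */
    static final class Vehicle {
        private final String carPlateNumber;
        private final int speed;

        Vehicle(String carPlateNumber, int speed) {
            this.carPlateNumber = carPlateNumber;
            this.speed = speed;
        }

        String getCarPlateNumber() {
            return carPlateNumber;
        }

        int getSpeed() {
            return speed;
        }
    }

    /** Returns the plates of all vehicles faster than the given limit, in arrival order. */
    static List<String> platesAbove(Flux<Vehicle> traffic, int limitKmh) {
        return traffic
                .filter(v -> v.getSpeed() > limitKmh)
                .map(Vehicle::getCarPlateNumber)
                .collectList()
                .block(); // blocking only to print the result of this finite stream
    }

    public static void main(String[] args) {
        Flux<Vehicle> traffic = Flux.just(
                new Vehicle("FCK 8997", 137),
                new Vehicle("FLJ 1025", 119),
                new Vehicle("EWA 3102", 125));
        System.out.println(platesAbove(traffic, 120)); // [FCK 8997, EWA 3102]
    }
}
```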
Here are the results (or part of them) that we get in our WebClient subscription to this WebFlux endpoint data stream:
[ASCII-art startup banner: "Highway Server"]
2018-04-22 12:43:57 [main] INFO o.s.b.w.r.c.AnnotationConfigReactiveWebServerApplicationContext - Refreshing org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext@335b5620
2018-04-22 12:44:02 [main] INFO o.s.b.w.e.netty.NettyWebServer - Netty started on port(s): 8080
13 >>>> Vehicle [carPlateNumber=FCK 8997, weight=1040, speed=137, color=Orange, modelYear=2014, gasType=Gasoline]
14 >>>> Vehicle [carPlateNumber=EWA 3102, weight=1438, speed=125, color=Silver, modelYear=1993, gasType=Gas]
15 >>>> Vehicle [carPlateNumber=DSR 1722, weight=327, speed=149, color=Black, modelYear=1976, gasType=Diesel]
16 >>>> Vehicle [carPlateNumber=VQQ 1013, weight=1344, speed=135, color=Orange, modelYear=1989, gasType=Gasoline]
17 >>>> Vehicle [carPlateNumber=MSZ 7605, weight=3990, speed=168, color=Orange, modelYear=2010, gasType=Gas]
18 >>>> Vehicle [carPlateNumber=QSL 5740, weight=2404, speed=144, color=Blue, modelYear=2014, gasType=Diesel]
19 >>>> Vehicle [carPlateNumber=FLJ 1025, weight=631, speed=119, color=Black, modelYear=2002, gasType=Gas]
20 >>>> Vehicle [carPlateNumber=AQQ 4943, weight=2104, speed=155, color=Black, modelYear=2007, gasType=Diesel]
21 >>>> Vehicle [carPlateNumber=SRW 9566, weight=3428, speed=134, color=Orange, modelYear=2008, gasType=Eletric]
...
Conclusion
In this article, we have taken a peek at reactive programming. We used Spring WebFlux, Spring's framework for building reactive web applications, and walked through a sample.
The fact that this model has a non-blocking execution flow makes us think a little differently. It is not the same as the blocking execution model, where every call blocks until the response arrives. Newcomers to Node.js, for instance, face the same shift in thinking. It is important to be smart in your choice, however: if your problem doesn't fit a non-blocking programming model, don't use it just because it is fancy.
As always, focus on the problem you are trying to solve, and frequently recall that main objective when things get fuzzy.
The source code of the sample application is available at GitHub.
Published at DZone with permission of Ualter Junior, DZone MVB. See the original article here.