Separation of Reactive and Non-Reactive Code
Avoid unexpected behavior caused by mixing reactive and non-reactive logic when using Project Reactor.
One of the most important distinctions to keep in mind when working with Project Reactor or any other reactive streams implementation is the difference between assembly time and subscription time in code execution.
Regardless of your level of experience with reactive programming, odds are you’ve already encountered the famous:
Nothing happens until you subscribe
In other words, the reactive publishers (Flux and Mono) are lazy, so no elements will be published or processed until someone subscribes. Understanding this distinction is essential because, when writing real-life applications, we want all (or most) of our business logic to be executed at subscription time. In this post, we will show what kind of issues can arise from not following this rule and how to mitigate them.
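As a rough illustration of this laziness using only JDK classes, the sketch below models a publisher with a plain Supplier. It is an analogy rather than Reactor code: LazyVersusEager, doWork, and describeWork are hypothetical names, and calling get() stands in for subscribing.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyVersusEager {
    // Counts how many times the "work" has actually run.
    static final AtomicInteger workCount = new AtomicInteger();

    // Hypothetical stand-in for real work such as producing an element.
    public static String doWork() {
        workCount.incrementAndGet();
        return "result";
    }

    // Only describes the work; nothing runs until get() is called.
    // The Supplier plays the role of a lazy publisher in this analogy.
    public static Supplier<String> describeWork() {
        return LazyVersusEager::doWork;
    }

    public static void main(String[] args) {
        Supplier<String> pipeline = describeWork(); // "assembly": no work done yet
        System.out.println("Work count after assembly: " + workCount.get());

        String value = pipeline.get(); // "subscription": the work runs now
        System.out.println("Work count after subscription: " + workCount.get());
        System.out.println("Value: " + value);
    }
}
```

The counter only changes once get() is invoked, which mirrors how a Mono or Flux does nothing until a subscriber arrives.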
Car Rental Service Example
To exemplify this, we will use a very simple implementation of a fictitious car rental service. The service accepts input consisting of a customer’s name, age, and email address along with a car model. It first checks whether the customer is older than 18 (and thus legally allowed to rent a car), after which it saves the rental request to a database and finally generates a PDF receipt and emails it to the customer.
This flow is implemented by the rentCar method:
private static Mono<UUID> rentCar(CarRentalRequest request) {
    if (request.getCustomerAge() > 18) {
        UUID rentalId = UUID.randomUUID(); // Generate an ID for the new rental
        return saveCarRental(rentalId, request) // Save the rental entity to the database
                .then(buildAndSendPdfReceipt(rentalId, request)) // Generate and send PDF receipt
                .then(Mono.just(rentalId)); // Return the ID of the new rental
    } else {
        return Mono.error(new RuntimeException("Must be 18 to rent a car"));
    }
}

private static Mono<Void> buildAndSendPdfReceipt(UUID rentalId, CarRentalRequest carRentalRequest) {
    byte[] pdfReceipt = buildPdfReceipt(rentalId, carRentalRequest);
    return sendPdfReceipt(pdfReceipt, carRentalRequest.getCustomerEmail());
}
We can then call this method to create the publisher. Additionally, we want to make sure that we delegate the work to a separate scheduler so that the main thread can proceed to handle other requests. We can accomplish this using the subscribeOn operator, which determines the Scheduler on which the subscription takes place. Because the subscription signal travels up to the source, the top publisher produces its elements on that Scheduler, and the operators below keep running there unless a publishOn switches the context. Finally, we provide a subscriber which defines the logic to be executed for a success as well as an error response (the two lambda arguments of the subscribe() method, respectively).
CarRentalRequest request = new CarRentalRequest("Alice", 30, "Hyundai i30", "alice@mail.com");
rentCar(request)
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe(s -> log.info("Car rented successfully, rental ID: {}", s),
                e -> log.error("Could not rent car: {}", e.getMessage(), e));
With this implementation in mind, let’s take a closer look at the first issue.
Pitfall 1: Incorrect Execution Context
Looking closely at the buildAndSendPdfReceipt method, one can easily guess that buildPdfReceipt is a synchronous, non-reactive method: it doesn’t return any reactive type, it simply returns a byte[] representing the PDF document. This method could look something like this:
private static byte[] buildPdfReceipt(UUID rentalId, CarRentalRequest request) {
    log.info("Build PDF receipt");
    // Create and return the PDF receipt document
    ...
}
However, if we run this example, we get the following output:
21:25:38.961 [main] INFO com.reactordemo.carrental.CarRentalService - Build PDF receipt
21:25:38.986 [boundedElastic-1] INFO com.reactordemo.carrental.CarRentalService - Car rented successfully, rental ID: d5b689dd-fa91-486c-b835-44bc2583d53a
If we pay attention to the section of the logs showing the current thread for each statement (in square brackets), we notice that the subscriber logic is correctly executed on a thread in the bounded elastic scheduler, boundedElastic-1. However, the work of creating the PDF seems to be executed on the main thread! So why is this the case?
The answer lies in the above-mentioned distinction between assembly and subscription. Let’s have another look at the buildAndSendPdfReceipt method:
private static Mono<Void> buildAndSendPdfReceipt(UUID rentalId, CarRentalRequest carRentalRequest) {
    byte[] pdfReceipt = buildPdfReceipt(rentalId, carRentalRequest);
    return sendPdfReceipt(pdfReceipt, carRentalRequest.getCustomerEmail());
}
When this method is executed, we are merely supposed to assemble the reactive pipeline, i.e. to declaratively define the steps to be executed in order to create and send the PDF receipt. At this stage, we are not supposed to do the actual work of generating the receipt, which is only meant to happen when someone subscribes to this publisher. That is unfortunately not the case here: the call to buildPdfReceipt is made in the body of the method, along with the rest of the assembly code. One of the very unfortunate consequences of this is the incorrect execution context that we saw above. The overall pipeline is assembled on the main thread while the published elements are processed on the boundedElastic scheduler. But since the call to buildPdfReceipt is made at assembly time, this presumably time-consuming operation will now take place on the main thread as well. This can be dangerous since in many real-life scenarios we tend to have more threads/resources dedicated to processing reactive pipelines than assembling them, and keeping the assembly thread busy can have a negative impact on the overall throughput and performance of our application.
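The same effect can be reproduced without Reactor. The sketch below is a JDK-only analogy under stated assumptions: a single worker thread named worker-1 takes the place of the boundedElastic scheduler, a Supplier takes the place of the Mono, and buildReceipt is a hypothetical stand-in for buildPdfReceipt that simply reports the thread it ran on.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class AssemblyThreadDemo {
    // Hypothetical stand-in for buildPdfReceipt: returns the name of the thread it ran on.
    public static String buildReceipt() {
        return Thread.currentThread().getName();
    }

    // Eager: the work runs while this method executes, i.e. on the caller's thread.
    public static Supplier<String> assembleEagerly() {
        String builtOn = buildReceipt();
        return () -> builtOn;
    }

    // Deferred: the work runs only when the supplier is invoked, on whichever thread invokes it.
    public static Supplier<String> assembleLazily() {
        return AssemblyThreadDemo::buildReceipt;
    }

    // Assembles on the calling thread, then evaluates the supplier on a worker thread.
    public static String runOnWorker(boolean eager) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        try {
            Supplier<String> pipeline = eager ? assembleEagerly() : assembleLazily();
            Callable<String> task = pipeline::get;
            return worker.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Eager receipt built on:    " + runOnWorker(true));
        System.out.println("Deferred receipt built on: " + runOnWorker(false));
    }
}
```

In the eager variant the receipt is built on the thread that assembled the supplier, even though the supplier is later evaluated on the worker. Only the deferred variant moves the work to the worker thread.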
One way to fix this problem is by using the Mono.fromCallable method as below:
private static Mono<Void> buildAndSendPdfReceipt(UUID rentalId, CarRentalRequest carRentalRequest) {
    return Mono.fromCallable(() -> buildPdfReceipt(rentalId, carRentalRequest))
            .flatMap(pdfReceipt -> sendPdfReceipt(pdfReceipt, carRentalRequest.getCustomerEmail()));
}
As we know, the publisher will only start producing the elements when someone subscribes (i.e. at subscription time), so the call to buildPdfReceipt is now made as part of the overall pipeline, on the desired scheduler. Indeed, running the application again produces the following result:
21:54:49.955 [boundedElastic-1] INFO com.reactordemo.carrental.CarRentalService - Build PDF receipt
21:54:49.956 [boundedElastic-1] INFO com.reactordemo.carrental.CarRentalService - Car rented successfully, rental ID: a3bb873e-4943-407a-967f-9fa1c1d0d235
In many complex real-life applications, this kind of issue can be difficult to spot. One good way to avoid it is to make sure that reactive methods (i.e. methods assembling pipelines, which usually have a reactive return type) do not directly make calls to non-reactive methods. Rather, they should just assemble the reactive pipelines, preferably in a single fluent statement, and all calls to non-reactive methods should be made from within the reactive operators (fromCallable, fromRunnable, map, filter, etc.).
Pitfall 2: Incorrect Exception Handling
When designing and implementing any kind of application, we always want to make sure that we can handle errors gracefully, by either trying to recover or otherwise presenting the user with a proper error message. In our simple car rental service, we create a subscriber with an error handler lambda that logs the error from upstream. The expectation is that any error that might occur anywhere in the pipeline will result in a log statement describing the problem.
To test this, let’s consider the following input:
CarRentalRequest request = new CarRentalRequest("Bob", null, "Hyundai i30", "bob@mail.com");
Notice that in this case the age of the customer is incorrectly set to null. Even so, we would expect that any error this might cause will be correctly intercepted and logged. Unfortunately, running this code now produces the following output:
Exception in thread "main" java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.reactordemo.carrental.CarRentalService$CarRentalRequest.getCustomerAge()" is null
at com.reactordemo.carrental.CarRentalService.rentCar(CarRentalService.java:27)
at com.reactordemo.carrental.CarRentalService.entryPoint(CarRentalService.java:19)
at com.reactordemo.carrental.ReactorDemoApplication.main(ReactorDemoApplication.java:10)
This shows that our invalid input produced an NPE which was not caught anywhere. But why? Why wasn’t our error handler invoked for this exception? To understand this, let’s have another look at our main reactive pipeline:
private static Mono<UUID> rentCar(CarRentalRequest request) {
    if (request.getCustomerAge() > 18) {
        UUID rentalId = UUID.randomUUID();
        return saveCarRental(rentalId, request)
                .then(buildAndSendPdfReceipt(rentalId, request))
                .then(Mono.just(rentalId));
    } else {
        return Mono.error(new RuntimeException("Must be 18 to rent a car"));
    }
}
It is clear that the exception occurs in the if statement’s condition, where we check whether the age is greater than 18. But note that this check does not happen as part of the pipeline execution. Instead, it is made while assembling the pipeline. Therefore, any error that happens here will not be regarded as a failure to process an element in the pipeline, but rather as a failure to assemble the pipeline in the first place. Since no publisher is ever returned, there is nothing to subscribe to, and the exception propagates directly to the caller of rentCar. Once again, this issue could have been avoided by simply defining all logic specific to element processing (including the check) inside the reactive pipeline.
private static Mono<UUID> rentCar(CarRentalRequest request) {
    return Mono.just(request)
            .<CarRentalRequest>handle((req, sink) -> {
                if (req.getCustomerAge() > 18) {
                    sink.next(req);
                } else {
                    sink.error(new RuntimeException("Must be 18 to rent a car"));
                }
            })
            .flatMap(req -> {
                UUID rentalId = UUID.randomUUID();
                return saveCarRental(rentalId, req)
                        .then(buildAndSendPdfReceipt(rentalId, req))
                        .then(Mono.just(rentalId));
            });
}
In the initial implementation, there were two pieces of functionality related to processing the request that were being executed at assembly time: the age check and the ID generation. We have now moved both of them inside the pipeline, in the handle and flatMap operators, respectively. After applying this fix, the execution produces the following output:
12:48:46.627 [boundedElastic-1] ERROR com.reactordemo.carrental.CarRentalService - Could not rent car: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.reactordemo.carrental.CarRentalService$CarRentalRequest.getCustomerAge()" is null
java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.reactordemo.carrental.CarRentalService$CarRentalRequest.getCustomerAge()" is null
at com.reactordemo.carrental.CarRentalService.lambda$rentCarFixed$2(CarRentalService.java:40)
Of course, it is not ideal that an NPE is thrown instead of the input being validated and a more meaningful error being produced. Still, we can see that the exception is now thrown at subscription-time inside the pipeline, which means that it will eventually be caught by our error handler, as expected.
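One possible way to produce a more meaningful error is to validate the age before comparing it. The helper below is a hypothetical sketch: RentalValidation, validateAge, and the "Customer age is required" message are not part of the original service. It mirrors the > 18 check used above and returns an error message when the age is missing or too low.

```java
import java.util.Optional;

class RentalValidation {
    // Hypothetical helper: returns an error message for an invalid age,
    // or an empty Optional when the age is acceptable.
    public static Optional<String> validateAge(Integer age) {
        if (age == null) {
            return Optional.of("Customer age is required");
        }
        if (age <= 18) { // mirrors the "> 18" check used in rentCar
            return Optional.of("Must be 18 to rent a car");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validateAge(null));
        System.out.println(validateAge(16));
        System.out.println(validateAge(30));
    }
}
```

Inside the handle operator, a present message could be passed to sink.error(new IllegalArgumentException(message)), while an empty result would lead to sink.next(req). Because the helper is called from within the operator, the validation still runs at subscription time.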
Conclusion
In this post, we have analyzed two scenarios where incorrect separation of assembly-time and subscription-time logic can lead to undesired behaviour in our application. To mitigate such issues and others, we propose a clear separation as follows:
- As part of reactive methods (methods that assemble reactive pipelines, i.e. have reactive return types), avoid performing tasks other than strictly building the pipeline.
- A good practice for such methods is to ensure that they only assemble and return the reactive pipeline, preferably in a single fluent-style statement.
- Any non-trivial logic (that most often has to do with element processing rather than assembling the pipeline) such as input validation or mapping, calls to other synchronous methods, etc. should be executed as part of the pipeline. This can be achieved using a plethora of operators, of which this post exemplifies a few, such as handle, flatMap, and fromCallable.