The Good, the Bad, and the Ugly: Propagating Data Through Reactive Streams
Propagating data through reactive pipelines is a very common concern when building reactive applications.
Propagating data through reactive pipelines is a very common development concern that arises when building reactive applications based on any Reactive Streams implementation (e.g., Project Reactor, RxJava, or Akka Streams).
We’ll be going through the Good, the Bad and the Ugly of propagating information downstream, using Project Reactor as our Reactive Streams implementation of choice.
NOTE: If you’re quite familiar with Project Reactor and reactive programming already, you can jump to my demo Spring Boot application on GitHub and dig through the source code; it’s quite straightforward!
The Bad
One of the most common ways to solve the data propagation problem is to use local (effectively final) variables, which can either be used immediately within the scope of the current method or passed on as extra parameters to other methods.
The pros:
- Quick and dirty;
- That’s it…
The cons:
- Encourages you to build longer methods in order to re-use the same local variable in multiple pipeline steps;
- Alternatively, it pollutes your API by adding extra method parameters whenever you need to refactor the code into smaller pieces;
- Your code becomes hard to maintain very quickly.
Check out the following example, starting with the controller snippet:
@Autowired
private PrefixingService service;

@GetMapping(value = "/localvar/{base}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<ResponseEntity<Flux<String>>> getMultiplesWithLocalVarPrefix(
        @PathVariable int base,
        @RequestParam int multiplier,
        Authentication authentication) {
    // saving the prefix as a local variable
    String prefix = DATA_REQUESTED_BY + authentication.getName();
    Flux<String> body =
            // passing the prefix on to the next method, polluting its API
            service.doPrefix(prefix, getMultiplierFlux(base, multiplier))
                    .onErrorReturn(IllegalArgumentException.class, ILLEGAL_ARGUMENT_MSG);
    return Mono.just(ResponseEntity.ok().body(body));
}
private Flux<Integer> getMultiplierFlux(int base, int multiplier) {
    return Flux
            .<Integer>create(sink -> {
                if (base < 0 || multiplier < 0) {
                    sink.error(new IllegalArgumentException());
                    return; // stop here: nothing should be emitted after the error signal
                }
                for (int i = 1; i <= 10; i++) {
                    sink.next(base * multiplier * i);
                }
                sink.complete();
            })
            .delayElements(ofSeconds(1));
}
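To make the emitted values concrete, here is a minimal plain-Java sketch of the same validation and arithmetic as getMultiplierFlux, without Reactor and without the one-second delay. The names MultiplesSketch and multiples are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class MultiplesSketch {

    // Synchronous stand-in for getMultiplierFlux: reject negative input,
    // otherwise return base * multiplier * i for i = 1..10.
    static List<Integer> multiples(int base, int multiplier) {
        if (base < 0 || multiplier < 0) {
            throw new IllegalArgumentException();
        }
        List<Integer> result = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            result.add(base * multiplier * i);
        }
        return result;
    }

    public static void main(String[] args) {
        // base = 2, multiplier = 3 yields 6, 12, 18, ... up to 60
        System.out.println(multiples(2, 3));
    }
}
```

In the reactive version, the IllegalArgumentException is not thrown to the caller but signalled through the stream, where onErrorReturn replaces it with ILLEGAL_ARGUMENT_MSG.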
Furthermore, here is a polluted service snippet:
// Data propagation via local variable polluted the method signature!
public Flux<String> doPrefix(String prefix, Flux<Integer> toPrefix) {
    return toPrefix
            .map(data -> prefix + " " + data);
}
The Ugly
Another prevalent solution is to use Tuples, which aggregate multiple pieces of data into a single object that gets propagated downstream and gives direct access to each of its components.
The pros:
- No extra parameters are needed to propagate data downstream
- No need to create our own aggregator POJOs
- Good for propagating mutable data downstream
- Tuples are a Project Reactor component; therefore, we must be doing things the Reactor way, am I right?
The cons:
- Method signatures become quite long and are filled with generics declarations
- The code required to handle tuples is definitely ugly
- The code becomes hard to read at first glance.
Let's look at the following controller snippet:
@Autowired
private PrefixingService service;

@GetMapping(value = "/tuples/{base}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<ResponseEntity<Flux<String>>> getMultiplesWithTuplesPrefix(
        @PathVariable int base,
        @RequestParam int multiplier) {
    Flux<String> body =
            service.doPrefixWithTuple(
                    getMultiplierFlux(base, multiplier)
                            // pairing every value with the (not yet resolved) security context
                            .map(multiple -> Tuples.of(multiple, ReactiveSecurityContextHolder.getContext()))
            )
            .onErrorReturn(IllegalArgumentException.class, ILLEGAL_ARGUMENT_MSG);
    return Mono.just(ResponseEntity.ok().body(body));
}
private Flux<Integer> getMultiplierFlux(int base, int multiplier) {
    return Flux
            .<Integer>create(sink -> {
                if (base < 0 || multiplier < 0) {
                    sink.error(new IllegalArgumentException());
                    return; // stop here: nothing should be emitted after the error signal
                }
                for (int i = 1; i <= 10; i++) {
                    sink.next(base * multiplier * i);
                }
                sink.complete();
            })
            .delayElements(ofSeconds(1));
}
And here is the matching service snippet:
// Data propagation via Tuples alters the method signature and makes the code ugly really quickly!
public Flux<String> doPrefixWithTuple(Flux<Tuple2<Integer, Mono<SecurityContext>>> toPrefix) {
    return toPrefix
            // resolve the Mono<SecurityContext> and re-pair the value with the user name
            .flatMap(tuple -> tuple.getT2()
                    .map(securityContext ->
                            Tuples.of(tuple.getT1(), securityContext.getAuthentication().getName())))
            .map(tuple -> DATA_REQUESTED_BY + tuple.getT2() + " " + tuple.getT1());
}
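For comparison, the "aggregator POJO" that tuples let you skip could look something like the sketch below. It is plain Java with no Reactor types, and PrefixedMultiple, describe, and the literal "Data requested by " prefix are all assumptions made for this illustration rather than code from the demo project. The only point is that named accessors read more clearly than getT1() and getT2(), at the cost of one extra class.

```java
import java.util.List;
import java.util.stream.Collectors;

public class NamedHolderSketch {

    // Hypothetical aggregator: carries a value together with the requester's name.
    static final class PrefixedMultiple {
        private final int multiple;
        private final String requester;

        PrefixedMultiple(int multiple, String requester) {
            this.multiple = multiple;
            this.requester = requester;
        }

        int getMultiple() { return multiple; }

        String getRequester() { return requester; }
    }

    // Same formatting step as doPrefixWithTuple, but with named accessors.
    static List<String> describe(List<PrefixedMultiple> items) {
        return items.stream()
                .map(item -> "Data requested by " + item.getRequester() + " " + item.getMultiple())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<PrefixedMultiple> items = List.of(
                new PrefixedMultiple(6, "alice"),
                new PrefixedMultiple(12, "alice"));
        describe(items).forEach(System.out::println);
    }
}
```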
The Good
A far less common, and often unknown, solution is Project Reactor’s context: a map-like structure that is automatically and transparently propagated throughout the whole reactive pipeline and can be accessed at any point by calling the Mono.subscriberContext() static method.
The context can be populated at subscription time by adding either a subscriberContext(Function) or a subscriberContext(Context) invocation at the end of your reactive pipeline. (Reactor 3.4 deprecated these methods in favor of Mono.deferContextual and contextWrite.)
It is an excellent solution for propagating static, technical data about the current process and dealing with cross-cutting concerns. Therefore, it should be used for things such as propagation of authentication contexts, static logging information, correlation ids, and transaction contexts.
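One property worth keeping in mind is that Reactor’s Context is immutable: put returns a new instance instead of modifying the existing one, which fits with the subscriberContext(Function) variant taking a function from the old context to the new one. The plain-Java sketch below mimics just that behaviour. SimpleContext is a hypothetical stand-in written for this illustration; it is not Reactor’s implementation and does nothing about propagation through a pipeline.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class SimpleContext {

    private final Map<Object, Object> entries;

    private SimpleContext(Map<Object, Object> entries) {
        this.entries = entries;
    }

    static SimpleContext empty() {
        return new SimpleContext(Collections.emptyMap());
    }

    // Copy-on-write: the receiver is left untouched and a new context is returned.
    SimpleContext put(Object key, Object value) {
        Map<Object, Object> copy = new HashMap<>(entries);
        copy.put(key, value);
        return new SimpleContext(copy);
    }

    @SuppressWarnings("unchecked")
    <T> T getOrDefault(Object key, T defaultValue) {
        Object value = entries.get(key);
        return value != null ? (T) value : defaultValue;
    }

    public static void main(String[] args) {
        SimpleContext base = SimpleContext.empty();
        SimpleContext withPrefix = base.put("prefix", "Data requested by alice");

        System.out.println(base.getOrDefault("prefix", "<none>"));       // <none>
        System.out.println(withPrefix.getOrDefault("prefix", "<none>")); // Data requested by alice
    }
}
```

Because every put yields a fresh instance, a value written by one subscriber cannot leak into another subscriber's view of the context, which is part of what makes it suitable for per-request data such as authentication.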
The pros:
- There are no extra parameters needed to propagate data downstream
- The methods’ signatures are completely unscathed
- A very elegant solution for dealing with cross-cutting concerns
- We are still able to do things the Reactor way.
The cons:
- It's not the best tool for the propagation of functional, highly mutable data;
- It is a bit verbose when compared to alternatives.
Here's an example of a controller snippet:
@Autowired
private PrefixingService service;

@GetMapping(value = "/{base}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<ResponseEntity<Flux<String>>> getMultiples(
        @PathVariable int base,
        @RequestParam int multiplier) {
    Flux<String> body =
            service.doPrefix(getMultiplierFlux(base, multiplier))
                    .onErrorReturn(IllegalArgumentException.class, ILLEGAL_ARGUMENT_MSG)
                    // storing the prefix in the context, at the end of the pipeline
                    .subscriberContext(ctx -> ctx.put(PREFIX_KEY, getPrefix(ctx)));
    return Mono.just(ResponseEntity.ok().body(body));
}

private Mono<String> getPrefix(Context ctx) {
    return ctx.getOrDefault(SecurityContext.class, Mono.just(new SecurityContextImpl()))
            .map(securityCtx -> DATA_REQUESTED_BY + securityCtx.getAuthentication().getName());
}
private Flux<Integer> getMultiplierFlux(int base, int multiplier) {
    return Flux
            .<Integer>create(sink -> {
                if (base < 0 || multiplier < 0) {
                    sink.error(new IllegalArgumentException());
                    return; // stop here: nothing should be emitted after the error signal
                }
                for (int i = 1; i <= 10; i++) {
                    sink.next(base * multiplier * i);
                }
                sink.complete();
            })
            .delayElements(ofSeconds(1));
}
NOTE: In the snippet above, the Spring Security authentication context is retrieved from Project Reactor’s context since the latter is already filled with it by the Spring WebFlux Security module. Neat!
Here is an example of a clean service snippet:
// Data propagation via Reactor Context is transparent, the method signature is unscathed!
public Flux<String> doPrefix(Flux<Integer> toPrefix) {
    return toPrefix
            .flatMap(data ->
                    Mono.subscriberContext()
                            .flatMap(ctx -> ctx.getOrDefault(PREFIX_KEY, Mono.just("")))
                            .map(prefix -> prefix + " " + data)
            );
}
Wrapping Up
We went through three versions of the same example (simple, but definitely overcomplicated), each showing a different approach to data propagation in reactive pipelines.
There is no clear and absolute winner for every use case, but Project Reactor’s context certainly deserves an honorable mention for dealing with cross-cutting concerns in a way that’s both elegant and transparent.
References
For more detailed information about Project Reactor’s context, you can refer to the official Project Reactor documentation.
If you want to take a closer look at some of the code shown above or try it out yourself, feel free to check out my demo project on GitHub.
Published at DZone with permission of Domenico Sibilio. See the original article here.