Using RxJava Observable in a Spring MVC Flow
Spring MVC has supported asynchronous request processing for some time now, and this support internally relies on the Servlet 3 async support of containers such as Tomcat and Jetty.
Spring Web Async support
Consider a service call that takes a little while to process, simulated with a delay:
public CompletableFuture<Message> getAMessageFuture() {
    return CompletableFuture.supplyAsync(() -> {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(1000); // simulate a slow operation
        logger.info("End: Executing slow task in Service 1");
        return new Message("data 1");
    }, futureExecutor);
}
If I were to call this service in a user request flow, the traditional blocking controller flow would look like this:
@RequestMapping("/getAMessageFutureBlocking")
public Message getAMessageFutureBlocking() throws Exception {
    // get() blocks the container's request thread until the future completes
    return service1.getAMessageFuture().get();
}
A better approach is to use Spring's asynchronous support to return the result to the user once the CompletableFuture completes, so that the container's thread is not held up in the meantime:
@RequestMapping("/getAMessageFutureAsync")
public DeferredResult<Message> getAMessageFutureAsync() {
    // 90000 is the timeout in milliseconds
    DeferredResult<Message> deferred = new DeferredResult<>(90000);
    CompletableFuture<Message> f = this.service1.getAMessageFuture();
    // Runs when the future completes; the request thread has long since returned
    f.whenComplete((res, ex) -> {
        if (ex != null) {
            deferred.setErrorResult(ex);
        } else {
            deferred.setResult(res);
        }
    });
    return deferred;
}
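The difference between blocking on get() and registering a whenComplete callback can be tried outside a web container. The following is only a minimal sketch using the JDK alone: the class FutureCallbackSketch, the methods slowMessage and demo, and the use of an AtomicReference as a stand-in for DeferredResult are all assumptions made for illustration and are not part of the article's sample project. A latch replaces the fixed delay so that the outcome does not depend on timing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class FutureCallbackSketch {

    // Hypothetical stand-in for the slow service call: the task does not
    // finish until the caller counts down the "release" latch.
    static CompletableFuture<String> slowMessage(ExecutorService executor, CountDownLatch release) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "data 1";
        }, executor);
    }

    // Registers a callback, checks that the caller got control back before
    // any result existed, then lets the task finish and reads the result.
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch delivered = new CountDownLatch(1);
        // Assumption: plays the role of DeferredResult in this sketch
        AtomicReference<String> holder = new AtomicReference<>();
        try {
            CompletableFuture<String> f = slowMessage(executor, release);
            f.whenComplete((res, ex) -> {
                holder.set(ex != null ? "error" : res);
                delivered.countDown();
            });
            // whenComplete returned immediately: nothing has been delivered yet
            boolean pendingAtReturn = holder.get() == null;
            release.countDown(); // let the slow task finish
            boolean ok = delivered.await(5, TimeUnit.SECONDS);
            return pendingAtReturn + ":" + (ok ? holder.get() : "timeout");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Calling FutureCallbackSketch.demo() returns "true:data 1": the calling thread was free while the task was still pending, and the value arrived later through the callback. In a controller, that freed thread is the container's request thread.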
Using Observable in an Async Flow
Now to the topic of this article. I have been using RxJava's excellent Observable type as the return type of my services lately, and I wanted to ensure that the web layer also remains asynchronous when processing an Observable returned from a service call.
Consider the service that was described above now modified to return an Observable:
public Observable<Message> getAMessageObs() {
    return Observable.<Message>create(s -> {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(1000); // simulate a slow operation
        s.onNext(new Message("data 1"));
        logger.info("End: Executing slow task in Service 1");
        s.onCompleted();
    }).subscribeOn(Schedulers.from(customObservableExecutor));
}
I can nullify all the benefits of returning an Observable by ending up with a blocking call at the web layer. A naive call would be the following:
@RequestMapping("/getAMessageObsBlocking")
public Message getAMessageObsBlocking() {
    // toBlocking().first() parks the request thread until the first item arrives
    return service1.getAMessageObs().toBlocking().first();
}
To keep this flow asynchronous through the web layer, a better way to handle the call is to transform the Observable into Spring's DeferredResult type:
@RequestMapping("/getAMessageObsAsync")
public DeferredResult<Message> getAMessageAsync() {
    Observable<Message> o = this.service1.getAMessageObs();
    // 90000 is the timeout in milliseconds
    DeferredResult<Message> deferred = new DeferredResult<>(90000);
    // Pass the emitted item, or the error, to the DeferredResult when it arrives
    o.subscribe(m -> deferred.setResult(m), e -> deferred.setErrorResult(e));
    return deferred;
}
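The same bridging idea can be tried without Spring or RxJava on the classpath. The following is a rough sketch under stated assumptions, not the article's code: it swaps in the JDK's Flow.Publisher (Java 9+) for Observable and a CompletableFuture for DeferredResult, and the names PublisherBridgeSketch, firstItem and demo are made up for illustration. It also makes one choice explicit that the controller above leaves implicit, namely that only the first emitted item becomes the response.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class PublisherBridgeSketch {

    // Bridges the first item (or the error) of a publisher into a
    // CompletableFuture, which stands in for DeferredResult in this sketch.
    static <T> CompletableFuture<T> firstItem(Flow.Publisher<T> publisher) {
        CompletableFuture<T> deferred = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // only one item is needed for a single response
            }

            @Override
            public void onNext(T item) {
                deferred.complete(item);
                subscription.cancel(); // ignore anything emitted afterwards
            }

            @Override
            public void onError(Throwable t) {
                deferred.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                // No effect if an item already completed the future
                deferred.completeExceptionally(new NoSuchElementException("no item emitted"));
            }
        });
        return deferred;
    }

    // Subscribes first, confirms nothing is available yet, then emits one item.
    static String demo() {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            CompletableFuture<String> deferred = firstItem(publisher);
            boolean pendingAtReturn = !deferred.isDone();
            publisher.submit("data 1");
            return pendingAtReturn + ":" + deferred.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling PublisherBridgeSketch.demo() returns "true:data 1". With the real types, an Observable that emits more than one item behaves in a similar way, since DeferredResult.setResult accepts only the first result and returns false for later calls.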
If you are interested in exploring this further, here is a GitHub repo with working samples: https://github.com/bijukunjummen/spring-web-observable.
References:
Spring's reference guide on async flows in the web tier: http://docs.spring.io/spring/docs/current/spring-framework-reference/html/mvc.html#mvc-ann-async
More details on Spring DeferredResult by the inimitable Tomasz Nurkiewicz at the NoBlogDefFound blog: http://www.nurkiewicz.com/2013/03/deferredresult-asynchronous-processing.html
Published at DZone with permission of Biju Kunjummen, DZone MVB. See the original article here.