Asynchronous HTTP Requests With RxJava
In this article, developers will follow sample code that calls two APIs and learn how to send long blocking requests asynchronously with RxJava and Vert.x.
Let’s say we develop a service that has to interact with other components. Unfortunately, those components are slow and blocking.
It may be a legacy service that is very slow or some blocking API that we must use. Regardless, we have no control over it. In this post, we will call two APIs. One of them will block for two seconds and another for five seconds.
We also need to print the response status codes once both responses are available. If we did it the old-fashioned, non-reactive way, we would block the calling thread for at least five seconds. Holding a thread for that long is not efficient, is it?
Services
I used “httpstat.us” as a web service. This is a simple service for generating different HTTP codes to test web clients. It accepts extra parameters, in this case sleep, which delays the HTTP response by the given number of milliseconds.
I will be using “httpie” to test both services.
Service 1 will block for five seconds and return a response with status code 200:
http://httpstat.us/200?sleep=5000
_____________________________________________
HTTP/1.1 200 OK
Content-Length: 6
Content-Type: text/plain
Date: Tue, 08 Mar 2022 17:05:08 GMT
Request-Context: appId=cid-v1:1e93d241-20e4-4513-bbd7-f452a16a5d69
Server: Kestrel
Set-Cookie: ARRAffinity=e2c17206c539113795daf64bd958d003f2b29b9f62da53617beea05468875ba5;Path=/;HttpOnly;Domain=httpstat.us
200 OK
Service 2 is identical to the previous one except it blocks for two seconds instead of five:
http://httpstat.us/200?sleep=2000
_____________________________________________
HTTP/1.1 200 OK
Content-Length: 6
Content-Type: text/plain
Date: Tue, 08 Mar 2022 17:11:53 GMT
Request-Context: appId=cid-v1:1e93d241-20e4-4513-bbd7-f452a16a5d69
Server: Kestrel
Set-Cookie: ARRAffinity=e2c17206c539113795daf64bd958d003f2b29b9f62da53617beea05468875ba5;Path=/;HttpOnly;Domain=httpstat.us
200 OK
Web Client
We have learned about the services. Now, let’s discuss the web client. In this section, I use the Vert.x web client. It is an asynchronous, easy-to-use HTTP and HTTP/2 client that supports RxJava too.
private static Single<Integer> service1(WebClient webClient) {
    return webClient.getAbs("http://httpstat.us/200?sleep=5000")
            .rxSend()
            .doOnSuccess(response -> out.println("[" + Thread.currentThread().getName() + "] service 1: response received"))
            .map(HttpResponse::statusCode);
}

private static Single<Integer> service2(WebClient webClient) {
    return webClient.getAbs("http://httpstat.us/200?sleep=2000")
            .rxSend()
            .doOnSuccess(response -> out.println("[" + Thread.currentThread().getName() + "] service 2 response received"))
            .map(HttpResponse::statusCode);
}
Both methods are very similar. They take a WebClient as a parameter, send an HTTP request, and return a Single<Integer>, where the integer is the HTTP response code. Returning an RxJava Single signals that the result is asynchronous: the status code becomes accessible later, once it is available. It also gives us lazy evaluation, meaning the services are invoked only once there is an active subscription.
Consuming Single Sources
There are two sources that we need to subscribe to. RxJava has a convenient method for combining Single sources. We can invoke the .zipWith method on the first source and supply two parameters. The first is the source to zip with, and the second is a function that consumes both results, processes them, and returns something else.
In this case, the return type is AbstractMap.SimpleEntry<Integer, Integer>, which serves as a simple tuple of two integers. Looks verbose, doesn’t it? Unfortunately, there are no better tuple or pair implementations in the core Java libraries.
Thanks to Java Lambdas, we can pass the behavior as a parameter:
Single<Integer> service1Code = service1(webClient);
Single<Integer> service2Code = service2(webClient);
Single<AbstractMap.SimpleEntry<Integer, Integer>> tupleSource =
        service1Code.zipWith(service2Code, (s1, s2) -> new AbstractMap.SimpleEntry<>(s1, s2));
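To try the same combine-two-results idea without any reactive library on the classpath, here is a minimal sketch that uses the JDK's CompletableFuture.thenCombine in place of RxJava's zipWith. It is an analogy rather than the approach used in this article: ZipSketch, fakeService, and zip are hypothetical names, and the delays are simulated instead of calling httpstat.us.

```java
import java.util.AbstractMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class ZipSketch {

    // Hypothetical stand-in for a slow service: completes with the given
    // status code after delayMillis without blocking the calling thread.
    static CompletableFuture<Integer> fakeService(int statusCode, long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> statusCode,
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Pairs both status codes once both futures have completed,
    // playing the role of the function passed to zipWith.
    static CompletableFuture<AbstractMap.SimpleEntry<Integer, Integer>> zip(
            CompletableFuture<Integer> first, CompletableFuture<Integer> second) {
        return first.thenCombine(second, (s1, s2) -> new AbstractMap.SimpleEntry<>(s1, s2));
    }

    public static void main(String[] args) {
        // join() blocks only this demo's main thread while waiting for the pair.
        AbstractMap.SimpleEntry<Integer, Integer> codes =
                zip(fakeService(200, 500), fakeService(200, 200)).join();
        System.out.println("service1:" + codes.getKey() + " service2:" + codes.getValue());
    }
}
```

One difference worth keeping in mind: a CompletableFuture starts running as soon as it is created, whereas the Single sources above stay idle until something subscribes.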
Note: You may implement your own tuple or pair if AbstractMap.SimpleEntry feels too verbose.
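As a rough illustration of that note, a hand-rolled pair could look like the sketch below. The class name Pair, its of factory, and the first/second accessors are hypothetical choices for this example, not part of RxJava, Vert.x, or the JDK.

```java
import java.util.Objects;

// Hypothetical minimal immutable pair; names are illustrative only.
final class Pair<A, B> {
    private final A first;
    private final B second;

    private Pair(A first, B second) {
        this.first = first;
        this.second = second;
    }

    // Static factory so the type arguments are inferred at the call site.
    static <A, B> Pair<A, B> of(A first, B second) {
        return new Pair<>(first, second);
    }

    A first() {
        return first;
    }

    B second() {
        return second;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof Pair)) return false;
        Pair<?, ?> that = (Pair<?, ?>) other;
        return Objects.equals(first, that.first) && Objects.equals(second, that.second);
    }

    @Override
    public int hashCode() {
        return Objects.hash(first, second);
    }

    @Override
    public String toString() {
        return "(" + first + ", " + second + ")";
    }
}
```

With such a class, the zipper function should shrink to a method reference, for example service1Code.zipWith(service2Code, Pair::of), and the result type becomes Single<Pair<Integer, Integer>>. On Java 16 or newer, a record such as record Pair<A, B>(A first, B second) {} provides the same accessors, equals, and hashCode with far less code.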
All Together
Finally, we can put all the bits and pieces together as shown below:
// Vertx instance and web client
Vertx vertx = Vertx.vertx();
WebClient webClient = WebClient.create(vertx);
// single sources. Lazy evaluation, no invocation at this point
Single<Integer> service1Code = service1(webClient);
Single<Integer> service2Code = service2(webClient);
// combine results together and create tuple
Single<AbstractMap.SimpleEntry<Integer, Integer>> tupleSource =
        service1Code.zipWith(service2Code, (s1, s2) -> new AbstractMap.SimpleEntry<>(s1, s2));
// subscribe and invoke services
tupleSource
        .doFinally(countDownLatch::countDown)
        .subscribe(Services::printResult);
Here are the results printed on the console after running the code. Both requests were dispatched from the same Vert.x event loop thread. Every second, the program also prints a message showing that the thread is not blocked. Finally, it prints both status codes as the final result. As you can see, everything happened on the same thread:
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] service 2 response received
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] service 1: response received
[vert.x-eventloop-thread-1] Result: service1:200 service2:200
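The snippet in this section refers to a countDownLatch and a Services::printResult method that are not shown. The sketch below is one plausible shape for them using only the JDK, not the author's actual code: ResultSketch and formatResult are hypothetical names, and a plain thread named worker-1 stands in for the Vert.x event loop thread.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ResultSketch {

    // Hypothetical formatter that builds a line shaped like the final output above,
    // prefixed with the name of the thread that calls it.
    static String formatResult(Map.Entry<Integer, Integer> codes) {
        return "[" + Thread.currentThread().getName() + "] Result: service1:"
                + codes.getKey() + " service2:" + codes.getValue();
    }

    public static void main(String[] args) throws InterruptedException {
        // The latch keeps main alive until the asynchronous work has finished.
        CountDownLatch latch = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                System.out.println(formatResult(new AbstractMap.SimpleEntry<>(200, 200)));
            } finally {
                // Mirrors doFinally(countDownLatch::countDown): runs on success or failure.
                latch.countDown();
            }
        }, "worker-1");
        worker.start();

        // Wait with a timeout so a lost result cannot hang the program forever.
        if (!latch.await(10, TimeUnit.SECONDS)) {
            System.out.println("Timed out waiting for the result");
        }
        // prints "[worker-1] Result: service1:200 service2:200"
    }
}
```

Counting down in a finally block (or in doFinally, as in the snippet above) matters because the main thread would otherwise wait forever if one of the requests failed.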
Conclusion
This is all we are getting into at the moment. I hope this article was informative and you now have a better understanding of how to send long blocking requests asynchronously with RxJava and Vert.x. If you’re interested, the complete code used in this article is linked here.
Happy reactive coding!
Published at DZone with permission of Andrius Kaliacius. See the original article here.