Reactive Spring: Define a REST Endpoint as a Continuous Stream
See how to define a REST endpoint as a continuous stream.
In REST APIs, every HTTP request is stateless: we fire a request, get a response, and that's it. The server keeps no state between requests, and the exchange is over once the response has been delivered, so each request gets exactly one response.
But sometimes we need a continuous response to a single request. This kind of response is called a streaming response. In this article, we will see how to do that in Spring.
Here, I am assuming we already have logic that produces continuous output. I am just going to show how to wrap that output in a streaming response and send it to the client. So let's begin.
For the continuous output, we will be using the following piece of code:
Flux.interval(Duration.ofSeconds(1));
The above code emits sequential long values starting from 0, one per second, indefinitely. The first value arrives after an initial one-second delay.
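To make that emission pattern concrete without pulling in Reactor, here is a minimal plain-JDK analogue. It is only a sketch: the class IntervalSketch and its firstTicks helper are hypothetical names, and it relies on a ScheduledExecutorService rather than on anything Reactor does internally. It uses a short period so it finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IntervalSketch {

    // Hypothetical helper: collects the first `count` ticks of a counter that
    // starts at 0 and advances once every `periodMillis` milliseconds.
    public static List<Long> firstTicks(int count, long periodMillis) {
        List<Long> ticks = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // The initial delay equals the period, as with Flux.interval(Duration).
        timer.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                ticks.add((long) ticks.size()); // 0, 1, 2, ...
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(); // block until `count` values have arrived
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ticks", e);
        } finally {
            timer.shutdownNow(); // stop the otherwise endless source
        }
        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3, 100)); // prints [0, 1, 2]
    }
}
```

Without the shutdownNow() call the timer would keep ticking forever, which is the same reason a consumer of the interval stream has to stop listening at some point.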
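To send a streaming response, we need to set the media type that the endpoint produces as follows (note that Spring Framework 5.3 and later deprecate this constant in favor of MediaType.APPLICATION_NDJSON_VALUE):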
MediaType.APPLICATION_STREAM_JSON_VALUE
The full @GetMapping annotation:
@GetMapping(value = "/streaming", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
The full code for the controller:
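import java.time.Duration;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class StreamingController {
    @GetMapping(value = "/streaming", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Long> getItemsStream() {
        return Flux.interval(Duration.ofSeconds(1)); // You can write your own logic here.
    }
}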
That's it for the controller. It's very simple.
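If you want to see the stream from the client side, the following is a rough sketch of a plain-JDK client (Java 11+). It assumes the application is running locally on port 8080, which is my assumption and not something configured above, and it relies on the fact that this media type delivers each element as a JSON value on its own line. StreamingClientSketch, firstValues, and fetchFirst are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamingClientSketch {

    // Parses the first `limit` non-blank lines as longs (one JSON number per line).
    public static List<Long> firstValues(Stream<String> lines, int limit) {
        return lines.map(String::trim)
                .filter(line -> !line.isEmpty())
                .limit(limit) // short-circuits, so an endless body is fine
                .map(Long::parseLong)
                .collect(Collectors.toList());
    }

    // Reads the first `limit` values from the streaming endpoint at `url`.
    public static List<Long> fetchFirst(String url, int limit)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "application/stream+json")
                .GET()
                .build();
        // ofLines() hands back the body as a lazy stream of lines.
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        // Closing the stream releases the connection once we have enough values.
        try (Stream<String> lines = response.body()) {
            return firstValues(lines, limit);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length > 0) {
            // Pass the endpoint URL, e.g. http://localhost:8080/streaming (assumed address).
            System.out.println(fetchFirst(args[0], 3));
        } else {
            // No server needed: parse a sample of what the body looks like.
            System.out.println(firstValues(Stream.of("0", "1", "2", "3"), 3)); // prints [0, 1, 2]
        }
    }
}
```

Because the response never ends, the client decides when to stop. Here it stops after three values and closes the stream.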
Now, let's write the test cases to test the controller behavior.
First, we need to add the following dependencies to pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
Now, write the test file:
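import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureWebTestClient
public class StreamingControllerTest {
    @Autowired
    WebTestClient webTestClient;

    @Test
    public void fluxStream() {
        // Call the endpoint and expose the response body as a Flux.
        Flux<Long> longStreamFlux = webTestClient.get().uri("/streaming")
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .exchange()
                .expectStatus().isOk()
                .returnResult(Long.class)
                .getResponseBody();
        // Verify the first three values, then cancel the endless stream.
        StepVerifier.create(longStreamFlux)
                .expectNext(0L)
                .expectNext(1L)
                .expectNext(2L)
                .thenCancel()
                .verify();
    }
}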
Here, we are using WebTestClient to call the REST endpoint and StepVerifier to verify the Flux output. We expect three elements and then call thenCancel(). Because this is a streaming response that never ends, we never get an onComplete event, so we have to cancel the subscription explicitly to finish the verification. You can get the full code here.
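The take-three-then-cancel idea can also be seen with the JDK's own java.util.concurrent.Flow interfaces, which follow the same subscribe/request/cancel protocol as Reactor. This is only an illustrative sketch and not how StepVerifier is implemented. CancelSketch and TakeThenCancel are hypothetical names, and a SubmissionPublisher that is never closed stands in for the endless interval stream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class CancelSketch {

    // Hypothetical subscriber: takes `limit` items, then cancels its subscription.
    public static class TakeThenCancel implements Flow.Subscriber<Long> {
        public final List<Long> received = new CopyOnWriteArrayList<>();
        public volatile boolean completed = false;
        final CountDownLatch finished = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;

        TakeThenCancel(int limit) { this.limit = limit; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(limit); // ask for exactly `limit` items
        }

        @Override public void onNext(Long item) {
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel(); // plays the role of thenCancel()
                finished.countDown();
            }
        }

        @Override public void onError(Throwable t) { finished.countDown(); }

        @Override public void onComplete() { completed = true; }
    }

    public static TakeThenCancel run(int limit) {
        TakeThenCancel subscriber = new TakeThenCancel(limit);
        // Never closed on purpose: it models a source that does not complete.
        SubmissionPublisher<Long> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        for (long i = 0; i < limit + 2; i++) {
            publisher.submit(i); // offer more items than the subscriber will take
        }
        try {
            subscriber.finished.await(); // wait until the subscriber has cancelled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for cancel", e);
        }
        return subscriber;
    }

    public static void main(String[] args) {
        TakeThenCancel result = run(3);
        System.out.println(result.received);  // prints [0, 1, 2]
        System.out.println(result.completed); // prints false
    }
}
```

The subscriber ends up with its three values and no completion signal, which is the situation the test above handles by cancelling instead of expecting onComplete.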
I hope this article helps you define the streaming endpoint for Spring. Thanks for reading!
This article was first published on the Knoldus blog.
Published at DZone with permission of Rishi Khandelwal, DZone MVB. See the original article here.