Spring 5 Web Reactive: Flux, Mono, and JUnit Testing
Join the DZone community and get the full member experience.
Join For FreeThe reactive-stack web framework, Spring WebFlux, has been added to Spring 5.0. It is fully non-blocking, supports reactive streams back-pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.
Reactive processing is a paradigm that enables developers to build non-blocking, asynchronous applications that can handle back-pressure (flow control). Reactive systems better utilize modern processors. Also, the inclusion of back-pressure in reactive programming ensures better resilience between decoupled components.
Reactive systems have certain characteristics that make them ideal for low-latency, high-throughput workloads. Project Reactor and the Spring portfolio work together to enable developers to build enterprise-grade reactive systems that are responsive, resilient, elastic, and message-driven.
Flux and Mono:
Spring Webflux uses Project Reactor as reactive library. Spring WebFlux heavily uses two publishers:
- Mono: Returns 0 or 1 element.
- Flux: Returns 0…N elements.
Reactor is a Reactive Streams library and, therefore, all of its operators support non-blocking back-pressure. Reactor has a strong focus on server-side Java. It is developed in close collaboration with Spring. WebFlux requires Reactor as a core dependency but it is interoperable with other reactive libraries via Reactive Streams.
Create a Maven project with the following dependencies
xxxxxxxxxx
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.0.1.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-test -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.1.0.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
Create a FluxTest test case using JUnit 5 with various test methods that helps to create and test Flux component.
xxxxxxxxxx
class FluxTest {
void testFlux1() {
//Create a Flux that completes without emitting any item.
Flux.empty();
}
void testFlux2() {
//Create a new Flux that will only emit a single element then onComplete.
Flux<String> flux= Flux.just("Spring 5");
flux.subscribe(System.out::println);
}
void testFlux3() {
//Create a Flux that emits the provided elements and then completes.
Flux<String> flux= Flux.just("Spring MVC","Spring Boot","Spring Web");
flux.subscribe(System.out::println);
}
void testFlux4() {
//Create a Flux that emits the items contained in the provided array.
Flux<String> flux = Flux.fromArray(new String[]{"A", "B", "C"});
flux.subscribe(System.out::println);
}
void testFlux5() {
//Create a Flux that emits the items contained in the provided Iterable.A new iterator will be created for each subscriber.
List<Integer> list = Arrays.asList(1,2,3,4,5);
Flux<Integer> flux=Flux.fromIterable(list);
flux.subscribe(System.out::println);
}
void testFlux6() {
//Concatenate emissions of this Flux with the provided Publisher (no interleave).
List<Integer> list = Arrays.asList(1,2,3,4,5);
Flux<Integer> flux=Flux.fromIterable(list)
.concatWith(Flux.just(6,7,8));
flux.subscribe(System.out::println);
}
void testFlux7() {
//Create a Flux that terminates with the specified error immediately afterbeing subscribed to.
Flux<String> flux= Flux.error(new RuntimeException("Error Occurred"));
//flux.subscribe(System.out::println);
}
}
Flux is a Reactive Stream Publisher with rx operators that emit 0 to N elements and then completes (successfully or with an error).
The just
method creates a new flux that emits the provided elements or a single element and then completes. The subscribe
method is used to subscribe a Consumer to the Flux that will consume all the elements in the sequence, as well as a Consumer that will handle errors. You can concatenate emissions of this Flux with the provided Publisher using the concatWith
method. On invoking the error
method, Flux is created that terminates with the specified error immediately after being subscribed to.
Create FluxTestUsingStepVerifier test case that helps to test Flux component in different ways
xxxxxxxxxx
class FluxTestUsingStepVerifier {
void testFlux1() {
//create-Prepare a new StepVerifier in an uncontrolled environment: Step.thenAwait will block in real time.Each verify() will fully (re)play the scenario.
//expectNext - Expect the next element received to be equal to the given value.
//verfyComplete - Trigger the verification, expecting a completion signalas terminal event.
Flux<String> flux= Flux.just("Spring MVC","Spring Boot","Spring Web");
StepVerifier.create(flux.log()).expectNext("Spring MVC")
.expectNext("Spring Boot").expectNext("Spring Web")
.verifyComplete();
}
void testFlux2() {
//expectNextCount-Expect to received count elements, starting from the previousexpectation or onSubscribe.
Flux<String> flux= Flux.just("Spring MVC","Spring Boot","Spring Web");
StepVerifier.create(flux.log()).expectNextCount(3)
.verifyComplete();
}
void testFlux3() {
//expectError -Expect an error of the specified type.
//verify -Verify the signals received by this subscriber.
Flux<String> flux= Flux.just("Spring MVC","Spring Boot","Spring Web")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")));
StepVerifier.create(flux.log())
.expectNext("Spring MVC")
.expectNext("Spring Boot")
.expectNext("Spring Web")
.expectError(RuntimeException.class)
.verify();
}
}
A StepVerifier
provides a declarative way of creating a verifiable script for an async Publisher sequence by expressing expectations about the events that will happen upon subscription.
The verification must be triggered after the terminal expectations (completion, error, cancellation) has been declared, by calling one of the verify()
methods.
StepVerifier can be created around a Publisher using create(Publisher)
or withVirtualTime(Supplier<Publisher)
. Set up individual value expectations using expectNext
, expectNextMatches(Predicate)
, assertNext(Consumer)
, expectNextCount(long)
, or expectNextSequence(Iterable)
.
Trigger subscription actions during the verification using either thenRequest(long)
or thenCancel()
. Finalize the test scenario using a terminal expectation: expectComplete()
, expectError()
, expectError(Class)
, expectErrorMatches(Predicate)
, or thenCancel()
.
Create a test case called MonoTest that helps to create and test the Mono component in different ways
xxxxxxxxxx
class MonoTest {
public void testMono1() {
Mono<String> mono=Mono.empty();
}
public void testMono2() {
Mono<String> mono=Mono.just("Spring");
mono.subscribe(System.out::println);
}
public void testMono3() {
Mono<Integer> mono=Mono.just(10);
mono.subscribe(System.out::println);
}
public void testMono4() {
Mono<String> mono=Mono.error(new RuntimeException("Exception occurred"));
//mono.subscribe(System.out::println);
}
public void testMono5() {
Mono<String> mono=Mono.just("Spring");
StepVerifier.create(mono.log())
.expectNext("Spring")
.verifyComplete();
}
public void testMono6() {
Mono<String> mono=Mono.error(new RuntimeException("Exception occurred"));
StepVerifier.create(mono.log())
.expectError(RuntimeException.class)
.verify();
//Another approach
StepVerifier.create(Mono.error(new RuntimeException("Exception")).log())
.expectError(RuntimeException.class)
.verify();
}
}
Mono is a Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.
The verifyComplete()
method of StepVerifier
triggers the verification, expecting a completion signal as terminal event. The expectError(class)
method of StepVerifier
expects an error of the specified type.
Let's explore some factory methods of Flux/Mono that help to filter, transform, and combine the Publisher stream
Filter the source against the given predicate using filter method. Transform the elements emitted by the Flux using map and flatmap methods. Combine the reactive streams using the following methods
- concat.
- merge.
- zip.
Handle errors in the reactive streams using
- doOnError.
- onErrorReturn.
Creating infinite reactive streams using interval method. Create another test case to implement filter method.
xxxxxxxxxx
class FilterReactiveStreamTest {
List<String> cities = Arrays.asList("Chennai","Pune","Mumbai","Kolkatta");
void filterTest1() {
Flux<String> cityFlux = Flux.fromIterable(cities);
Flux<String> filteredCityFlux=cityFlux.filter(city->city.length()>7);
StepVerifier.create(filteredCityFlux.log())
.expectNext("Kolkatta")
.verifyComplete();
}
void filterTest2() {
Flux<String> cityFlux = Flux.fromIterable(cities);
Flux<String> filteredCityPFlux=cityFlux.filter(city->city.startsWith("P"));
StepVerifier.create(filteredCityPFlux.log())
.expectNext("Pune")
.verifyComplete();
}
void filterTest3() {
Flux<String> cityFlux = Flux.fromIterable(cities);
Flux<String> filteredCityPFlux=cityFlux.filter(city->city.contentEquals("Mumbai"));
StepVerifier.create(filteredCityPFlux.log())
.expectNext("Mumbai")
.verifyComplete();
}
void filterTest4() {
Flux<String> cityFlux = Flux.fromIterable(cities);
Flux<String> filteredCityPFlux=cityFlux.filter(city->city.endsWith("i"));
StepVerifier.create(filteredCityPFlux.log())
.expectNextCount(2)
.verifyComplete();
}
void filterTest5() {
Flux<String> cityFlux = Flux.fromIterable(cities);
Flux<String> filteredCityPFlux=cityFlux.filter(city->city.isEmpty());
StepVerifier.create(filteredCityPFlux.log())
.expectNext()
.verifyComplete();
}
}
Filter evaluates each source value against the given Predicate. If the predicate test succeeds, the value is emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream.
Create a test case to transform a reactive stream using map
xxxxxxxxxx
class MapReactiveStreamTest {
void mapTest1() {
Flux.range(1, 5).map(data->data*data)
.subscribe(System.out::println);
Flux.range(1, 5).map(data->data*data)
.subscribe(data->System.out.println(data));
}
void mapTest2() {
Flux.range(1, 5).map(data->data.toString()+"Hello").subscribe(System.out::println);
}
void mapTest3() {
Flux.range(1, 10).map(data->data*data).filter(data->data%2==0).subscribe(System.out::println);
}
void mapTest4() {
Flux<Integer> flux=Flux.just(1,2,3,4,5);
flux.map(data->data+2).subscribe(System.out::println);
}
void mapTest5() {
Flux<String> flux = Flux.just("Tom", "Jerry");
flux = flux.map(String::toUpperCase);
StepVerifier.create(flux)
.expectNext("TOM", "JERRY")
.verifyComplete();
}
}
Create a test case to transform a reactive stream using flatmap:
xxxxxxxxxx
class FlatmapReactiveStreamTest {
//Mock DB or external service
private Mono<String> getEmpDetails(String id) {
Map<String,String> map = new HashMap<>();
map.put("1", "Joe");
map.put("2", "Alex");
map.put("3", "Marty");
map.put("4", "Glory");
map.put("5", "Ajay");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return Mono.just(
map.getOrDefault(id, "NotFound"));
}
void test() {
List<String> listEmpId = Arrays.asList("1","2","3","4","5");
//Transform the elements emitted by this Flux asynchronously into Publishers,then flatten these inner publishers into a single Flux through merging,which allow them to interleave.
//DB or external service call that return a flux or mono
Flux<String> flux=Flux.fromIterable(listEmpId)
.flatMap(id->getEmpDetails(id))
.log();
StepVerifier.create(flux)
.expectNextCount(5)
.verifyComplete();
}
}
Create a test case to combine the reactive stream using concat, merge, and zip.
xxxxxxxxxx
class CombineReactiveStreamTest {
//Combine Using merge
public void mergeTest() {
Flux<String> f1 = Flux.just("A","B","C");
Flux<String> f2 = Flux.just("X","Y","Z");
Flux<String> combFlux = Flux.merge(f1,f2);
StepVerifier.create(combFlux.log())
.expectNext("A","B","C","X","Y","Z")
.verifyComplete();
}
public void mergewithdelayTest() {//it takes 3 seconds
Flux<String> f1 = Flux.just("A","B","C").delayElements(Duration.ofSeconds(1));
Flux<String> f2 = Flux.just("X","Y","Z").delayElements(Duration.ofSeconds(1));
Flux<String> combFlux = Flux.merge(f1,f2);
StepVerifier.create(combFlux.log())
.expectNextCount(6)
.verifyComplete();
}
//Combine using Concat
public void combineWithConcatTest1() {
Flux<String> f1 = Flux.just("A","B","C");
Flux<String> f2 = Flux.just("X","Y","Z");
Flux<String> combFlux = Flux.concat(f1,f2);
StepVerifier.create(combFlux.log())
.expectNext("A","B","C","X","Y","Z")
.verifyComplete();
}
public void combineWithConcatTest2() {
Flux<String> f1 = Flux.just("A","B","C").delayElements(Duration.ofSeconds(1));
Flux<String> f2 = Flux.just("X","Y","Z").delayElements(Duration.ofSeconds(1));
Flux<String> combFlux = Flux.concat(f1,f2);
StepVerifier.create(combFlux.log())
.expectNext("A","B","C","X","Y","Z")
.verifyComplete();
}
//Combine using zip
public void combineWithZip() {
Flux<String> f1 = Flux.just("A","B","C");
Flux<String> f2 = Flux.just("X","Y","Z");
Flux<Tuple2<String, String>> zip=Flux.zip(f1, f2);
StepVerifier.create(zip.log())
.expectNextCount(3)
.verifyComplete();
}
public void combineWithZipWith() {
Flux<String> f1 = Flux.just("A","B","C");
Flux<String> f2 = Flux.just("X","Y","Z");
Flux<Tuple2<String, String>> zip=f1.zipWith(f2);
StepVerifier.create(zip.log())
.expectNextCount(3)
.verifyComplete();
}
}
Create a test to handle the errors in reactive stream
xxxxxxxxxx
class ErrorHandlingTest {
void testError1() {
Flux<String> f1= Flux.just("A","B","C")
.concatWith(Flux.error(new RuntimeException("Some Error")))
.concatWith(Flux.just("D"));
StepVerifier.create(f1.log())
.expectNext("A","B","C")
.expectError()
.verify();
}
void doOnErrorTest() {
Flux<String> f1= Flux.just("A","B","C")
.concatWith(Flux.error(new RuntimeException("Some Error")))
.doOnError((err)->System.out.println("Some error occurred "+err));
StepVerifier.create(f1.log())
.expectNextCount(3)
.expectError()
.verify();
}
public void onErrorReturn() {
Flux<String> f1= Flux.just("A","B","C")
.concatWith(Flux.error(new RuntimeException("Some Error")))
.onErrorReturn("default value");
StepVerifier.create(f1.log())
.expectNextCount(3)
.expectNext("some default value")
.verifyComplete();
}
}
doOnError
Add behavior triggered when the Flux completes with an error matching the given exception type.
xxxxxxxxxx
public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
Type Parameters: E — the type of the error to handle
Parameters: exceptionType
, the type of exceptions to handle, onError
, the error handler for each error.
Returns: an observed Flux
onErrorReturn
Simply emit a captured fallback value when an error is observed on this Flux.
xxxxxxxxxx
public final Flux<T> onErrorReturn(T fallbackValue)
Parameters: fallbackValue
, the value to emit if an error occurs
Returns: a new falling back Flux.
Opinions expressed by DZone contributors are their own.
Comments