Testing Spring Boot Apps With Kafka and Awaitility
Learn more about testing Spring Boot apps with Kafka and Awaitility!
This is the second article in the Spring Cloud Stream and Kafka series. This post talks about different ways in which we can test Spring Boot applications using EmbeddedKafka and Awaitility.
While testing any synchronous application, it is all about “call and wait.” We invoke a particular API or endpoint and wait for the response. The test blocks the main execution thread until the API returns the response. Once the processing completes, we get the response and can compare the result with the expected output.
You may also like: Kafka With Spring Cloud Stream
Asynchronous applications are tested differently from synchronous or blocking applications: the call does not block the main execution thread. In other words, the test does not automatically wait for a response from the API; we need to program it to hold execution at a certain point and wait for the results of all the non-blocking operations. Only at that stage can we write the assertions. This makes it hard to manage threads and concurrency issues while still writing a concise, readable unit test.
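To make the shape of the problem concrete, here is a minimal plain-Java sketch (no Kafka or Spring involved; the class and method names are illustrative): the work happens on another thread, and the test must explicitly wait for the result before asserting.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class AsyncTestSketch {

    // Simulates a non-blocking call: the work runs on a worker thread.
    static String callAndAwait() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> response = executor.submit(() -> {
                Thread.sleep(100); // simulate processing delay
                return "payload";
            });
            // The test thread must hold here until the result is available,
            // with a timeout so a broken pipeline fails fast instead of hanging.
            return response.get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callAndAwait()); // prints "payload"
    }
}
```

Libraries like Awaitility and utilities like CountDownLatch, shown below, give this "hold and wait" step a readable, declarative form.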
There are a few ways to write tests for Spring Boot/Spring Cloud Stream-based microservices that connect with Kafka.
Let’s consider a simple use case for this purpose.
Example Use Case
There is a producer bean that will send messages to a Kafka topic.
package com.techwording.scs;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding(Source.class)
public class Producer {
private Source mySource;
public Producer(Source mySource) {
super();
this.mySource = mySource;
}
public Source getMysource() {
return mySource;
}
public void setMysource(Source mysource) {
mySource = mysource;
}
}
A consumer bean will listen to a Kafka topic and receive messages.
import java.util.concurrent.CountDownLatch;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class Consumer {
private String receivedMessage;
// Counted down when a message arrives; the CountDownLatch-based test below waits on it.
private static final CountDownLatch latch = new CountDownLatch(1);
@StreamListener(target = Sink.INPUT)
public void consume(String message) {
receivedMessage = message;
latch.countDown();
}
}
public String getReceivedMessage() {
return receivedMessage;
}
}
A Kafka broker with a topic is created. For this test, we will use an embedded Kafka server provided by spring-kafka-test.
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);
Spring-kafka-test provides an embedded Kafka broker through the EmbeddedKafkaRule class. We can use the JUnit @ClassRule annotation to create this Kafka broker. The rule starts the Kafka and ZooKeeper servers on a random port before the tests execute and shuts them down after the tests complete. The embedded Kafka broker eliminates the need to have real Kafka and ZooKeeper instances running while the test runs.
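For reference, spring-kafka-test is typically pulled in as a test-scoped dependency. A Maven sketch is shown below (the version is normally managed by the Spring Boot dependency BOM, so it is omitted here; adjust to your build setup):

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
```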
Coming back to the tests, I have implemented this test in two ways, using Awaitility and using a countdown latch.
Test Using Awaitility
Awaitility is a DSL library that provides features to assist in writing JUnit tests for asynchronous Java applications. You can check out their official GitHub page here. Below is an implementation of the test using Awaitility.
package com.techwording.scs;
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.BDDAssertions.then;
import static org.awaitility.Awaitility.waitAtMost;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { EmbeddedKafkaAwaitilityTest.App.class, EmbeddedKafkaAwaitilityTest.Producer.class, EmbeddedKafkaAwaitilityTest.Consumer.class })
@EnableBinding(Source.class)
public class EmbeddedKafkaAwaitilityTest {
@SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)
static class App {
}
private static final String TOPIC1 = "test-topic-1";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);
@BeforeClass
public static void setup() {
System.setProperty("spring.cloud.stream.kafka.binder.brokers",
embeddedKafka.getEmbeddedKafka()
.getBrokersAsString());
System.setProperty("spring.cloud.stream.bindings.input.destination", TOPIC1);
System.setProperty("spring.cloud.stream.bindings.input.content-type", "text/plain");
System.setProperty("spring.cloud.stream.bindings.input.group", "input-group-1");
System.setProperty("spring.cloud.stream.bindings.output.destination", TOPIC1);
System.setProperty("spring.cloud.stream.bindings.output.content-type", "text/plain");
System.setProperty("spring.cloud.stream.bindings.output.group", "output-group-1");
}
@Autowired
private Producer producer;
@Autowired
private Consumer consumer;
@Test
public void testMessageSendReceive_Awaitility() {
producer.getMysource()
.output()
.send(MessageBuilder.withPayload("payload")
.setHeader("type", "string")
.build());
waitAtMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
then("payload").isEqualTo(
EmbeddedKafkaAwaitilityTest.this.consumer.getReceivedMessage());
});
}
}
Test Using CountDownLatch
As per the Java documentation, CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. To write this test using CountDownLatch, we first initialize the latch with a counter. The value of this counter depends on the number of tasks our test needs to wait for; here, we initialize it with a count of 1. Once the producer has sent the message, the latch waits for the count to reach 0. The consumer is responsible for decrementing the count. Hence, when the consumer is done with its part, the main thread resumes and performs the assertion.
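The same mechanics can be seen in a minimal plain-Java sketch (no Kafka involved; the class and method names are illustrative): a simulated consumer thread counts the latch down, and the main thread blocks in await() until the count reaches 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    static volatile String receivedMessage;

    static String sendAndAwait() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1); // one event to wait for
        // Simulated consumer: records the message, then signals completion.
        new Thread(() -> {
            receivedMessage = "payload";
            latch.countDown(); // count goes 1 -> 0, releasing await()
        }).start();
        // The main thread blocks until the count reaches 0 (bounded by a timeout
        // so a failure does not hang the test forever).
        if (!latch.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for the consumer");
        }
        return receivedMessage;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendAndAwait()); // prints "payload"
    }
}
```

countDown()/await() also establish a happens-before edge, so the main thread safely sees the value written by the consumer thread.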
Below is an implementation of the test using CountDownLatch:
package com.techwording.scs;
import java.util.concurrent.CountDownLatch;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { EmbeddedKafkaLatchTest.App.class, EmbeddedKafkaLatchTest.Producer.class, EmbeddedKafkaLatchTest.Consumer.class })
@EnableBinding(Source.class)
public class EmbeddedKafkaLatchTest {
@SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)
static class App {
}
private static final String TOPIC1 = "test-topic-1";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);
private static CountDownLatch latch = new CountDownLatch(1);
@BeforeClass
public static void setup() {
System.setProperty("spring.cloud.stream.kafka.binder.brokers",
embeddedKafka.getEmbeddedKafka()
.getBrokersAsString());
System.setProperty("spring.cloud.stream.bindings.input.destination", TOPIC1);
System.setProperty("spring.cloud.stream.bindings.input.content-type", "text/plain");
System.setProperty("spring.cloud.stream.bindings.input.group", "input-group-1");
System.setProperty("spring.cloud.stream.bindings.output.destination", TOPIC1);
System.setProperty("spring.cloud.stream.bindings.output.content-type", "text/plain");
System.setProperty("spring.cloud.stream.bindings.output.group", "output-group-1");
}
@Autowired
private Producer producer;
@Autowired
private Consumer consumer;
@Test
public void testMessageSendReceive() throws InterruptedException {
producer.getMysource()
.output()
.send(MessageBuilder.withPayload("payload")
.setHeader("type", "string")
.build());
latch.await();
assertThat(consumer.getReceivedMessage()).isEqualTo("payload");
}
}
You can find the complete source code here.
Further Reading
Kafka With Spring Cloud Stream