Integration Testing of Non-Blocking Retries With Spring Kafka
How to write integration tests for your Spring Kafka implementation of consumers having retries and Dead Letter Publishing enabled.
Kafka Non-Blocking Retries
Non-blocking retries in Kafka are implemented by configuring one or more retry topics alongside the main topic, so that failed events are re-consumed from the retry topics without blocking the main consumer. An additional Dead Letter Topic (DLT) can also be configured if required; events are forwarded to the DLT once all retries are exhausted. Plenty of resources are available in the public domain that explain the underlying mechanics in detail.
What To Test?
Writing integration tests for the retry mechanism in your code can be challenging:
- How do you test that the event has been retried for the required number of times?
- How do you test that retries are only performed when certain exceptions occur and not for others?
- How do you test that no further retries are performed once the exception is resolved in an earlier retry?
- How do you test that the nth attempt in the retry succeeds after (n-1) retry attempts have failed?
- How do you test that the event has been sent to the Dead Letter Topic when all the retry attempts have been exhausted?
Let's look at some code. There are plenty of good articles that show how to set up non-blocking retries using Spring Kafka; one such implementation is given below. It uses the @RetryableTopic and @DltHandler annotations from Spring Kafka.
Setting up the Retryable Consumer
@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {

    private final CustomEventHandler handler;

    @RetryableTopic(attempts = "${retry.attempts}",
            backoff = @Backoff(
                    delayExpression = "${retry.delay}",
                    multiplierExpression = "${retry.delay.multiplier}"
            ),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = FAIL_ON_ERROR,
            autoStartDltHandler = "true",
            autoCreateTopics = "false",
            include = {CustomRetryableException.class})
    @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
    public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            log.info("Received event on topic {}", topic);
            handler.handleEvent(event);
        } catch (Exception e) {
            log.error("Error occurred while processing event", e);
            throw e;
        }
    }

    @DltHandler
    public void listenOnDlt(@Payload CustomEvent event) {
        log.error("Received event on dlt.");
        handler.handleEventFromDlt(event);
    }
}
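The original article does not show the supporting types the consumer depends on. A minimal sketch of what they might look like is given below; the exact shapes of CustomEvent, CustomEventHandler, and the two exception classes are assumptions, since only their names appear in the code above.
// Assumed shapes -- only the names are taken from the original article.
public record CustomEvent(String payload) {
}

public interface CustomEventHandler {
    void handleEvent(CustomEvent event);        // business logic for the main and retry topics
    void handleEventFromDlt(CustomEvent event); // handling for events that land on the DLT
}

public class CustomRetryableException extends RuntimeException {
}

public class CustomNonRetryableException extends RuntimeException {
}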
In the consumer above, the include parameter contains CustomRetryableException.class. This tells the consumer to retry only when a CustomRetryableException is thrown by the CustomEventHandler#handleEvent method; you can list as many exception classes as you like. There is an exclude parameter as well, but only one of the two can be used at a time. An example of the exclude variant is sketched below.
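For completeness, here is how the same listener might be configured with exclude instead of include. This variant is not from the original article and is only an illustration: with exclude, every exception except the listed ones triggers a retry.
// Hypothetical alternative: retry on everything except CustomNonRetryableException.
@RetryableTopic(attempts = "${retry.attempts}",
        backoff = @Backoff(delayExpression = "${retry.delay}",
                multiplierExpression = "${retry.delay.multiplier}"),
        exclude = {CustomNonRetryableException.class})
@KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    handler.handleEvent(event);
}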
Event processing is attempted a maximum of ${retry.attempts} times (the initial attempt plus retries) before the event is published to the DLT.
Setting up Test Infra
To write integration tests, you need to make sure that you have a functioning Kafka broker (embedded preferred) and a fully functioning publisher. Let's set up our infrastructure:
@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=" + "${kafka.broker.listeners}",
                "port=" + "${kafka.broker.port}"},
        controlledShutdown = true,
        topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class CustomEventConsumerIntegrationTest {

    @Autowired
    private KafkaTemplate<String, CustomEvent> testKafkaTemplate;

    // The handler is mocked so the tests can stub failures and verify invocation counts.
    @MockBean
    private CustomEventHandler customEventHandler;

    // Maximum number of attempts, injected from the test properties and referenced by the tests below.
    @Value("${retry.attempts}")
    private int maxRetries;

    // tests
}
** Configurations are imported from the application-test.yml file.
When using an embedded Kafka broker, it is important to list the topics to be created, as they will not be created automatically. In this case, we are creating four topics: "test" (the main topic), "test-retry-0" and "test-retry-1" (the retry topics), and "test-dlt" (the Dead Letter Topic). We have set the maximum number of attempts to three: the initial attempt on the main topic plus one retry on each retry topic. Once all three attempts are exhausted, the event is forwarded to the DLT. A sketch of the assumed application-test.yml backing these placeholders is given below.
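The original article does not include the file itself, so the values below are illustrative assumptions; only the property keys are taken from the placeholders used in the code.
# Assumed application-test.yml -- values are illustrative, keys come from the code above.
topic: test
default-consumer-group: test-consumer-group
retry.attempts: 3
retry.delay: 1000          # initial back-off in milliseconds
retry.delay.multiplier: 2  # exponential back-off multiplier
kafka.broker.listeners: PLAINTEXT://localhost:9092
kafka.broker.port: 9092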
Test Cases
Retry should not be performed if consumption succeeds on the first attempt.
This can be verified by checking that the CustomEventHandler#handleEvent method is called only once. Assertions on the log statements can also be added.
@Test
void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
    verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
Retry should not be performed if a non-retryable exception is raised.
In this case, the CustomEventHandler#handleEvent method should be invoked only once:
@Test
void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
    verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
The event should be retried the maximum configured number of times if a CustomRetryableException is thrown, and should then be published to the Dead Letter Topic once retries are exhausted.
In this case, the CustomEventHandler#handleEvent method should be invoked three (maxRetries) times and the CustomEventHandler#handleEventFromDlt method should be invoked once.
@Test
void test_should_retry_maximum_times_and_publish_to_dlt_if_retryable_exception_raised() throws ExecutionException, InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doThrow(CustomRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(10000).times(maxRetries)).handleEvent(any(CustomEvent.class));
    verify(customEventHandler, timeout(2000).times(1)).handleEventFromDlt(any(CustomEvent.class));
}
**A considerable timeout has been added in the verification stage so that the exponential back-off delays have time to elapse before the verification completes. This is important; if the timeout is set too low, the assertions will fail even though the retry mechanism works correctly. A rough calculation of the minimum timeout is sketched below.
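As a rough guide, the worst-case wait before the final attempt is the sum of the exponential back-off delays. A small sketch, assuming the illustrative values used earlier (initial delay of 1000 ms, multiplier of 2, three attempts):
// Total back-off before the final attempt: delay * (1 + multiplier + multiplier^2 + ...)
// With delay = 1000 ms, multiplier = 2 and 3 attempts there are 2 retries:
// 1000 ms + 2000 ms = 3000 ms of back-off, plus consumer and poll overhead.
long delayMs = 1000L;
double multiplier = 2.0;
int attempts = 3;
long totalBackoffMs = 0;
for (int retry = 0; retry < attempts - 1; retry++) {
    totalBackoffMs += (long) (delayMs * Math.pow(multiplier, retry));
}
// totalBackoffMs == 3000; the Mockito timeout(10000) used above leaves comfortable headroom.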
The event should be retried until the CustomRetryableException is resolved, and retrying should stop as soon as a non-retryable exception is raised or consumption eventually succeeds.
The tests below stub the handler to throw a CustomRetryableException on the first attempt and then either throw a CustomNonRetryableException or succeed on the next attempt, so that exactly one retry is performed.
@Test
void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException, InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
    verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
@Test
void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException, InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
    verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
Conclusion
As you can see, these integration tests combine stubbing strategies, timeouts, delays, and verifications to thoroughly exercise the retry mechanism of your Kafka event-driven architecture.
Kudos! Feel free to suggest improvements and reach out to me on LinkedIn.
The full code can be found here.