Preventing Data Loss With Kafka Listeners in Spring Boot
In this article, we'll look at how to build Kafka listeners with Spring Boot and how to use Kafka's acknowledgment mechanisms.
Data loss is one of the biggest problems developers face when building distributed systems. Whether due to network issues or code bugs, data loss can have serious consequences for enterprises. In this article, we'll look at how to build Kafka listeners with Spring Boot and how to use Kafka's acknowledgment mechanisms to prevent data loss and ensure the reliability of our systems.
Apache Kafka
Apache Kafka is a distributed messaging platform used to store and deliver messages. Once a message is written to Kafka, it is kept there according to the retention policy. Messages are read through the consumer group mechanism: for each consumer group, Kafka stores an offset per partition that tracks how far that group has progressed in reading messages. This allows every consumer group to read a topic independently and to resume from where it left off after a failure or restart.
After successfully processing a message, a consumer sends an acknowledgment to Kafka, and the offset pointer for its consumer group advances. Because each consumer group stores its own offset values in the message broker, different groups can read the same messages independently.
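To make the offset mechanics concrete, here is a minimal sketch that uses the plain Kafka Java client rather than Spring (the class name and configuration values are illustrative assumptions, anticipating the local broker and demo topic set up later in this article). The consumer polls records and commits its offset only after processing them, so the group resumes from the last committed offset after a restart:
// Minimal sketch of the offset mechanism using the plain Kafka Java client.
// Assumes the local broker and the "demo" topic described later in this article.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println("Processed value: " + r.value()));
                // The group's offset only advances after this commit, so any records
                // processed before a crash but not yet committed are read again on restart.
                consumer.commitSync();
            }
        }
    }
}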
When we talk about high-reliability systems that must guarantee no data loss, we must consider all possible scenarios. Apache Kafka, by design, already has the features to ensure reliability. We, as consumers of messages, must also provide proper reliability. But what can go wrong?
- The consumer receives the message and crashes before it can process it
- The consumer receives the message, processes it, and then crashes
- Any network problems
This can happen for reasons beyond our control — temporary network unavailability, an incident on the instance, pod eviction in a K8s cluster, etc.
Kafka can guarantee message delivery through its acknowledgment mechanism, known as at-least-once delivery: a message will be delivered at least once, but under certain circumstances it may be delivered several times. All we need to do is configure Apache Kafka correctly and be prepared to react to duplicate messages when needed. Let's try to implement this in practice.
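Reacting to duplicates usually means making the processing logic idempotent. The following is only an illustrative sketch and not part of this article's example code: it deduplicates by record key with an in-memory set, whereas a real service would typically keep track of processed IDs in a database or cache:
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class IdempotentProcessor {

    // In-memory only for illustration; a real system would persist processed keys.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    public void process(ConsumerRecord<String, String> record) {
        // Set.add returns false if the key is already present, i.e., a duplicate delivery.
        if (record.key() != null && !processedKeys.add(record.key())) {
            log.info("Duplicate message with key {} skipped", record.key());
            return;
        }
        // Business logic goes here; the check above prevents it from running twice.
        log.info("Processed value: {}", record.value());
    }
}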
Run Apache Kafka
To start the message broker, we also need ZooKeeper. The easiest way to run both is with Docker Compose. Create the file docker-compose.yml:
---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.3.3
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Start the containers with docker-compose up -d, then create a new topic:
docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
--create \
--topic demo
To produce messages, you can run the command:
docker exec -ti broker \
kafka-console-producer --bootstrap-server broker:9092 \
--topic demo
Each line is a new message. When finished, press Ctrl+C:
>first
>second
>third
>^C
Messages have been written and will be stored in Apache Kafka.
Spring Boot Application
Create a Gradle project and add the necessary dependencies to build.gradle:
plugins {
    id 'java'
    id 'org.springframework.boot' version '2.7.10'
    id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok:1.18.26'
    annotationProcessor 'org.projectlombok:lombok:1.18.26'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testCompileOnly 'org.projectlombok:lombok:1.18.26'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.26'
}
application.yml:
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Let's write an event handler:
@Component
@Slf4j
public class DemoListener {

    @KafkaListener(topics = "demo", groupId = "demo-group")
    void processKafkaEvents(ConsumerRecord<String, String> record) {
        log.info("Try to process message");
        // Some code
        log.info("Processed value: " + record.value());
    }
}
Execution Result:
Try to process message
Processed value: first
Try to process message
Processed value: second
Try to process message
Processed value: third
But what if an error happens during message processing? In that case, we need to handle it correctly. If the error is caused by an invalid message, we can log it or publish the message to a separate dead letter topic (DLT) for later analysis. And what if processing involves calling another microservice that doesn't respond? In that case, we may need a retry mechanism.
To implement it, we can configure DefaultErrorHandler:
@Configuration
@Slf4j
public class KafkaConfiguration {

    @Bean
    public DefaultErrorHandler errorHandler() {
        BackOff fixedBackOff = new FixedBackOff(5000, 3);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
            log.error("Couldn't process message: {}; {}", consumerRecord.value().toString(), exception.toString());
        }, fixedBackOff);
        errorHandler.addNotRetryableExceptions(NullPointerException.class);
        return errorHandler;
    }
}
Here we have specified that, in case of an error, the message will be retried up to three times at five-second intervals. But if a NullPointerException is thrown, we don't retry at all: we just write a message to the log and skip the record.
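If we want failed messages to end up in a dead letter topic, as mentioned above, rather than only being logged, Spring Kafka offers DeadLetterPublishingRecoverer, which can be passed to the same DefaultErrorHandler. Below is a minimal sketch, assuming a KafkaTemplate bean is available (Spring Boot auto-configures one when producer settings are present) and that this bean replaces the error handler above; by default, the recoverer publishes failed records to a topic named after the original one with a .DLT suffix:
@Bean
public DefaultErrorHandler dltErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    // After retries are exhausted, the failed record is published to "demo.DLT"
    // (the default convention is <original topic>.DLT, same partition number).
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(5000, 3));
}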
But if we want more flexibility in error handling, we can do it manually:
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        enable.auto.commit: false
    listener:
      ack-mode: MANUAL
Here we set spring.kafka.consumer.properties.enable.auto.commit=false (if it were true, the consumer's offsets would be committed periodically in the background at the interval defined by auto.commit.interval.ms, which defaults to 5000 ms) and spring.kafka.listener.ack-mode=MANUAL, which means we want to control offset commits ourselves.
Now we can control the sending of the acknowledgment ourselves:
@KafkaListener(topics = "demo", groupId = "demo-group")
void processKafkaEvents(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
    log.info("Try to process message");
    try {
        // Some code
        log.info("Processed value: " + record.value());
        acknowledgment.acknowledge();
    } catch (SocketTimeoutException e) {
        log.error("Error while processing message. Try again later");
        acknowledgment.nack(Duration.ofSeconds(5));
    } catch (Exception e) {
        log.error("Error while processing message: {}", record.value(), e);
        acknowledgment.acknowledge();
    }
}
The Acknowledgment object allows you to explicitly acknowledge or reject (nack) the message. By calling acknowledge(), you are telling Kafka that the message has been successfully processed and its offset can be committed. By calling nack(), you are telling Kafka that the message should be redelivered after the specified delay (for example, when a downstream microservice isn't responding).
Conclusion
Data loss prevention is critical for Kafka consumer applications. In this article, we looked at some best practices for exception handling and data loss prevention with Spring Boot. By applying these practices, you can build a robust and reliable Kafka consumer application that is resilient to failures and can recover from errors gracefully without losing data.