Reactive Kafka With Spring Boot
Learn how to build generic, easily configurable, testable reactive consumers, producers, and DLT with Kotlin, Spring Boot, WebFlux, and Testcontainers.
Event-driven architectures are at the core of modern, scalable systems. Reactive Kafka, when combined with Spring Boot and WebFlux, offers a powerful approach to building non-blocking, high-throughput services. In this article, we’ll focus on building generic, easily configurable consumers and producers, managing dead letter topics (DLT), and writing integration tests with Testcontainers.
To demonstrate these principles, I’ve created a microservice, bootiful-reactive-kafka, which processes user-related events from an external system. While this microservice is simple, it illustrates key concepts that you can apply to more complex real-world use cases.
- Spoiler alert: The examples/use cases presented here are designed purely to demonstrate a basic Reactive Kafka integration with Spring Boot WebFlux, with a focus on abstraction. The problems discussed here can be solved in various ways, maybe even better ways, so don’t spend too much time thinking, “Why?”. By no means is this a deep dive into Apache Kafka or a configuration guide, as there are plenty of reputable resources already written on the subject.
Here is the source code if you'd like to follow along, along with the docker-compose.yaml
that I will be using:
version: '3'
services:
  kafka-cluster:
    container_name: kafka-cluster
    image: landoop/fast-data-dev
    environment:
      ADV_HOST: 127.0.0.1
      RUNTESTS: 0
    ports:
      - "2181:2181"
      - "3030:3030"
      - "8081-8083:8081-8083"
      - "9581-9585:9581-9585"
      - "9092:9092"
    volumes:
      # Specify an absolute path mapping
      - /
  analytics-database:
    image: postgres:13.3
    container_name: analytics-database
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: analytics
    ports:
      - "5432:5432"
One of the requirements is to store the events for auditing/analytics purposes, so I’ll use a PostgreSQL database: analytics-database. For a quick and complete Kafka setup, along with a modern streaming platform, I am using landoop/fast-data-dev. It provides an amazing user-friendly UI, Kafka Connect, Schema Registry, and Lenses.io's Stream Reactor with over 25 connectors, offering everything needed for efficient data streaming and development.
What Is Reactive Kafka?
Reactor Kafka is a reactive API for Kafka based on Reactor and the Kafka Producer/Consumer API. Reactor Kafka API enables messages to be published to Kafka and consumed from Kafka using functional APIs with non-blocking back-pressure and very low overheads.
You might wonder, "Why would you choose a Reactive API for Kafka in the first place?" Well, I prefer using Reactive Kafka over the standard Apache Kafka client when working with Spring WebFlux: since WebFlux is built around non-blocking I/O and a reactive programming model, a traditional, blocking Kafka client doesn’t fit well with the framework’s principles. With Reactive Kafka, we can fully leverage the Reactive Streams API, allowing for better scalability and resource management.
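To make the contrast concrete, here is a rough sketch (not taken from the demo project) of the classic blocking poll loop next to its reactive counterpart:
// Classic client: a dedicated thread blocks in poll(); back-pressure is handled manually.
fun blockingLoop(consumer: KafkaConsumer<String, String>) {
    while (true) {
        val records = consumer.poll(Duration.ofMillis(500)) // blocks the calling thread
        records.forEach { println("Got ${it.key()} -> ${it.value()}") }
    }
}

// Reactor Kafka: records arrive as a Flux, so non-blocking operators and back-pressure
// (e.g., limitRate) compose naturally with the rest of a WebFlux application.
fun reactivePipeline(options: ReceiverOptions<String, String>) {
    KafkaReceiver.create(options)
        .receive()
        .limitRate(256)
        .doOnNext { println("Got ${it.key()} -> ${it.value()}") }
        .subscribe()
}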
Consumers
In my microservice, I want to handle user action-related events that are sent from other monitoring applications for analytics purposes and possibly to act on them. Having said that, here are the events that we are going to receive:
LoggedInEvent
- Topic: logged-in-events
- Key<String>: "123456"
- Value<JSON>: { "userId": "123456", "timestamp": "2024-10-04T10:15:30.000Z", "ipAddress": "1.1.1.1", "deviceType": "MOBILE", "browser": "Chrome", "loginMethod": "TWO_FACTOR", "sessionId": "abcd-efgh-ijkl" }
LoggedOutEvent
- Topic: logged-out-events
- Key<String>: "123456"
- Value<JSON>: { "userId": "123456", "timestamp": "2024-10-04T12:30:45.000Z", "ipAddress": "1.1.1.1", "deviceType": "MOBILE", "sessionId": "abcd-efgh-ijkl", "logoutReason": "USER_INITIATED", "browser": "Chrome" }
SessionHackAttemptEvent
- Topic: session-hack-attempt-events
- Key<String>: "123456"
- Value<Avro>: { "userId": "123456", "sessionId": "abcd-efgh-ijkl", "ipAddress": "1.1.1.1", "attackMethod": "MAN_IN_THE_MIDDLE" }
SessionStateUpdateEvent
- Topic: session-state-update-events
- Key<String>: "123456"
- Value<String>: "ACTIVE"
Let’s pick the easiest one to handle — SessionStateUpdateEvent
— and write the consumer for it. Here are the required dependencies:
implementation("org.springframework.kafka:spring-kafka")
implementation("io.projectreactor.kafka:reactor-kafka:1.3.23")
Scratching the Surface
When it comes to consuming messages in Reactor Kafka, messages are consumed using the reactive receiver KafkaReceiver
. Receiver configuration options can be supplied via the ReceiverOptions
instance during the KafkaReceiver
’s creation. So here is the simplest working consumer for our use case:
@Component
class SessionStateUpdateEventConsumer : CommandLineRunner {
companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
}
override fun run(vararg args: String?) {
val consumerProps = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG to "analytics-group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
)
val receiverOptions: ReceiverOptions<String, String> =
ReceiverOptions.create<String, String>(consumerProps).subscription(setOf("session-state-update-events"))
KafkaReceiver.create(receiverOptions)
.receive()
.concatMap { receiverRecord ->
Mono.just(receiverRecord)
.doOnError { log.error("Encountered [{}] during process of SessionStateUpdate", it.message, it) }
.doOnNext { r -> log.debug("Received {} with value={}", r.key(), r.value()) }
.doFinally { receiverRecord.receiverOffset().acknowledge() }
}.subscribe()
}
}
In this trivial example, we're defining a Kafka consumer using CommandLineRunner
to ensure that the consumer starts as soon as the application starts. This could also be achieved using @PostConstruct
or Spring Boot's @EventListener
.
The Kafka consumer properties are fairly basic, including the Kafka broker (localhost:9092
), the group ID (analytics-group), and key/value deserializers set to handle string-based keys and values. We subscribe to session-state-update-events, which represents the source of events that the consumer will process.
We've used the KafkaReceiver.create(receiverOptions)
method, which offers various ways to process records. In this case, I've opted for .receive()
, but there are alternatives like receiveAutoAck
, receiveBatch
, and receiveAtMostOnce
, but diving into these is beyond the scope of this article. I've chosen receive()
for its flexibility in processing each record and managing acknowledgments explicitly.
Inside the processing chain, we log any errors encountered, log the key and value of the received record, and finally, acknowledge the offset to ensure proper Kafka offset management.
In the end, .subscribe()
is crucial: it's what triggers the entire reactive stream to start processing, and nothing happens until the stream is subscribed to. Without .subscribe()
, the operations defined in the chain — such as concatMap
, doOnError
, doOnNext
, and doFinally
— won't be executed.
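One practical note: in a long-lived service you would usually keep the Disposable returned by .subscribe() so the stream can be cancelled on shutdown. A small, illustrative sketch (the wrapper class and bean wiring are assumptions, not part of the demo project):
// Illustrative lifecycle wrapper: keep the Disposable returned by subscribe() so the
// stream, and the underlying Kafka consumer, can be closed when the application stops.
@Component
class SessionStateUpdateEventSubscription(private val receiver: KafkaReceiver<String, String>) {

    private var subscription: Disposable? = null

    @PostConstruct
    fun start() {
        subscription = receiver.receive()
            .doOnNext { println("${it.key()} -> ${it.value()}") }
            .subscribe()
    }

    @PreDestroy
    fun stop() {
        subscription?.dispose() // cancels the stream and releases the Kafka consumer
    }
}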
Now if we are to enter the Kafka container’s bash (docker exec -it kafka-cluster bash
), and produce a message:
echo 'userId456:ACTIVE' | kafka-console-producer --bootstrap-server localhost:9092 --topic session-state-update-events --property key.separator=: --property parse.key=true
We will see the following log message:
DEBUG 33492 --- [analytics-service] [alytics-group-1] [] essionStateUpdateEventConsumer$Companion : Received userId456 with value=ACTIVE
While the current implementation is functional, there are some key improvements we need to address for a more robust solution.
A More Idiomatic Approach
First, while we've established a basic Kafka consumer using KafkaReceiver
, Spring Boot provides a more idiomatic and convenient abstraction: the ReactiveKafkaConsumerTemplate<K, V>
. This template simplifies the integration with Kafka by abstracting much of the boilerplate and providing tighter integration with Spring.
Second, hardcoding consumer properties, as seen in this example, severely limits flexibility. This approach makes the code brittle and harder to maintain. Imagine needing to change the topic or update the bootstrap-servers
. Or you might need to adjust properties based on message volume, consumer performance, or even infrastructure changes. You would have to modify the source code and redeploy the application, which is far from ideal in real-world systems.
So let’s address this:
@Configuration
class KafkaConsumerConfig {
@Bean
fun receiverOptions(props: KafkaProperties, @Value("\${spring.kafka.topic}") topic: String): ReceiverOptions<String, String> =
ReceiverOptions.create<String, String>(props.buildConsumerProperties(null)).subscription(listOf(topic))
@Bean
fun consumerTemplate(options: ReceiverOptions<String, String>) : ReactiveKafkaConsumerTemplate<String, String> =
ReactiveKafkaConsumerTemplate(options)
}
By declaring KafkaConsumerConfig
as a @Configuration
class, we leverage Spring’s dependency injection to manage our Kafka consumer configuration more effectively. The use of KafkaProperties
, a @ConfigurationProperties(prefix = "spring.kafka")
class, allows us to externalize properties in a clean and organized manner within application.yaml
. This externalization promotes better separation of concerns and makes our application more adaptable to configuration changes without requiring code modifications.
kafka:
  topic: "session-state-update-events"
  bootstrap-servers:
    - localhost:9092
  consumer:
    group-id: analytics-group
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      auto.offset.reset: latest
      max.poll.records: 200
With this setup, our consumer now looks cleaner and more streamlined. Here’s the updated SessionStateUpdateEventConsumer
:
@Component
class SessionStateUpdateEventConsumer(val consumerTemplate: ReactiveKafkaConsumerTemplate<String, String>) : CommandLineRunner {
companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
}
override fun run(vararg args: String?) {
consumerTemplate
.receive()
.concatMap { receiverRecord ->
Mono.just(receiverRecord)
.doOnError { log.error("Encountered [{}] during process of SessionStateUpdate", it.message, it) }
.doOnNext { r -> log.debug("Received {} with value={}", r.key(), r.value()) }
.doFinally { receiverRecord.receiverOffset().acknowledge() }
}.subscribe()
}
}
While the current implementation of our Kafka consumer setup is a significant improvement, it does expose a challenge when it comes to the accommodation of multiple consumers. The existing use of KafkaProperties
binds the application configuration properties to a single consumer instance (private final Consumer consumer = new Consumer();
), which can lead to complications when we want to introduce additional consumers with varying requirements.
Relying on a single @Configuration
file or its beans becomes problematic, too. With each new consumer requiring its own configuration properties, we inevitably introduce code duplication: declaring separate @ConfigurationProperties
classes for each consumer, along with multiple configuration classes, multiple beans with different names, or @Qualifier
annotations, increases maintenance overhead.
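To make the duplication concrete, here is a hypothetical sketch of what the configuration starts to look like with just two consumers (the bean and property names are invented for illustration):
// Hypothetical illustration of the duplication problem: every additional consumer needs
// its own options bean and template bean, wired together with @Qualifier.
@Configuration
class MultiConsumerConfig {
    @Bean("loginReceiverOptions")
    fun loginReceiverOptions(props: KafkaProperties, @Value("\${spring.kafka.login-topic}") topic: String): ReceiverOptions<String, String> =
        ReceiverOptions.create<String, String>(props.buildConsumerProperties(null)).subscription(listOf(topic))

    @Bean("logoutReceiverOptions")
    fun logoutReceiverOptions(props: KafkaProperties, @Value("\${spring.kafka.logout-topic}") topic: String): ReceiverOptions<String, String> =
        ReceiverOptions.create<String, String>(props.buildConsumerProperties(null)).subscription(listOf(topic))

    @Bean
    fun loginConsumerTemplate(@Qualifier("loginReceiverOptions") options: ReceiverOptions<String, String>) =
        ReactiveKafkaConsumerTemplate(options)

    @Bean
    fun logoutConsumerTemplate(@Qualifier("logoutReceiverOptions") options: ReceiverOptions<String, String>) =
        ReactiveKafkaConsumerTemplate(options)
}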
Given these limitations, it becomes clear that the current approach, while beneficial in many ways, is not extensible.
Let's Abstract
Let’s try to make our consumer creation more abstract and generic:
I started with the creation of the enum KafkaConsumerName
that will ensure type safety in configuration and will provide more context in the logs.
enum class KafkaConsumerName(val eventType: String) {
DEFAULT("Unknown"), SESSION_STATE_UPDATE(String::class.java.simpleName)
}
Next, I created a @ConfigurationProperties
data class, KafkaConsumerConfigurationProperties
, to handle the binding of consumer-specific properties from application.yaml
. This setup allows each consumer to have its own topic and specific Kafka configurations while ensuring type safety with the KafkaConsumerName
enum.
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
data class KafkaConsumerConfigurationProperties(var consumers: Map<KafkaConsumerName, ConsumerProperties> = EnumMap(KafkaConsumerName::class.java)) {
data class ConsumerProperties(
var topic: String? = null,
var properties: Map<String, String> = HashMap()
)
}
I created a KafkaReceiverOptionsFactory
, which is responsible for building ReceiverOptions
based on the KafkaConsumerName
. This factory takes into account both default and consumer-specific properties, allowing us to maintain flexibility and avoid redundancy in configurations.
@Component
class KafkaReceiverOptionsFactory(val config: KafkaConsumerConfigurationProperties) {
companion object {
val log: Logger = LoggerFactory.getLogger(this::class.java)
}
fun <K, V> createReceiverOptions(kafkaConsumerName: KafkaConsumerName): ReceiverOptions<K, V> {
log.debug("Creating receiver options for Consumer=[{}]", kafkaConsumerName)
val defaultProps = config.consumers[KafkaConsumerName.DEFAULT]
?: throw IllegalStateException("Default consumer configuration not found")
val specificProps = config.consumers[kafkaConsumerName]
?: throw IllegalArgumentException("Consumer configuration not found for: $kafkaConsumerName")
val consumerProperties = KafkaConsumerConfigurationProperties.ConsumerProperties(specificProps.topic, defaultProps.properties + specificProps.properties)
log.debug("Computed consumer properties {} : {}", kafkaConsumerName, consumerProperties)
val options = ReceiverOptions.create<K, V>(consumerProperties.properties)
.subscription(listOf(consumerProperties.topic ?: throw IllegalArgumentException("Missing <topic> field for: $kafkaConsumerName")))
.addAssignListener { log.info("Consumer {} partitions assigned {}", kafkaConsumerName, it) }
.addRevokeListener { log.info("Consumer {} partitions revoked {}", kafkaConsumerName, it) }
return options
}
}
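A small detail worth calling out: in the merge defaultProps.properties + specificProps.properties, Kotlin's Map.plus keeps the right-hand entries on key collisions, so consumer-specific properties override the defaults:
// Kotlin map merge semantics: the right-hand map wins on duplicate keys.
val merged = mapOf("auto.offset.reset" to "latest") + mapOf("auto.offset.reset" to "earliest")
println(merged) // {auto.offset.reset=earliest}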
Before going any further, let's review how the properties are structured in the application.yaml
:
kafka:
  consumers:
    DEFAULT:
      properties:
        bootstrap.servers: localhost:9092
        schema.registry.url: http://localhost:8081
        auto.offset.reset: latest
        spring.json.use.type.headers: false
        key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    SESSION-STATE-UPDATE:
      topic: session-state-update-events
      properties:
        client.id: analytics.session-state-update
        group.id: analytics.session-state-update.group
In this configuration:
- DEFAULT consumer: Contains shared properties like bootstrap.servers, schema.registry.url, and default deserializers for key and value (StringDeserializer). It also specifies how offsets are managed, which ensures messages are consumed starting from the latest offset.
- SESSION-STATE-UPDATE consumer: Includes a specific topic (session-state-update-events) and overrides properties such as client.id and group.id, which are unique to this consumer.
I provided some default configuration properties, but you can easily add any additional properties you may need from org.apache.kafka.clients.consumer.ConsumerConfig
to better tune your consumers.
Lastly, I introduced a generic AbstractReactiveKafkaConsumer
class that employs the template method pattern to encapsulate common consumer behavior, while still offering flexibility for customization. This class handles the creation of the ReactiveKafkaConsumerTemplate
using the KafkaReceiverOptionsFactory
, so we don't have to worry about that part in individual consumers.
For my specific use case, this class also provides a retry
mechanism during event consumption, which can easily be customized or removed by overriding the consume method. The only method that every consumer must implement is handle
, which processes the received ReceiverRecord<K, V>
and returns a Mono<Void>
.
abstract class AbstractReactiveKafkaConsumer<K : Any, V>(private val consumerName: KafkaConsumerName) : CommandLineRunner {
companion object {
private const val DEFAULT_RETRY_MAX_ATTEMPTS = 3L
}
val log: Logger = LoggerFactory.getLogger(this::class.java)
@Autowired
private lateinit var kafkaReceiverOptionsFactory: KafkaReceiverOptionsFactory
private val kafkaConsumerTemplate: ReactiveKafkaConsumerTemplate<K, V> by lazy {
ReactiveKafkaConsumerTemplate(kafkaReceiverOptionsFactory.createReceiverOptions<K, V>(consumerName))
}
override fun run(vararg args: String?) {
kafkaConsumerTemplate.receive()
.doOnError { log.error("Encountered [{}] during process of ${consumerName.eventType}", it.message, it) }
.concatMap { consume(it) }
.subscribe()
}
protected open fun consume(record: ReceiverRecord<K, V>): Mono<Void> =
Mono.just(record)
.doOnNext { r -> log.debug("Received {} {} with value={}", consumerName.eventType, r.key(), r.value()) }
.flatMap { handle(it) }
.retryWhen(getRetrySpec(record))
.doOnError { log.error("Encountered [{}] during process of ${consumerName.eventType} {}", it.message, record.key(), it) }
.doFinally { record.receiverOffset().acknowledge() }
.onErrorComplete()
.then()
protected open fun getRetrySpec(record: ConsumerRecord<K, V>): Retry =
Retry.fixedDelay(DEFAULT_RETRY_MAX_ATTEMPTS, Duration.ofSeconds(1))
.doAfterRetry {
log.warn("Retrying #{} processing ${consumerName.eventType} {} due to {}", it.totalRetries(), record.key(), it.failure().message, it.failure())
}
.onRetryExhaustedThrow { _, signal -> signal.failure() }
abstract fun handle(record: ReceiverRecord<K, V>): Mono<Void>
}
So, if we shift our focus to SessionStateUpdateEventConsumer
, it looks like this:
@Component
class SessionStateUpdateEventConsumer(val sessionStateUpdateEventAuditService: SessionStateUpdateEventAuditService) :
AbstractReactiveKafkaConsumer<String, String>(SESSION_STATE_UPDATE) {
override fun handle(record: ReceiverRecord<String, String>): Mono<Void> =
sessionStateUpdateEventAuditService.audit(record.key(), record.value())
}
Lovely, isn’t it? By extending AbstractReactiveKafkaConsumer
, it inherits all the common behavior for consuming messages while allowing for a streamlined, specific handling of ReceiverRecord<String, String>
. Also, the handle method directly delegates the work to sessionStateUpdateEventAuditService
, which saves the processed records in the database instead of just logging them.
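For context, here is a rough sketch of what such an audit service might look like; the entity fields follow the log output shown below, while the repository and method bodies are assumptions made for illustration, not the article's actual implementation:
// Hypothetical sketch of the audit service; the real one lives in the demo repository.
// Imports are omitted for brevity, matching the article's code style.
data class SessionStateUpdateEventAudit(
    @Id val id: Long? = null,
    val userId: String,
    val sessionState: String,
    val createdAt: LocalDateTime = LocalDateTime.now()
)

interface SessionStateUpdateEventAuditRepository : ReactiveCrudRepository<SessionStateUpdateEventAudit, Long> {
    fun findByUserId(userId: String): Flux<SessionStateUpdateEventAudit>
}

@Service
class SessionStateUpdateEventAuditService(private val repository: SessionStateUpdateEventAuditRepository) {
    companion object {
        private val log: Logger = LoggerFactory.getLogger(this::class.java)
    }

    fun audit(userId: String, sessionState: String): Mono<Void> =
        repository.save(SessionStateUpdateEventAudit(userId = userId, sessionState = sessionState))
            .doOnNext { log.debug("Audited {}", it) }
            .then()

    fun findByUserId(userId: String): Flux<SessionStateUpdateEventAudit> = repository.findByUserId(userId)
}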
Now starting up the application, we observe the following log entries:
DEBUG 28368 --- [analytics-service] [ restartedMain] [] .k.KafkaReceiverOptionsFactory$Companion : Computed consumer properties for SESSION_STATE_UPDATE : ConsumerProperties(topic=session-state-update-events, properties={bootstrap.servers=localhost:9092, schema.registry.url=localhost:8081, auto.offset.reset=latest, spring.json.use.type.headers=false, key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, client.id=analytics.session-state-update, group.id=analytics.session-state-update.group})
These logs confirm that the SESSION_STATE_UPDATE
consumer has been initialized and its ReceiverOptions
have been successfully constructed. Next, when a message is produced, we see:
DEBUG 28368 --- [analytics-service] [-update.group-1] [] .b.m.k.s.SessionStateUpdateEventConsumer : Received String userId456 with value=INACTIVE
DEBUG 28368 --- [analytics-service] [ctor-tcp-nio-10] [] onStateUpdateEventAuditService$Companion : Audited SessionStateUpdateEventAudit(id=10, userId=userId456, createdAt=2024-10-08T17:19:39.460017100, sessionState=INACTIVE)
This indicates that our record was successfully received and audited, confirming that the system is functioning as intended.
JSON and Avro
Let's shift our focus to the rest of the consumers that are JSON and Avro-based and highlight how much this abstraction simplifies our work. First, we’ll define new consumer declarations in the KafkaConsumerName
enum for the JSON ones:
LOGGED_IN_EVENT(LoggedInEvent::class.java.simpleName),
LOGGED_OUT_EVENT(LoggedOutEvent::class.java.simpleName)
Next, let’s look at the application.yaml
. Pay attention to the overridden value.deserializer
and the newly specified spring.json.value.default.type
for the JSON payloads we will receive:
LOGGED-IN-EVENT:
  topic: logged-in-events
  properties:
    client.id: analytics.logged-in-events
    group.id: analytics.logged-in-events.group
    value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    spring.json.value.default.type: "inc.evil.bootiful_reactive_kafka.messaging.kafka.log_event.model.LoggedInEvent"
LOGGED-OUT-EVENT:
  topic: logged-out-events
  properties:
    client.id: analytics.logged-out-events
    group.id: analytics.logged-out-events.group
    value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    spring.json.value.default.type: "inc.evil.bootiful_reactive_kafka.messaging.kafka.log_event.model.LoggedOutEvent"
And the consumers — straightforward implementations of AbstractReactiveKafkaConsumer
, delegating the processing to LogEventService
, which saves the processed records in the database:
@Component
class LoggedInEventConsumer(val logEventService: LogEventService) :
AbstractReactiveKafkaConsumer<String, LoggedInEvent>(LOGGED_IN_EVENT) {
override fun handle(record: ReceiverRecord<String, LoggedInEvent>): Mono<Void> =
logEventService.handle(record.value())
}
@Component
class LoggedOutEventConsumer(val logEventService: LogEventService) :
AbstractReactiveKafkaConsumer<String, LoggedOutEvent>(LOGGED_OUT_EVENT) {
override fun handle(record: ReceiverRecord<String, LoggedOutEvent>): Mono<Void> =
logEventService.handle(record.value())
}
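For reference, the JSON payloads map onto plain Kotlin data classes; here is a minimal sketch derived from the sample payloads shown earlier (the concrete field types, such as Instant or enums versus plain strings, are assumptions):
// Sketch of the JSON event models; field names come from the sample payloads,
// while the exact types are assumptions.
data class LoggedInEvent(
    val userId: String,
    val timestamp: Instant,
    val ipAddress: String,
    val deviceType: String,
    val browser: String,
    val loginMethod: String,
    val sessionId: String
)

data class LoggedOutEvent(
    val userId: String,
    val timestamp: Instant,
    val ipAddress: String,
    val deviceType: String,
    val sessionId: String,
    val logoutReason: String,
    val browser: String
)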
Now, starting the application, we notice the following log entries:
DEBUG 31276 --- [analytics-service] [ restartedMain] [] .k.KafkaReceiverOptionsFactory$Companion : Computed consumer properties for LOGGED_IN_EVENT : ConsumerProperties(topic=logged-in-events, properties={bootstrap.servers=localhost:9092, schema.registry.url=localhost:8081, auto.offset.reset=latest, spring.json.use.type.headers=false, key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer, spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, client.id=analytics.logged-in-events, group.id=analytics.logged-in-events.group, spring.json.value.default.type=inc.evil.bootiful_reactive_kafka.messaging.kafka.log_event.model.LoggedInEvent})
DEBUG 31276 --- [analytics-service] [ restartedMain] [] .k.KafkaReceiverOptionsFactory$Companion : Computed consumer properties for SESSION_STATE_UPDATE : ConsumerProperties(topic=session-state-update-events, properties={bootstrap.servers=localhost:9092, schema.registry.url=localhost:8081, auto.offset.reset=latest, spring.json.use.type.headers=false, key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, client.id=analytics.session-state-update, group.id=analytics.session-state-update.group})
DEBUG 31276 --- [analytics-service] [ restartedMain] [] .k.KafkaReceiverOptionsFactory$Companion : Computed consumer properties for LOGGED_OUT_EVENT : ConsumerProperties(topic=logged-out-events, properties={bootstrap.servers=localhost:9092, schema.registry.url=localhost:8081, auto.offset.reset=latest, spring.json.use.type.headers=false, key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer, spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, client.id=analytics.logged-out-events, group.id=analytics.logged-out-events.group, spring.json.value.default.type=inc.evil.bootiful_reactive_kafka.messaging.kafka.log_event.model.LoggedOutEvent})
And producing these messages to the corresponding topics:
echo 'userId456:{"userId":"userId456","timestamp":"2024-10-08T12:45:00.123Z","ipAddress":"1.1.1.1","deviceType":"DESKTOP","browser":"Firefox","loginMethod":"PASSWORD","sessionId":"sessionId456"}' \
| kafka-console-producer --bootstrap-server localhost:9092 --topic logged-in-events --property key.separator=: --property parse.key=true
echo 'userId456:{"userId":"userId456","timestamp":"2024-10-08T17:00:00.456Z","ipAddress":"1.1.1.1","deviceType":"DESKTOP","sessionId":"sessionId456","logoutReason":"USER_INITIATED","browser":"Firefox"}' \
| kafka-console-producer --bootstrap-server localhost:9092 --topic logged-out-events --property key.separator=: --property parse.key=true
Yields the following log lines:
DEBUG 31276 --- [analytics-service] [-events.group-1] [] i.e.b.m.k.l.LoggedInEventConsumer : Received LoggedInEvent userId456 with value=LoggedInEvent(userId=userId456, timestamp=2024-10-08T12:45:00.123Z, ipAddress=1.1.1.1, deviceType=DESKTOP, browser=Firefox, loginMethod=PASSWORD, sessionId=sessionId456)
DEBUG 31276 --- [analytics-service] [ctor-tcp-nio-10] [] i.e.b.s.LogEventService$Companion : Audited LogEventAudit(id=6, userId=userId456, createdAt=2024-10-08T17:30:26.610630600, ipAddress=1.1.1.1, deviceType=DESKTOP, browser=Firefox, eventType=LOGIN, loginMethod=PASSWORD, logoutReason=null, sessionId=sessionId456)
DEBUG 31276 --- [analytics-service] [-events.group-3] [] i.e.b.m.k.l.LoggedOutEventConsumer : Received LoggedOutEvent userId456 with value=LoggedOutEvent(userId=userId456, timestamp=2024-10-08T17:00:00.456Z, ipAddress=1.1.1.1, deviceType=DESKTOP, sessionId=sessionId456, logoutReason=USER_INITIATED, browser=Firefox)
DEBUG 31276 --- [analytics-service] [ctor-tcp-nio-10] [] i.e.b.s.LogEventService$Companion : Audited LogEventAudit(id=5, userId=userId456, createdAt=2024-10-08T17:30:21.678101900, ipAddress=1.1.1.1, deviceType=DESKTOP, browser=Firefox, eventType=LOGOUT, loginMethod=null, logoutReason=USER_INITIATED, sessionId=sessionId456)
This demonstrates that our new consumers are functioning correctly and processing events as expected.
Lastly, let's implement the Avro consumer for auditing session hack attempts. First, we need the following dependencies:
repositories {
maven {
url = uri("https://packages.confluent.io/maven/")
}
}
implementation("io.confluent:kafka-avro-serializer:7.7.1")
implementation("io.confluent:kafka-streams-avro-serde:7.7.1")
We need this plugin and its configuration so we can generate Avro models and include them in the source sets:
id("com.github.davidmc24.gradle.plugin.avro") version "1.7.1"
Here is the Avro schema for the previously mentioned payload placed in resources/avro
:
{
"type": "record",
"name": "SessionHackAttemptEvent",
"namespace": "inc.evil.bootiful_reactive_kafka.messaging.kafka.consumer.session_state.model",
"fields": [
{
"name": "userId",
"type": "string"
},
{
"name": "sessionId",
"type": "string"
},
{
"name": "ipAddress",
"type": "string"
},
{
"name": "attackMethod",
"type": {
"type": "enum",
"name": "AttackMethod",
"symbols": [
"SESSION_HIJACKING",
"BRUTE_FORCE",
"SESSION_FIXATION",
"TOKEN_THEFT",
"MAN_IN_THE_MIDDLE",
"CROSS_SITE_SCRIPTING",
"OTHER"
]
}
}
]
}
Let's add the new KafkaConsumerName
. Before that, run generateAvroJava
so we can bind the eventType
to the generated Avro model:
SESSION_HACK_ATTEMPT(SessionHackAttemptEvent::class.java.simpleName)
Now in the application.yaml
, notice the io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
that overrides default value.deserializer
:
SESSION-HACK-ATTEMPT:
  topic: session-hack-attempt-events
  properties:
    client.id: analytics.session-hack-attempt
    group.id: analytics.session-hack-attempt.group
    value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
Next, the consumer implementation:
@Component
class SessionHackAttemptEventConsumer(val sessionStateUpdateEventAuditService: SessionStateUpdateEventAuditService) :
AbstractReactiveKafkaConsumer<String, SessionHackAttemptEvent>(SESSION_HACK_ATTEMPT) {
override fun handle(record: ReceiverRecord<String, SessionHackAttemptEvent>): Mono<Void> =
sessionStateUpdateEventAuditService.audit(record.key(), record.value())
}
Finally, when starting the application, you will notice the following log entries:
DEBUG 37512 --- [analytics-service] [ main] [] .c.KafkaReceiverOptionsFactory$Companion : Computed consumer properties for SESSION_HACK_ATTEMPT : ConsumerProperties(topic=session-hack-attempt-events, properties={bootstrap.servers=localhost:9092, schema.registry.url=http://localhost:8081, auto.offset.reset=latest, spring.json.use.type.headers=false, key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer, spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer, client.id=analytics.session-hack-attempt, group.id=analytics.session-hack-attempt.group})
When producing a message, we can confirm that this consumer too is functioning correctly:
DEBUG 37512 --- [analytics-service] [attempt.group-4] [] .m.k.c.s.SessionHackAttemptEventConsumer : Received SessionHackAttemptEvent with key=codeMasterX and value={"userId": "codeMasterX", "sessionId": "123456", "ipAddress": "1.1.1.1", "attackMethod": "MAN_IN_THE_MIDDLE"}
DEBUG 37512 --- [analytics-service] [ctor-tcp-nio-10] [] onStateUpdateEventAuditService$Companion : Audited SessionStateUpdateEventAudit(id=12, userId=codeMasterX, createdAt=2024-10-11T19:02:37.099877200, sessionState=HACK_ATTEMPT)
Testing Time
Now that our consumers are properly configured, it's time to incorporate integration tests. For this, I will use Testcontainers. Below are the necessary dependencies for the integration testing:
testImplementation("org.testcontainers:kafka")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("org.testcontainers:postgresql")
testImplementation("org.testcontainers:r2dbc")
testImplementation("org.awaitility:awaitility")
If you're interested in setting up integration tests for Spring Data R2DBC, you can check out my other article on DZone, "Testcontainers With Kotlin and Spring Data R2DBC," which provides guidance on configuring most of the code required for the database layer that I am going to use here.
Basically, I set up an AbstractTestcontainersTest
class to handle test container management for PostgreSQL and Kafka. Both containers are initialized as singletons to ensure they are reused across tests, with the JVM handling their shutdown. Connection properties for both datasource and Kafka are dynamically overridden to point to the containers.
@Tag("integration-test")
abstract class AbstractTestcontainersTest {
@Autowired
lateinit var connectionFactory: ConnectionFactory
fun executeScriptBlocking(sqlScript: String) {
Mono.from(connectionFactory.create())
.flatMap<Any> { connection -> ScriptUtils.executeSqlScript(connection, ClassPathResource(sqlScript)) }.block()
}
companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
private val postgres: PostgreSQLContainer<*> = PostgreSQLContainer(DockerImageName.parse("postgres:13.3"))
.apply {
this.withDatabaseName("testDb")
.withUsername("root")
.withPassword("123456")
.withReuse(true)
}
private val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.1"))
@JvmStatic
@DynamicPropertySource
fun properties(registry: DynamicPropertyRegistry) {
registry.add("spring.r2dbc.url", Companion::r2dbcUrl)
registry.add("spring.r2dbc.username", postgres::getUsername)
registry.add("spring.r2dbc.password", postgres::getPassword)
registry.add("spring.flyway.url", postgres::getJdbcUrl)
registry.add("spring.kafka.bootstrap-servers") { kafka.bootstrapServers }
registry.add("spring.kafka.consumers.DEFAULT.properties.bootstrap.servers") { kafka.bootstrapServers }
registry.add("spring.kafka.consumers.DEFAULT.properties.auto.offset.reset") { "earliest" }
}
private fun r2dbcUrl(): String {
return "r2dbc:postgresql://${postgres.host}:${postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)}/${postgres.databaseName}"
}
@JvmStatic
@BeforeAll
internal fun setUp(): Unit {
postgres.start()
log.info("Testcontainers -> PostgresSQL DB started on [${r2dbcUrl()}] with user:root and password:123456")
kafka.start()
log.info("Testcontainers -> Kafka started with bootstrap.servers=${kafka.bootstrapServers}")
}
}
}
Subsequently, integration tests (e.g., for SessionStateUpdateEventConsumer
) inherit from this abstract class, ensuring seamless setup with the Testcontainers.
@ComponentTest
@ExtendWith(OutputCaptureExtension::class)
class SessionStateUpdateEventConsumerIntegrationTest : AbstractTestcontainersTest() {
@SpyBean
private lateinit var sessionStateUpdateEventAuditService: SessionStateUpdateEventAuditService
@Autowired
lateinit var kafkaTemplate: KafkaTemplate<String, Any>
companion object {
private val topicName = "test-topic-${UUID.randomUUID()}"
@JvmStatic
@DynamicPropertySource
fun properties(registry: DynamicPropertyRegistry) {
registry.add("spring.kafka.consumers.SESSION-STATE-UPDATE.topic") { topicName }
}
}
@Test
@RunSql(["/db-data/session-state-update-events.sql"])
fun consume_withValidSessionStateUpdate_savesSessionStateInDatabase() {
val userId = "user_987"
val sessionState = "ACTIVE"
assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(1)
kafkaTemplate.send(topicName, userId, sessionState).get()
await()
.pollInterval(Duration.ofSeconds(3))
.atMost(30, TimeUnit.SECONDS)
.untilAsserted {
assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(2)
verify(sessionStateUpdateEventAuditService).audit(userId, sessionState)
}
}
@Test
@RunSql(["/db-data/session-state-update-events.sql"])
fun consume_withErrorProcessing_retriesAndSavesSessionStateInDatabase(logs: CapturedOutput) {
val userId = "user_987"
val sessionState = "ACTIVE"
assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(1)
doAnswer { throw RuntimeException("Oops, something happened!") }
.doAnswer { throw RuntimeException("Again? Again! Something happened!") }
.doCallRealMethod()
.whenever(sessionStateUpdateEventAuditService).audit(userId, sessionState)
kafkaTemplate.send(topicName, userId, sessionState).get()
await()
.pollInterval(Duration.ofSeconds(3))
.atMost(30, TimeUnit.SECONDS)
.untilAsserted {
assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(2)
assertThat(logs.out).contains("Retrying #0 processing String user_987")
assertThat(logs.out).contains("Retrying #1 processing String user_987")
verify(sessionStateUpdateEventAuditService, times(3)).audit(userId, sessionState)
}
}
@AfterEach
fun reset() {
LogManager.getLogManager().readConfiguration()
}
}
I inject KafkaTemplate
for interacting with Kafka, while using a spy on SessionStateUpdateEventAuditService
to simulate service errors. The topicName
the consumer subscribes to is overridden with a random value in the test because the containers are used in a singleton manner, which leads to a shared state. This ensures isolation and prevents unintended side effects, maintaining the test's integrity. I test two flows:
- Auditing the processed SessionStateUpdateEvent (happy path)
- Simulating 2 consecutive exceptions to test the retry mechanism
So the flows go like this:
- Insert events into the database and verify the number of events for a specific userId.
- Simulate a produced message with KafkaTemplate.
- Use Awaitility to poll every 3 seconds (for up to 30 seconds) to check if the event count for the user increased and verify the service method was invoked.
- For the retry mechanism, simulate 2 consecutive exceptions, then let the flow proceed. Verify the service was called 3 times and check CapturedOutput for log lines confirming the retries (e.g., "Retrying #0 processing String user_987").
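A note on the custom annotations used above: @ComponentTest and @RunSql come from the demo repository. A rough, hypothetical sketch of what a @ComponentTest meta-annotation could look like (the actual definition may differ):
// Hypothetical composed annotation; the real @ComponentTest may combine
// different annotations or profiles.
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
@SpringBootTest
@ActiveProfiles("test")
annotation class ComponentTest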
For JSON consumers, the configuration for integration testing is mostly the same. However, for the Avro consumer, we need to accommodate a new component: the Schema Registry. This is necessary so that both the SessionHackAttemptEventConsumer
and the testing Kafka producer can access the Avro schema.
Let's start by adjusting AbstractTestcontainersTest
to incorporate the Schema Registry:
@Tag("integration-test")
abstract class AbstractTestcontainersTest {
@Autowired
lateinit var connectionFactory: ConnectionFactory
fun executeScriptBlocking(sqlScript: String) {
Mono.from(connectionFactory.create())
.flatMap<Any> { connection -> ScriptUtils.executeSqlScript(connection, ClassPathResource(sqlScript)) }.block()
}
companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
private val postgres: PostgreSQLContainer<*> = PostgreSQLContainer(DockerImageName.parse("postgres:13.3"))
.apply {
this.withDatabaseName("testDb")
.withUsername("root")
.withPassword("123456")
.withReuse(true)
}
private val network: Network = Network.newNetwork()
private val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.1")).withNetwork(network)
private val schemaRegistry: GenericContainer<*> = GenericContainer(DockerImageName.parse("confluentinc/cp-schema-registry:7.5.2"))
.withNetwork(network)
.withExposedPorts(8081)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://${kafka.networkAliases[0]}:9092")
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
@JvmStatic
@DynamicPropertySource
fun properties(registry: DynamicPropertyRegistry) {
registry.add("spring.r2dbc.url", Companion::r2dbcUrl)
registry.add("spring.r2dbc.username", postgres::getUsername)
registry.add("spring.r2dbc.password", postgres::getPassword)
registry.add("spring.flyway.url", postgres::getJdbcUrl)
registry.add("spring.kafka.bootstrap-servers") { bootstrapServers() }
registry.add("spring.kafka.consumers.DEFAULT.properties.bootstrap.servers") { kafka.bootstrapServers }
registry.add("spring.kafka.consumers.DEFAULT.properties.schema.registry.url") { schemaRegistryUrl() }
registry.add("spring.kafka.consumers.DEFAULT.properties.auto.offset.reset") { "earliest" }
}
fun schemaRegistryUrl() = "http://${schemaRegistry.host}:${schemaRegistry.firstMappedPort}"
fun bootstrapServers() = kafka.bootstrapServers
private fun r2dbcUrl(): String = "r2dbc:postgresql://${postgres.host}:${postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)}/${postgres.databaseName}"
@JvmStatic
@BeforeAll
internal fun setUp(): Unit {
postgres.start()
log.info("Testcontainers -> PostgresSQL DB started on [${r2dbcUrl()}] with user:root and password:123456")
kafka.start()
log.info("Testcontainers -> Kafka started with bootstrap.servers=${kafka.bootstrapServers}")
schemaRegistry.start()
log.info("Testcontainers -> Kafka Schema Registry started with url=${schemaRegistryUrl()}")
}
}
}
Instead of having only the Kafka container, we now add the Schema Registry container, too, and connect them using a new Network. We also need to ensure that the schema.registry.url
is overridden accordingly and that the container is started.
Finally, the test:
@ComponentTest
@ExtendWith(OutputCaptureExtension::class)
class SessionHackAttemptEventConsumerIntegrationTest : AbstractTestcontainersTest() {
@SpyBean
private lateinit var sessionStateUpdateEventAuditService: SessionStateUpdateEventAuditService
companion object {
private val topicName = "test-topic-${UUID.randomUUID()}"
@JvmStatic
@DynamicPropertySource
fun properties(registry: DynamicPropertyRegistry) {
registry.add("spring.kafka.consumers.SESSION-HACK-ATTEMPT.topic") { topicName }
}
}
@Test
@RunSql(["/db-data/session-state-update-events.sql"])
fun consume_withValidSessionHackAttemptEvent_savesSessionStateInDatabase() {
val userId = "user_987"
val sessionHackAttemptEvent = SessionHackAttemptEventFixture.of()
assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(1)
getKafkaProducer().send(ProducerRecord(topicName, userId, sessionHackAttemptEvent)).get()
await()
.pollInterval(Duration.ofSeconds(3))
.atMost(30, TimeUnit.SECONDS)
.untilAsserted {
assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(2)
verify(sessionStateUpdateEventAuditService).audit(userId, sessionHackAttemptEvent)
}
}
@Test
@RunSql(["/db-data/session-state-update-events.sql"])
fun consume_withErrorProcessing_retriesAndSavesSessionStateInDatabase(logs: CapturedOutput) {
val userId = "user_987"
val sessionHackAttemptEvent = SessionHackAttemptEventFixture.of()
assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(1)
doAnswer { throw RuntimeException("Oops, something happened!") }
.doAnswer { throw RuntimeException("Again? Again! Something happened!") }
.doCallRealMethod()
.whenever(sessionStateUpdateEventAuditService).audit(eq(userId), anyOrNull<SessionHackAttemptEvent>())
getKafkaProducer().send(ProducerRecord(topicName, userId, sessionHackAttemptEvent)).get()
await()
.pollInterval(Duration.ofSeconds(3))
.atMost(30, TimeUnit.SECONDS)
.untilAsserted {
assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(2)
assertThat(logs.out).contains("Retrying #0 processing SessionHackAttemptEvent user_987")
assertThat(logs.out).contains("Retrying #1 processing SessionHackAttemptEvent user_987")
verify(sessionStateUpdateEventAuditService, times(3)).audit(userId, sessionHackAttemptEvent)
}
}
private fun getKafkaProducer(): KafkaProducer<String, SessionHackAttemptEvent> {
val properties = mapOf(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers())
val valueSerializer = SpecificAvroSerializer<SessionHackAttemptEvent>()
valueSerializer.configure(mapOf(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to schemaRegistryUrl()), false)
return KafkaProducer(properties, StringSerializer(), valueSerializer)
}
@AfterEach
fun reset() {
LogManager.getLogManager().readConfiguration()
}
}
Instead of using kafkaTemplate
, which is sufficient for simple scenarios, we use getKafkaProducer()
. This method constructs a KafkaProducer
with a custom Avro valueSerializer
that is configured to connect to the Schema Registry container.
Producers
Now that the microservice handles LoggedInEvent
and LoggedOutEvent
properly and audits them for analytics purposes, there is a new feature request: send a Kafka message to initialize sandbox resources on a login event and another one to destroy sandbox resources on a logout event. So we’ll have this:
On LoggedInEvent
- Topic: init-sandbox-resources
- Key<String>: "123456"
- Value<String>: "INIT"
On LoggedOutEvent
- Topic: destroy-sandbox-resources
- Key<String>: "123456"
- Value<JSON>: { "userId": "123456", "resources": [ "CONTAINER", "DATABASE", "STORAGE_VOLUME" ] }
Let’s Abstract
When it comes to publishing messages in Reactor Kafka, messages are sent using the reactive sender KafkaSender
. Sender configuration options can be supplied via the SenderOptions
instance during the KafkaSender
’s creation. To create a KafkaSender
you can use KafkaSender.create(senderOptions)
method, which provides different ways to send records like send()
or sendTransactionally()
. And similar to consumers, Spring offers a more idiomatic approach to using KafkaSender
through its abstraction called ReactiveKafkaProducerTemplate
.
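Before building that abstraction, here is a minimal, standalone sketch of publishing a record with the plain KafkaSender; the broker address, topic, key, value, and correlation metadata are placeholders:
// Minimal KafkaSender sketch; all literal values are placeholders for illustration.
val senderProps = mapOf<String, Any>(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
)
val sender: KafkaSender<String, String> = KafkaSender.create(SenderOptions.create<String, String>(senderProps))

sender.send(Mono.just(SenderRecord.create(ProducerRecord("some-topic", "userId456", "INIT"), "correlation-1")))
    .doOnNext { result -> println("Sent, offset=${result.recordMetadata().offset()}") }
    .doOnError { e -> println("Send failed: ${e.message}") }
    .subscribe()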
So let’s start with the init-sandbox-resources producer.
Following the same approach used for consumer configuration, I created KafkaProducerName
to enforce type-safety in the application.yaml
configuration:
enum class KafkaProducerName(val eventType: String) {
DEFAULT("Unknown"),
INIT_RESOURCES(String::class.java.simpleName)
}
Next, I refactored the KafkaConfigurationProperties
(previously KafkaConsumerConfigurationProperties) to also include producer properties:
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
data class KafkaConfigurationProperties(
var consumers: Map<KafkaConsumerName, ConsumerProperties> = EnumMap(KafkaConsumerName::class.java),
var producers: Map<KafkaProducerName, ProducerProperties> = EnumMap(KafkaProducerName::class.java)
) {
data class ConsumerProperties(
var topic: String? = null,
var properties: Map<String, String> = HashMap()
)
data class ProducerProperties(
var topic: String? = null,
var properties: Map<String, String> = HashMap()
)
}
Next, I created KafkaSenderOptionsFactory
, which is responsible for constructing SenderOptions
based on the settings in application.yaml
. It also provides a way to retrieve the topic for a specific producer via getSenderTopic()
.
@Component
class KafkaSenderOptionsFactory(val config: KafkaConfigurationProperties) {
companion object {
val log: Logger = LoggerFactory.getLogger(this::class.java)
}
fun <K, V> createSenderOptions(kafkaProducerName: KafkaProducerName): SenderOptions<K, V> {
log.debug("Creating receiver options for Producer=[{}]", kafkaProducerName)
val defaultProps = config.producers[KafkaProducerName.DEFAULT]
?: throw IllegalStateException("Default producer configuration not found")
val specificProps = config.producers[kafkaProducerName]
?: throw IllegalArgumentException("Producer configuration not found for: $kafkaProducerName")
val producerProperties = KafkaConfigurationProperties.ProducerProperties(specificProps.topic, defaultProps.properties + specificProps.properties)
log.debug("Computed producer properties for {} : {}", kafkaProducerName, producerProperties)
return SenderOptions.create(producerProperties.properties)
}
fun getSenderTopic(kafkaProducerName: KafkaProducerName) =
(config.producers[kafkaProducerName]?.topic ?: throw IllegalArgumentException("Producer topic not found for: $kafkaProducerName"))
}
Next, let’s examine the structure of the application.yaml
configuration for Kafka producers, which follows the same structure as the consumers:
kafka:
  producers:
    DEFAULT:
      properties:
        bootstrap.servers: localhost:9092
        schema.registry.url: http://localhost:8081
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
    INIT_RESOURCES:
      topic: init-sandbox-resources
      properties:
        client.id: analytics.init-sandbox-resources
        retries: 3
        compression.type: snappy
In this configuration:
- DEFAULT producer: Contains common properties shared across all producers, including the bootstrap.servers, schema.registry.url, and default serializers for keys and values. These settings are used as a fallback for other producers unless explicitly overridden.
- INIT_RESOURCES producer: Contains additional specific configurations. Here, these include the client.id for identifying the producer instance, some retry settings, and a specified compression.type. The topic field specifies the Kafka topic where this producer will send its messages.
And finally, the pièce de résistance: the AbstractReactiveKafkaProducer
class, designed to abstract the creation and use of a reactive Kafka producer:
abstract class AbstractReactiveKafkaProducer<K : Any, V : Any>(private val producerName: KafkaProducerName) {
val log: Logger = LoggerFactory.getLogger(this::class.java)
@Autowired
private lateinit var kafkaSenderOptionsFactory: KafkaSenderOptionsFactory
private val kafkaProducerTemplate: ReactiveKafkaProducerTemplate<K, V> by lazy {
ReactiveKafkaProducerTemplate(kafkaSenderOptionsFactory.createSenderOptions<K, V>(producerName))
}
open fun send(key: K, value: V, headers: Iterable<Header> = mutableSetOf()): Mono<Void> =
Mono.just(ProducerRecord(kafkaSenderOptionsFactory.getSenderTopic(producerName), null, key, value, headers))
.flatMap { record ->
kafkaProducerTemplate.send(record)
.doOnSuccess { log.debug("{} published message with key={} and value={}", producerName, key, value) }
.doOnError { log.error("Encountered [{}] during sending of ${producerName.eventType}", it.message, it) }
.then()
}
}
Let's implement InitResourcesMessageProducer
, which currently looks like this. Note that the send()
method can be overridden to satisfy different needs, but at the moment, the default implementation is all we need; a sketch of such an override follows the class below.
@Component
class InitResourcesMessageProducer : AbstractReactiveKafkaProducer<String, String>(KafkaProducerName.INIT_RESOURCES)
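If a producer ever needs extra behavior, overriding send() is the extension point. A purely illustrative sketch (the subclass and header name are assumptions, not part of the demo project) that stamps every outgoing record with a header:
// Illustrative only: override send() to add a trace header to every record.
class TracedInitResourcesMessageProducer :
    AbstractReactiveKafkaProducer<String, String>(KafkaProducerName.INIT_RESOURCES) {

    override fun send(key: String, value: String, headers: Iterable<Header>): Mono<Void> =
        super.send(key, value, headers + RecordHeader("traceId", UUID.randomUUID().toString().toByteArray()))
}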
We can inject this producer into LogEventService
and trigger it to send the INIT
message upon receiving a LoggedInEvent
like this:
fun handle(event: LoggedInEvent): Mono<Void> =
repository.save(event.toAuditEntity())
.doOnNext { log.debug("Audited {}", it) }
.doOnSuccess {
initResourcesMessageProducer.send(event.userId, INIT.name, mutableSetOf(RecordHeader("ipAddress", event.ipAddress.toByteArray())))
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
}
.then()
When we start the application and produce a message on the logged-in-events topic, we can observe the following log entries:
DEBUG 3760 --- [analytics-service] [oundedElastic-1] [] .k.p.KafkaSenderOptionsFactory$Companion : Computed producer properties for INIT_RESOURCES : ProducerProperties(topic=init-sandbox-resources, properties={bootstrap.servers=localhost:9092, schema.registry.url=http://localhost:8081, key.serializer=org.apache.kafka.common.serialization.StringSerializer, value.serializer=org.apache.kafka.common.serialization.StringSerializer, client.id=analytics.init-sandbox-resources, retries=3, compression.type=snappy})
DEBUG 3760 --- [analytics-service] [ndbox-resources] [] e.b.m.k.p.i.InitResourcesMessageProducer : INIT_RESOURCES published message with key=userId456 and value=INIT
This proves that the producer is functioning as expected.
Now let's shift our focus to the JSON-based producer and implement it. First, we define a new producer declaration in the KafkaProducerName
enum:
DESTROY_RESOURCES(DestroyResourcesMessage::class.java.simpleName)
Next, we configure the application.yaml
. Notice the overridden value.serializer
set to org.springframework.kafka.support.serializer.JsonSerializer
:
DESTROY_RESOURCES:
  topic: destroy-sandbox-resources
  properties:
    client.id: analytics.destroy-sandbox-resources
    compression.type: snappy
    value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
Next, we declare the producer bean:
@Component
class DestroyResourcesMessageProducer : AbstractReactiveKafkaProducer<String, DestroyResourcesMessage>(DESTROY_RESOURCES)
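For reference, the DestroyResourcesMessage model could look roughly like this; the field names come from the sample payload and the logs below, while the mapping inside from() is an assumption made for illustration:
// Sketch of the payload model; the from() mapping is assumed, not the article's code.
enum class ResourceType { CONTAINER, DATABASE, STORAGE_VOLUME }

data class DestroyResourcesMessage(val userId: String, val resources: List<ResourceType>) {
    companion object {
        // Assumption: which resources to destroy would normally be derived from the event/session.
        fun from(event: LoggedOutEvent): DestroyResourcesMessage =
            DestroyResourcesMessage(event.userId, listOf(ResourceType.CONTAINER))
    }
}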
Finally, the last step is to configure the producer to send the DestroyResourcesMessage
upon receiving a LoggedOutEvent
, like this:
fun handle(event: LoggedOutEvent): Mono<Void> =
repository.save(event.toAuditEntity())
.doOnNext { log.debug("Audited {}", it) }
.doOnSuccess {
destroyResourcesMessageProducer.send(event.userId, DestroyResourcesMessage.from(event))
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
}
.then()
Once again, our abstraction simplifies much of the logic while remaining highly customizable and flexible per use case. Now, when we start the application and produce a message on the logged-out-events topic, we notice the following log lines:
DEBUG 38756 --- [analytics-service] [oundedElastic-1] [] .k.p.KafkaSenderOptionsFactory$Companion : Computed producer properties for DESTROY_RESOURCES : ProducerProperties(topic=destroy-sandbox-resources, properties={bootstrap.servers=localhost:9092, schema.registry.url=http://localhost:8081, key.serializer=org.apache.kafka.common.serialization.StringSerializer, value.serializer=org.springframework.kafka.support.serializer.JsonSerializer, client.id=analytics.destroy-sandbox-resources, compression.type=snappy})
DEBUG 38756 --- [analytics-service] [ndbox-resources] [] .m.k.p.d.DestroyResourcesMessageProducer : DESTROY_RESOURCES published message with key=userId456 and value=DestroyResourcesMessage(userId=userId456, resources=[CONTAINER])
Testing Time
Now that our producers are properly configured, it's time to write some integration tests. The approach will be similar to what we implemented for the consumers, with only slight differences.
Let’s see what we have for the InitResourcesMessageProducer
:
@ComponentTest
class InitResourcesMessageProducerTest : AbstractTestcontainersTest() {
@Autowired
private lateinit var initResourcesMessageProducer: InitResourcesMessageProducer
companion object {
private val topicName = "test-topic-${UUID.randomUUID()}"
@JvmStatic
@DynamicPropertySource
fun properties(registry: DynamicPropertyRegistry) {
registry.add("spring.kafka.producers.INIT_RESOURCES.topic") { topicName }
}
}
@Test
fun send_correctlyPublishesMessage() {
val consumerProperties =
mapOf(BOOTSTRAP_SERVERS_CONFIG to bootstrapServers(), GROUP_ID_CONFIG to "init-test-consumer", AUTO_OFFSET_RESET_CONFIG to "earliest")
val consumer = KafkaConsumer(consumerProperties, StringDeserializer(), StringDeserializer())
consumer.subscribe(listOf(topicName))
val key = "userId"
val value = InitCommandType.INIT.name
initResourcesMessageProducer.send(key, value).block()
await()
.pollInterval(Duration.ofSeconds(3))
.atMost(20, TimeUnit.SECONDS)
.untilAsserted {
val records = consumer.poll(Duration.ofMillis(100))
assertThat(records).isNotEmpty.hasSize(1)
assertThat(records.first().key()).isEqualTo(key)
assertThat(records.first().value()).isEqualTo(value)
}
consumer.unsubscribe()
}
}
In the InitResourcesMessageProducerTest
class, we inject the InitResourcesMessageProducer
and generate a unique topic
name, similar to the approach used in the consumer tests. A dynamic consumer is created that subscribes to the topic where the producer will send messages. After sending the message, we use Awaitility to poll the topic for the newly produced message and assert the key and value. The same approach is applied to the DestroyResourcesMessageProducer
.
@ComponentTest
class DestroyResourcesMessageProducerTest : AbstractTestcontainersTest() {
@Autowired
private lateinit var destroyResourcesMessageProducer: DestroyResourcesMessageProducer
companion object {
private val topicName = "test-topic-${UUID.randomUUID()}"
@JvmStatic
@DynamicPropertySource
fun properties(registry: DynamicPropertyRegistry) {
registry.add("spring.kafka.producers.DESTROY_RESOURCES.topic") { topicName }
}
}
@Test
fun send_correctlyPublishesMessage() {
val consumerProperties =
mapOf(BOOTSTRAP_SERVERS_CONFIG to bootstrapServers(), GROUP_ID_CONFIG to "destroy-test-consumer", AUTO_OFFSET_RESET_CONFIG to "earliest")
val consumer = KafkaConsumer(consumerProperties, StringDeserializer(), StringDeserializer())
consumer.subscribe(listOf(topicName))
val key = "userId"
val value = DestroyResourcesMessage(key, listOf(ResourceType.CONTAINER, ResourceType.DATABASE))
//language=json
val jsonPayload = """{"userId":"userId","resources":["CONTAINER","DATABASE"]}"""
destroyResourcesMessageProducer.send(key, value).block()
await()
.pollInterval(Duration.ofSeconds(3))
.atMost(20, TimeUnit.SECONDS)
.untilAsserted {
val records = consumer.poll(Duration.ofMillis(100))
assertThat(records).isNotEmpty.hasSize(1)
assertThat(records.first().key()).isEqualTo(key)
assertThat(records.first().value()).isEqualTo(jsonPayload)
}
consumer.unsubscribe()
}
}
Dead-Letter Topic Producer
Now, with the producer abstractions in place, we can take the next step: introducing a dead-letter topic (DLT) producer to enrich consumer behavior.
A dead-letter topic serves as a safety net, ensuring that messages that fail to process can be redirected and revisited later. In distributed systems, consumers can encounter transient issues, such as database constraints, timeouts, etc. Instead of dropping these failed messages, we route them to a DLT, preserving their context for possible retry attempts or in our case, further analysis/debugging.
With the producer and consumer logic already abstracted, integrating a primitive DLT producer becomes seamless. Let's explore how to achieve this for SessionStateUpdateEventConsumer.
We’ll start by ensuring that this behavior is optional and configurable via application.yaml by introducing a new property in ConsumerProperties named dltEnabled, which defaults to false.
data class ConsumerProperties(
    var topic: String? = null,
    var dltEnabled: Boolean = false,
    var properties: Map<String, String> = HashMap()
)
We'll enable this property only for the SESSION-STATE-UPDATE consumer:
SESSION-STATE-UPDATE:
  topic: session-state-update-events
  dlt-enabled: true
  properties:
    client.id: analytics.session-state-update
    group.id: analytics.session-state-update.group
In KafkaReceiverOptionsFactory, we need a new method that tells us whether the DLT is enabled for a given consumer:
fun isDeadLetterTopicEnabled(kafkaConsumerName: KafkaConsumerName) = (config.consumers[kafkaConsumerName]?.dltEnabled) ?: false
Then we are going to reuse the existing DEFAULT producer properties to create DLT-specific SenderOptions, which will serialize the ReceiverRecord using org.apache.kafka.common.serialization.StringSerializer. This can be achieved by adding a new method to KafkaSenderOptionsFactory like so:
fun <K, V> createDltSenderOptions(): SenderOptions<K, V> {
    val defaultProps = config.producers[KafkaProducerName.DEFAULT]
        ?: throw IllegalStateException("Default producer configuration not found")
    log.debug("Computed DLT Producer properties for {}", defaultProps)
    return SenderOptions.create(defaultProps.properties)
}
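If you would rather not rely on the DEFAULT producer block carrying the serializer class names in its properties map, the String serializers could also be pinned programmatically. The following is only a sketch, not part of the project's code; it assumes the same config and KafkaProducerName types used above and relies on SenderOptions.withKeySerializer/withValueSerializer from Reactor Kafka:
// Sketch only (not in the project): pin the DLT serializers explicitly instead of relying
// on the DEFAULT producer properties to declare them.
// Requires org.apache.kafka.common.serialization.StringSerializer.
fun createDltSenderOptions(): SenderOptions<String, String> {
    val defaultProps = config.producers[KafkaProducerName.DEFAULT]
        ?: throw IllegalStateException("Default producer configuration not found")
    return SenderOptions.create<String, String>(defaultProps.properties)
        .withKeySerializer(StringSerializer())
        .withValueSerializer(StringSerializer())
}
Either way, the resulting SenderOptions is what the DLT producer below builds its template from.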
Now, we can implement the ReactiveKafkaDeadLetterTopicProducer as follows:
@Component
class ReactiveKafkaDeadLetterTopicProducer(kafkaSenderOptionsFactory: KafkaSenderOptionsFactory) {
    companion object {
        val log: Logger = LoggerFactory.getLogger(this::class.java)
    }

    private val kafkaProducerTemplate: ReactiveKafkaProducerTemplate<String, String> by lazy {
        ReactiveKafkaProducerTemplate(kafkaSenderOptionsFactory.createDltSenderOptions<String, String>())
    }

    fun <K, V> process(record: ReceiverRecord<K, V>): Mono<SenderResult<String>> =
        kafkaProducerTemplate.send(Mono.just<SenderRecord<String, String, String>>(record.asSenderRecord()))
            .doOnNext { log.info("Dead-letter topic correlationMetadata: {}", it.correlationMetadata()) }
            .next()
            .doOnSuccess { log.debug("Published message with key={} and value={} to dead-letter topic", record.key(), record.value()) }
            .doOnError { log.error("Error publishing message with key={} and value={} to dead-letter topic", record.key(), record.value(), it) }

    private fun <K, V> ReceiverRecord<K, V>.asSenderRecord(): SenderRecord<String, String, String> {
        val deadLetterTopic = "${topic()}-dlt"
        val producerRecord: ProducerRecord<String, String> = ProducerRecord<String, String>(deadLetterTopic, key().toString(), value().toString())
        log.debug("Computed producer record with key={} and value={} and dead-letter topic={}", key(), value(), deadLetterTopic)
        return SenderRecord.create(producerRecord, producerRecord.key())
    }
}
In this class, we initialize a ReactiveKafkaProducerTemplate using the DLT-specific SenderOptions produced by KafkaSenderOptionsFactory. The process method sends messages to the dead-letter topic and logs the results, while the asSenderRecord extension converts a ReceiverRecord into a SenderRecord by computing the dead-letter topic name and building the corresponding ProducerRecord, ensuring both the key and value are serialized as String for the DLT.
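Since this DLT producer is intentionally primitive, one natural extension is to carry the failure context along with the record. The snippet below is only a sketch of that idea, not part of the project's code: the header names and the extra error parameter are hypothetical, it uses org.apache.kafka.common.header.internals.RecordHeader, and adopting it would also mean passing the failure through process.
// Sketch only: a variant of asSenderRecord that attaches failure context as headers.
// The header names and the error parameter are hypothetical additions.
private fun <K, V> ReceiverRecord<K, V>.asSenderRecordWithContext(error: Throwable): SenderRecord<String, String, String> {
    val deadLetterTopic = "${topic()}-dlt"
    val producerRecord = ProducerRecord(deadLetterTopic, key().toString(), value().toString())
    producerRecord.headers()
        .add(RecordHeader("x-original-topic", topic().toByteArray()))
        .add(RecordHeader("x-original-partition", partition().toString().toByteArray()))
        .add(RecordHeader("x-original-offset", offset().toString().toByteArray()))
        .add(RecordHeader("x-exception-message", (error.message ?: "unknown").toByteArray()))
    return SenderRecord.create(producerRecord, producerRecord.key())
}
Headers like these make it much easier to trace a dead-lettered record back to its source when you inspect the DLT later.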
Finally, all that's left is to integrate the new DLT producer into AbstractReactiveKafkaConsumer as follows:
@Autowired
private lateinit var reactiveKafkaDeadLetterTopicProducer: ReactiveKafkaDeadLetterTopicProducer
After some refactoring in the consume method, we end up with this:
protected open fun consume(record: ReceiverRecord<K, V>): Mono<Void> =
    Mono.just(record)
        .doOnNext { r -> log.debug("Received {} with key={} and value={}", consumerName.eventType, r.key(), r.value()) }
        .flatMap { handle(it) }
        .retryWhen(getRetrySpec(record))
        .onErrorResume { handleProcessingError(record, it) }
        .doFinally { record.receiverOffset().acknowledge() }
        .onErrorComplete()
        .then()

protected open fun handleProcessingError(record: ReceiverRecord<K, V>, ex: Throwable): Mono<Void> =
    (Mono.error<Void>(ex)
        .takeUnless { kafkaReceiverOptionsFactory.isDeadLetterTopicEnabled(consumerName) }
        ?: reactiveKafkaDeadLetterTopicProducer.process(record).then(Mono.error(ex)))
        .doFirst { log.error("Encountered [{}] during process of ${consumerName.eventType} {}", ex.message, record.key(), ex) }
The handleProcessingError method now checks whether dead-letter topics are enabled for the consumer: if they are, it publishes the record to the consumer’s specific DLT using the ReactiveKafkaDeadLetterTopicProducer before rethrowing the exception; in both cases, the error is logged and the exception is propagated.
That’s it. At this point, if an exception occurs during message consumption in a consumer with dltEnabled set to true, the message will be retried and, once the retries are exhausted, published to the DLT, which in this case is session-state-update-events-dlt.
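As a reminder, the retries come from the getRetrySpec hook used in consume. A spec roughly along these lines would produce the "Retrying #N" log lines asserted in the test below; this is just a sketch, and the project's actual implementation may differ in attempts, backoff, and log wording:
// Sketch only: a backoff retry spec (reactor.util.retry.Retry) whose doBeforeRetry log
// matches the assertions in the test below. The attempt count and backoff are assumptions.
protected open fun getRetrySpec(record: ReceiverRecord<K, V>): Retry =
    Retry.backoff(3, Duration.ofSeconds(1))
        .doBeforeRetry { signal ->
            log.warn("Retrying #{} processing {} {}", signal.totalRetries(), consumerName.eventType, record.key())
        }
        .onRetryExhaustedThrow { _, signal -> signal.failure() }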
Here’s how we might test this:
@Test
@RunSql(["/db-data/session-state-update-events.sql"])
fun consume_withErrorProcessingAndFailedRetries_publishesToDeadLetterTopic(logs: CapturedOutput) {
    val userId = "user_987"
    val sessionState = "ACTIVE"
    val consumerProperties =
        mapOf(BOOTSTRAP_SERVERS_CONFIG to bootstrapServers(), GROUP_ID_CONFIG to "dlt-test-consumer", AUTO_OFFSET_RESET_CONFIG to "earliest")
    val dltConsumer = KafkaConsumer(consumerProperties, StringDeserializer(), StringDeserializer())
    dltConsumer.subscribe(listOf("$topicName-dlt"))

    doAnswer { throw RuntimeException("Oops, something happened!") }
        .whenever(sessionStateUpdateEventAuditService).audit(userId, sessionState)
    assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(1)

    kafkaTemplate.send(topicName, userId, sessionState).get()

    await()
        .pollInterval(Duration.ofSeconds(3))
        .atMost(30, TimeUnit.SECONDS)
        .untilAsserted {
            assertThat(sessionStateUpdateEventAuditService.findByUserId(userId).collectList().block()).hasSize(1)
            assertThat(logs.out).contains("Retrying #0 processing String user_987")
            assertThat(logs.out).contains("Retrying #1 processing String user_987")
            assertThat(logs.out).contains("Retrying #2 processing String user_987")
            verify(sessionStateUpdateEventAuditService, times(4)).audit(userId, sessionState)
            val records = dltConsumer.poll(Duration.ofMillis(100))
            assertThat(records).isNotEmpty.hasSize(1)
            assertThat(records.first().key()).isEqualTo(userId)
            assertThat(records.first().value()).isEqualTo(sessionState)
        }
    dltConsumer.unsubscribe()
}
Nothing new here: we verify how the consumer behaves when it encounters errors during message processing, check the logs to confirm that the message is retried, and then consume from the DLT to ensure the message is eventually published there once the retries fail.
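Outside of a test, a throwaway reactive consumer is enough to peek at what actually landed in a DLT. The helper below is hypothetical and not part of the project; it simply tails a -dlt topic and prints each record, using ReceiverOptions and KafkaReceiver from Reactor Kafka:
// Hypothetical debugging helper (not part of the project): tail a dead-letter topic and print its records.
fun tailDeadLetterTopic(bootstrapServers: String, dltTopic: String): Flux<ReceiverRecord<String, String>> {
    val options = ReceiverOptions.create<String, String>(
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            ConsumerConfig.GROUP_ID_CONFIG to "dlt-debug",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
        )
    ).subscription(listOf(dltTopic))
    return KafkaReceiver.create(options)
        .receive()
        .doOnNext { record -> println("DLT record: key=${record.key()}, value=${record.value()}, offset=${record.offset()}") }
}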
Conclusion
Alright, folks, that brings us to the end of our long journey! I hope you enjoyed my approach to building generic reactive consumers, producers, and dead-letter topics (DLTs). At the very least, you now have a solid starting point for creating your own custom-tailored abstractions based on your specific requirements.
We’ve covered the essentials of building reactive Kafka message flows with Reactor Kafka, explored error handling with DLTs, and put everything into practice with comprehensive tests using Testcontainers.
If you’d like to revisit the code we discussed or dive deeper into any of the topics, all the examples are available for you to check out using the source code linked in the introduction of this article.
Happy coding!