Resilient Kafka Consumers With Reactor Kafka
This article shows how to avoid the most common pitfalls when building reactive Kafka consumers using Reactor Kafka.
We introduce a recipe for creating resilient Kafka consumers using Reactor Kafka. We developed this approach over time, and it incorporates what we have learned from running Reactor Kafka and from the challenges that come with it. The consumer described in this article provides at-least-once delivery semantics using manual acknowledgments, which is best suited for applications where data loss is not acceptable. However, most of the concepts described here still apply to the other delivery semantics supported by the framework.
Kafka Configuration
The configuration class defines the properties to be used by the Kafka consumer, after which it uses them to instantiate a KafkaReceiver that will be made available as a bean in the application context.
protected Map<String, Object> kafkaConsumerProperties() {
    Map<String, Object> kafkaPropertiesMap = new HashMap<>();
    kafkaPropertiesMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    kafkaPropertiesMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    kafkaPropertiesMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    kafkaPropertiesMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    kafkaPropertiesMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
    ...
    return kafkaPropertiesMap;
}

protected ReceiverOptions<K, V> kafkaReceiverOptions() {
    ReceiverOptions<K, V> options = ReceiverOptions.create(kafkaConsumerProperties());
    return options.pollTimeout(Duration.ofMillis(pollTimeout)).subscription(List.of(consumerTopicName));
}

@Bean
KafkaReceiver<K, V> kafkaReceiver() {
    return KafkaReceiver.create(kafkaReceiverOptions());
}
The configuration is quite standard, but one noteworthy aspect is the use of Spring Kafka's ErrorHandlingDeserializer. Although Reactor Kafka does not depend on Spring Kafka (which is a separate project altogether), we find these deserializers very convenient for ensuring that the consumer can handle and recover from deserialization errors, which would otherwise cause it to effectively get stuck at the faulty record. An alternative approach could be to implement similar custom delegating deserializers with error handling capabilities, which would then eliminate the dependency on the Spring Kafka library.
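As a rough illustration of the custom delegating alternative, the sketch below wraps an arbitrary decoding function and captures any failure instead of throwing it. It is dependency-free on purpose: the names DeserializationResult and SafeDelegatingDeserializer are hypothetical, and the delegate is modelled as a plain Function<byte[], T>. In a real consumer the wrapper would implement Kafka's Deserializer interface and be registered in the consumer properties, which is assumed here rather than shown.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical holder: carries either the decoded value or the captured error.
final class DeserializationResult<T> {
    private final T value;
    private final Exception error;

    private DeserializationResult(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    static <T> DeserializationResult<T> success(T value) {
        return new DeserializationResult<>(value, null);
    }

    static <T> DeserializationResult<T> failure(Exception error) {
        return new DeserializationResult<>(null, error);
    }

    Optional<T> value() {
        return Optional.ofNullable(value);
    }

    Optional<Exception> error() {
        return Optional.ofNullable(error);
    }
}

// Hypothetical wrapper: delegates to the real decoding logic and never throws.
final class SafeDelegatingDeserializer<T> {
    private final Function<byte[], T> delegate;

    SafeDelegatingDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    DeserializationResult<T> deserialize(byte[] data) {
        try {
            return DeserializationResult.success(delegate.apply(data));
        } catch (Exception e) {
            // Capture the failure so the consumer can move past the faulty record.
            return DeserializationResult.failure(e);
        }
    }
}
```

With this shape, the consumer pipeline receives a result object for every record and decides itself what to do with the faulty ones, much like it does with the error header set by ErrorHandlingDeserializer.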
Creating the KafkaReceiver bean makes it available for injection in other parts of the application, where we can then start consuming and processing the events. This is described in detail in the next section.
Main Consumer Pipeline
Different frameworks define different abstractions over Kafka consumers, which are meant to simplify the implementation for the developers. In the case of Reactor Kafka, the abstraction of choice is an inbound Flux where all events received from Kafka are published by the framework. This Flux is created by calling one of the receive/receiveAtmostOnce/receiveAutoAck/receiveExactlyOnce methods on the KafkaReceiver. In our implementation, we create the Flux and subscribe to it upon application startup, and the lifecycle of this subscription largely follows the lifecycle of the service instance. In other words, the subscription will (ideally) remain active for as long as the service is running.
@EventListener(ApplicationStartedEvent.class)
public Disposable startKafkaConsumer() {
    return kafkaReceiver.receive()
        .doOnError(error -> log.error("Error receiving event, will retry", error))
        .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMinutes(1)))
        .doOnNext(record -> log.debug("Received event: key {}", record.key()))
        .concatMap(this::handleEvent)
        .subscribe(record -> record.receiverOffset().acknowledge());
}
Consumer Error Handling
The first important aspect to keep in mind is that, in a reactive publisher, an error is a terminal signal, causing the termination of the subscription. In the concrete case of reactive Kafka consumers, the implication is that any error thrown anywhere in the pipeline (e.g., caused by transient connection issues) will effectively cause the consumer to shut down. Unfortunately, this also means that the service instance will continue to run without actually consuming any Kafka events. To mitigate this issue, we introduce a retry mechanism through the use of the retryWhen operator to ensure that errors are intercepted and that the upstream publisher is re-subscribed to (and thus the Kafka consumer is recreated) in case of any such error. Note that this retry is placed immediately after the source publisher in the reactive pipeline, before the actual event processing, and will therefore only intercept errors thrown by the Kafka consumer itself. Also, the retry policy specifies an effectively infinite number of retries at an interval of 1 minute. The reason for this is that having the service instance running without an active Kafka consumer inside is of no use, so the best course of action is to retry indefinitely until one of two possible scenarios plays out:
- In the case of transient errors (e.g., connection glitches, rebalancing, etc.), the consumer will eventually re-subscribe and start consuming events successfully.
- In the case of non-transient errors, the error log printed before the retry will trigger an alert, causing a human operator to take action and investigate the issue.
Choice of Event Handling Operator and Acknowledgement
Once an event is received, it must be processed by the application and subsequently acknowledged. This sequencing provides at-least-once delivery semantics - other types of semantics (at-most-once, exactly-once) may cause the pipeline to look different. The pattern we propose delegates the responsibility of event handling to a separate method called handleEvent, which always returns the ReceiverRecord used by the subscriber to acknowledge the offset (this method is described in detail in the next section). However, the operator that we choose to call this method has a critical impact on the behavior of the consumer. Let's analyze three different options:
- flatMap - this operator applies the provided mapper function to create inner publishers to which it then subscribes eagerly. Provided that these inner publishers are non-blocking, they will be subscribed to in parallel, and the elements produced downstream are not guaranteed to preserve the order in which the original elements (the Kafka events) were received from upstream. In the case of our Kafka consumer, this means that the Kafka events will be processed in parallel, and the offsets will be committed as each event is handled and passed downstream. But whenever one offset is committed, it implicitly commits all the lower offsets. Imagine that the processing of one event finishes and its offset is committed, but later on, the processing of another event with a lower offset fails: the second event will not be re-processed since we already implicitly committed its offset. This can be problematic, especially in cases where at-least-once semantics is required, and it's an important consideration to keep in mind when deciding to use flatMap. (Reactor Kafka has recently implemented an out-of-order commits feature to mitigate precisely this issue.)
- flatMapSequential - much like flatMap, this operator subscribes to the inner publishers eagerly; however, the difference here is that flatMapSequential will publish elements downstream in the same order in which they were originally received from upstream (this is done by delaying publishing if needed, to preserve the order). The fact that events will still be processed in parallel can come with performance benefits in scenarios where this does not impact correctness, e.g., where events refer to distinct entities and can be processed in any order. In addition, the preservation of the sequence will ensure that the offsets are committed in order and thus avoid the problem described above. Of course, deferring the offset commit also increases the risk of duplicate event processing in case the consumer crashes, which is something the application must be prepared to handle (e.g., by ensuring the event processing is idempotent).
- concatMap - unlike the two previous operators, concatMap creates and subscribes to the inner publishers sequentially. This is extremely important in scenarios where the events must be processed in the exact order in which they were read from the partition.
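The commit problem described for flatMap, and the way in-order emission avoids it, can be traced with a small dependency-free simulation. It uses neither Reactor nor Kafka; the class and method names are made up for illustration, and it assumes that offsets start at 0 and that a failing event stops the pipeline before anything else is acknowledged.

```java
import java.util.List;
import java.util.TreeSet;

final class CommitOrderSimulation {

    // Acknowledge each event as soon as it completes (flatMap-like behaviour).
    // Committing offset n moves the position to n + 1, which implicitly covers
    // every lower offset, whether or not it has actually been processed.
    static long commitAsCompleted(List<Long> completionOrder, long failingOffset) {
        long position = 0;
        for (long offset : completionOrder) {
            if (offset == failingOffset) {
                break; // processing failed: the pipeline stops here
            }
            position = Math.max(position, offset + 1);
        }
        return position; // where the consumer resumes after a restart
    }

    // Acknowledge only in offset order (flatMapSequential-like behaviour).
    // Completed events wait until every lower offset has been acknowledged.
    static long commitInOffsetOrder(List<Long> completionOrder, long failingOffset) {
        long position = 0;
        TreeSet<Long> waiting = new TreeSet<>();
        for (long offset : completionOrder) {
            if (offset == failingOffset) {
                break; // processing failed: the pipeline stops here
            }
            waiting.add(offset);
            // Release the contiguous run of completed offsets, if any.
            while (waiting.contains(position)) {
                waiting.remove(position);
                position++;
            }
        }
        return position; // where the consumer resumes after a restart
    }
}
```

Take three events that complete in the order 2, 0, 1, with the event at offset 1 failing. commitAsCompleted returns 3, so the consumer resumes past the failed event and never sees it again. commitInOffsetOrder returns 1, so the failed event is delivered again, and so is the event at offset 2, which had already been processed once. That second delivery is the duplicate processing that idempotent handling has to absorb.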
Event Handling
The handleEvent method is where all the logic related to the processing of a single event is contained. In most cases, the actual input argument to this method is the ReceiverRecord containing the Kafka event, and it is the same record that is expected to be returned after the event has been processed (successfully or unsuccessfully).
In our implementation, there are two guarantees this method is expected to provide:
1. Encapsulated error handling. The method is always expected to return the receiver record and must never return or throw an error. As explained in the previous section, any error signal is terminal and will cause the main pipeline (and thus the Kafka consumer) to shut down. Therefore, the logic contained within this method must handle any potential error that might occur while processing the event.
2. Idempotence. This is not a universal requirement; rather, it is specific to at-least-once semantics scenarios. Since events are more likely to be re-processed, a mechanism is needed to ensure that this does not cause any unwanted side effects.
The snippet below shows a simple implementation of this method.
/*
 This method handles the received event and then re-publishes the record regardless of the result.
 It must never emit an error signal, as that would terminate the main consumer pipeline.
*/
private Mono<ReceiverRecord<String, Event>> handleEvent(ReceiverRecord<String, Event> record) {
    return Mono.just(record)
        .map(KafkaDeserializerUtils::extractDeserializerError)
        .<Event>handle((result, sink) -> {
            if (result.getT2().isPresent()) {
                // Deserialization error: log it and publish nothing, which discards the event
                log.error("Deserialization error encountered", result.getT2().get());
            } else if (Objects.nonNull(result.getT1().value())) {
                // Publish the event value downstream
                sink.next(result.getT1().value());
            }
            // A null value without a recorded error (e.g. a tombstone) is skipped as well
        })
        .flatMap(businessLayerService::processEvent)
        .doOnError(ex -> log.warn("Error processing event: key {}", record.key(), ex))
        .onErrorResume(ex -> Mono.empty())
        .doOnNext(processed -> log.debug("Successfully processed event: key {}", record.key()))
        .then(Mono.just(record));
}
Assuming that the pipeline has been correctly assembled and subscribed to, the first step is to check for any key/value deserialization errors. Since the consumer is configured to use Spring Kafka's ErrorHandlingDeserializer, any such error will be set as a header on the ReceiverRecord (and in those cases, the actual record value will be null). For convenience, we have created a method that attempts to extract this error from the header - extractDeserializerError. This method returns a Tuple2, where the first value is the current record and the second value is an Optional containing the deserialization error if present. We then use the handle operator to ensure that we only process those events for which deserialization was successful. In the snippet above, we simply log the deserialization error, and since, in this case, nothing is published to the sink, the event is effectively discarded. Again, there are other ways to handle this case (this post gives a great detailed overview of the options). If there is no deserialization error present, we pass the event value downstream for processing.
The next step is the actual event processing, which in this case is being delegated using the flatMap operator to a specialized business-layer service. The expectation here is that this service will ensure the above-mentioned idempotence guarantees. Exactly how this is done varies greatly depending on the use case and what the actual event processing consists of (database operations, calls to external systems, etc.), and there is no silver bullet solution to tackle this. Therefore, the processing pipeline must delegate this responsibility to a specialized component and cannot generically solve this itself.
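To make the idempotence expectation more concrete, here is one possible shape for it: a guard that remembers which event IDs have already been handled and skips the side effect for duplicates. This is only a sketch under strong assumptions. The IdempotentProcessor class is hypothetical, it assumes every event carries a unique ID, and it keeps the IDs in memory, so it would not survive a restart; a real service would more likely rely on a durable store or on naturally idempotent operations.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical guard that runs a side effect at most once per event ID.
final class IdempotentProcessor<E> {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<E> sideEffect;

    IdempotentProcessor(Consumer<E> sideEffect) {
        this.sideEffect = sideEffect;
    }

    // Returns true if the side effect ran, false if the event was a duplicate.
    boolean process(String eventId, E event) {
        if (!processedIds.add(eventId)) {
            return false; // already handled: a redelivery must not repeat the side effect
        }
        try {
            sideEffect.accept(event);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a later redelivery can try again.
            processedIds.remove(eventId);
            throw e;
        }
    }
}
```

If the same event is delivered twice, the second call returns false and the side effect is not repeated, while an event whose processing failed stays eligible for another attempt.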
Additionally, this event processing call might result in an error. Assuming that the idempotence guarantees are correctly implemented, one might argue that it is safe to insert a retry operator immediately after to increase resiliency. This is not necessarily false, but we believe it might not be optimal for two main reasons. Firstly, the event processing might consist of multiple operations, e.g., multiple database calls followed by a call to an external system. Having a 'blanket' retry would re-trigger all these operations, which might not be necessary (imagine it was only that last call to the external system that failed: a retry on that specific operation would be much more efficient). In general, we prefer to define the retries as close as possible to the source, i.e., the publisher we want to re-subscribe to. Secondly, the retries are only helpful for transient errors. Determining whether or not an error is transient is very specific to the type of operation that caused the error (database call, HTTP request, etc.) and cannot be done at a high level. 'Blanket' retrying on non-transient errors can again prove to be wasteful. This is why we chose to also delegate the responsibility of implementing all required retries to the component performing the actual event processing.
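The two arguments above, retrying only the operation that failed and only when the error is transient, can be sketched without any framework. The helper below is hypothetical and deliberately simplistic: it is blocking, it has no delay between attempts, and the caller supplies the predicate that decides what counts as transient. In a reactive component the same idea would normally be expressed with a retry operator applied to the specific inner publisher rather than with a loop.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class TargetedRetry {

    // Runs a single operation, retrying it only for errors classified as transient.
    static <T> T callWithRetry(Supplier<T> operation,
                               int maxAttempts,
                               Predicate<RuntimeException> isTransient) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                if (!isTransient.test(e)) {
                    throw e; // non-transient: retrying would only waste work
                }
                last = e; // transient: remember it and try again
            }
        }
        throw last; // all attempts failed with transient errors
    }
}
```

Wrapping only the call to the external system in such a helper leaves the preceding database operations untouched when that call fails, and a non-transient error surfaces after a single attempt.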
Finally, the onErrorResume operator is meant to handle any subscription-time errors, most often coming from the event processing component after all retries are exhausted. In the current snippet, the error is logged, after which it is 'swallowed' by onErrorResume, and the processing moves forward. Same as before, other error handling options could be employed here, depending on the requirements.
Conclusion
The approach discussed in this article can help avoid some of the most common pitfalls when creating Kafka consumers using Reactor Kafka. Although the consumer presented provides at-least-once delivery semantics with manual acknowledgments, the concepts generally apply to other types of consumers as well.