Kafka Message Filtering: An Analysis
An analysis of how to implement a Kafka message filtering strategy, first as a general approach, then with the consumer needing to recover after deserialization errors.
A lot of companies nowadays use event-driven architectures in their day-to-day business activities, especially when they want their applications to have real-time or near real-time reactiveness.
In such a scenario, during the interactions among the three main types of actors (producers, message brokers, and consumers), a lot of messages are exchanged. Nevertheless, under certain circumstances, some of these messages might not be of interest, and thus they are discarded and ignored.
This article aims to analyze in detail how a consumer application should be configured so that it behaves correctly when it needs to filter out “irrelevant” messages. First, a standard record filter strategy is configured at the consumer level. Then, a custom deserialization mechanism is added and the analysis is refined. As stated, the intention is to preserve the correct behavior of the consumer.
Setup
- Java 21
- Maven 3.9.2
- Spring Boot – version 3.2.2
- Redpanda message broker running in Docker – image version 23.2.15
As the message broker, the lightweight Redpanda is chosen. Since it is fully Kafka-compatible, neither the development nor the configuration needs to change if a different broker is used instead. [Resource 1] describes how to accomplish a minimal Redpanda setup.
Once the Docker container is up and running, a topic called request is created with the following command:
>docker exec -it redpanda-0 rpk topic create request
TOPIC STATUS
request OK
>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421
BROKERS
=======
ID HOST PORT
0* redpanda-0 9092
TOPICS
======
NAME PARTITIONS REPLICAS
__consumer_offsets 3 1
_schemas 1 1
request 1 1
As shown, the request topic was created successfully.
Implement a Record Filter Strategy
The use case is the following:
- The producer sends a request to the configured topic
- If the request message fulfills the acceptance criteria, the consumer processes it
- Otherwise, the message is discarded
A request message has a simple form:
{
    "id": "34b25c6b-60d6-4e53-8f79-bdcdd17b3a2d",
    "contextId": "hcd"
}
It has just two fields: an identifier and a context identifier. Messages are taken into account only in a certain acceptable context. Put differently, a message is accepted if its contextId equals the one configured on the consumer side; otherwise, it is discarded.
A request is modeled by the following record:
public record Request(String id, String contextId) {
}
For configuring a producer and a consumer, at least these properties are needed (the application.properties file). Note that the broker URL uses port 19092, the Kafka API port the Redpanda quickstart maps for access from the host, while the 9092 shown in the cluster info above is the container-internal listener.
# the path to the message broker
broker.url = localhost:19092
# the name of the topic
topic.request = request
# the unique string that identifies the consumer group of the consumer
context.id = hcd
The requirement is clear: only the messages having hcd as their contextId are accepted.
In order to send messages, a producer needs a KafkaTemplate instance, configured as below:
@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(producerFactory);
    }
}
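For illustration only (this component is hypothetical and not part of the article's code base), a minimal producer that publishes a raw JSON payload through the template above might look like this:
@Service
public class RequestProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.request}")
    private String topic;

    public RequestProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Publishes the raw JSON payload; since no key is provided, the default
    // partitioner chooses the partition.
    public void send(String requestJson) {
        kafkaTemplate.send(topic, requestJson);
    }
}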
One may observe that a StringSerializer was chosen in the producer configuration for marshaling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract; nevertheless, the choice here is intentional, to increase the experimental flexibility.
Once the messages reach the request topic, a consumer is configured to pick them up.
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${context.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

        DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(), new JsonDeserializer<>(Request.class));

        ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setRecordFilterStrategy(recordFilterStrategy);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}
The kafkaListenerContainerFactory() bean above receives a record filtering strategy as an injected parameter and sets it on the factory, which is exactly the purpose here: a means to decide whether a message is filtered out or not.
The RecordFilterStrategy interface has one abstract method:
boolean filter(ConsumerRecord<K, V> consumerRecord);
According to its Javadoc, it returns true if the ConsumerRecord should be discarded (K represents the message key, while V represents the message value).
In the case of this proof of concept, all messages whose contextId equals hcd are accepted and consumed, while the rest are filtered out. The implementation is below.
@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRecordFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        Request request = consumerRecord.value();

        boolean discard = !contextId.equals(request.contextId());
        LOG.info("{} is{} compliant.", request, discard ? "n't" : "");
        return discard;
    }
}
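A side note grounded in the Spring Kafka API (not needed with the default acknowledgment mode used here): when manual acknowledgment modes are configured, records discarded by a filter strategy are only committed if the factory is explicitly told to acknowledge them.
// Optional: acknowledge records discarded by the filter strategy as well;
// relevant only with manual ack modes, the default is false.
factory.setAckDiscarded(true);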
As part of the configuration, the KafkaListenerContainerFactory interface is responsible for creating the listener container of a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.
@Component
public class RequestMessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(RequestMessageListener.class);

    private final ResponseService responseService;

    public RequestMessageListener(ResponseService responseService) {
        this.responseService = responseService;
    }

    @KafkaListener(topics = "${topic.request}", groupId = "${context.id}")
    public void onMessage(@Payload Request request) {
        LOG.info("Processing {}.", request);
        responseService.send(Response.success());
    }
}
Its functionality is trivial: it logs the messages read from the request topic that are destined for the configured consumer group, then invokes a ResponseService, which acts as the entity that sends a message back (here, it only logs it).
@Service
public class ResponseService {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseService.class);

    public void send(Response response) {
        LOG.info("Sending {}.", response);
    }
}
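In a real deployment, the response would presumably be published to its own topic instead of being logged. A sketch of such a variant, assuming a hypothetical topic.response property and reusing the String-based KafkaTemplate:
@Service
public class KafkaResponseService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    // Hypothetical property, not defined in the article's application.properties.
    @Value("${topic.response}")
    private String topic;

    public KafkaResponseService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(Response response) throws JsonProcessingException {
        // Marshals the record to JSON and publishes it to the response topic.
        kafkaTemplate.send(topic, objectMapper.writeValueAsString(response));
    }
}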
A Response is modeled simply, as below:
public record Response(String id, Result result) {

    public static Response success() {
        return new Response(UUID.randomUUID().toString(), Result.SUCCESS);
    }

    // Used later, when deserialization failures are reported back.
    public static Response failure() {
        return new Response(UUID.randomUUID().toString(), Result.FAILURE);
    }

    public enum Result {
        SUCCESS, FAILURE
    }
}
When the application is started, provided the message broker is up, the listener is ready to receive messages.
INFO 20080 --- [main] c.h.r.RecordFilterStrategyApplication : Started RecordFilterStrategyApplication in 1.282 seconds (process running for 1.868)
INFO 20080 --- [main] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-hcd-1, groupId=hcd] Subscribed to topic(s): request
INFO 20080 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-hcd-1, groupId=hcd] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-hcd-1, groupId=hcd] Request joining group due to: need to re-join with the given member-id: consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-hcd-1, groupId=hcd] Successfully joined group with generation Generation{generationId=7, memberId='consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc', protocol='range'}
In order to check the integration, the following two tests are used. Since the listener expects a Request, a message template is created for convenience.
@SpringBootTest
class RecordFilterStrategyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.request}")
    private String topic;

    @Value("${context.id}")
    private String contextId;

    private static final String template = """
            {
                "id": "%s",
                "contextId": "%s"
            }""";

    @Test
    void compliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), contextId));
    }

    @Test
    void notCompliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), "other context"));
    }
}
compliant() sends a message whose contextId matches the one configured on this consumer. As expected, it is processed and a response is sent back.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy : Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd] is compliant.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.RequestMessageListener : Processing Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd].
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService : Sending Response[id=ebe0f65c-eddf-4866-b71f-e6cd766dd499, result=SUCCESS].
notCompliant() sends a message whose contextId differs from the one configured on this consumer. Thus, the message is neither processed nor responded to, but ignored.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy : Request[id=ed22f60c-b13d-4315-8132-46aa83ddf33b, contextId=other context] isn't compliant.
So far, the proof of concept has exemplified how to configure the consumer with a record-filtering strategy so that only certain messages are accepted.
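The tests above publish to the live Redpanda broker. As an aside, and purely as an assumption beyond the article's setup, spring-kafka-test's embedded broker could make them self-contained by pointing broker.url at the address the embedded broker exposes:
// Sketch only: the embedded broker publishes its address in the
// spring.embedded.kafka.brokers property, which broker.url can reference.
@SpringBootTest(properties = "broker.url=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(partitions = 1, topics = "request")
class RecordFilterStrategyEmbeddedTests {
    // same test bodies as above
}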
The code for this part is here: 1-filter-strategy.
Implement a Record Filter Strategy With Custom Deserialization
Let’s assume that the messages consumed from the request topic are unmarshalled using a custom deserializer and that the filtering is still required.
The custom deserializer here is trivial and has a didactic purpose. Moreover, in case the id field is missing, a runtime RequestDeserializationException is thrown. Such an action is not strictly needed at this point, but it is used here to outline a particular use case. Read on.
public class CustomRequestDeserializer extends StdDeserializer<Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRequestDeserializer.class);

    public CustomRequestDeserializer() {
        super(Request.class);
    }

    @Override
    public Request deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        ObjectCodec oc = jsonParser.getCodec();
        JsonNode root = oc.readTree(jsonParser);

        final String contextId = deserializeField(root, "contextId");

        final String id = deserializeField(root, "id");
        if (id == null || id.isEmpty()) {
            throw new RequestDeserializationException("'id' is required");
        }

        Request request = new Request(id, contextId);
        LOG.info("Successfully deserialized {}", request);
        return request;
    }

    // Reads a text field from the parsed tree, returning null if it is absent.
    private static String deserializeField(JsonNode root, String fieldName) {
        JsonNode node = root.get(fieldName);
        return node != null ? node.asText() : null;
    }
}
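The RequestDeserializationException itself is not listed; a minimal version consistent with the code above is a plain runtime exception (in the last section it is extended to also carry the context identifier):
public class RequestDeserializationException extends RuntimeException {

    public RequestDeserializationException(String message) {
        super(message);
    }
}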
To apply it, the Request record is annotated as below:
@JsonDeserialize(using = CustomRequestDeserializer.class)
public record Request(String id, String contextId) {
}
Up until now, the behavior described in the first part is preserved: if the previous compliant() and notCompliant() tests are run again, the outcome is the same.
The next analyzed situation is the one in which a RequestDeserializationException is thrown while deserializing an incoming message. Let’s assume the id is empty, and thus the message has the form below (the corresponding test follows):
{
    "id": "",
    "contextId": "hcd"
}
@Test
void deserializationError_compliant() {
    kafkaTemplate.send(topic,
            String.format(template, "", contextId));
}
When such a message is received, the outcome is the following:
...
Caused by: com.hcd.recordfilterstrategy.domain.deserialization.RequestDeserializationException: 'id' is required
...
An exception thrown at deserialization time causes the message to be neither consumed nor responded to; it is simply lost.
See [Resource 3] for a detailed analysis of situations like this.
One solution that allows recovering after deserialization exceptions is to configure the value deserializer of the KafkaListenerContainerFactory with a failed deserialization function, set on the ErrorHandlingDeserializer in the listing below:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy,
        FailedRequestDeserializationFunction failedDeserializationFunction) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

    JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(Request.class);
    ErrorHandlingDeserializer<Request> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
    valueDeserializer.setFailedDeserializationFunction(failedDeserializationFunction);

    DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), valueDeserializer);

    ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(defaultFactory);
    factory.setRecordFilterStrategy(recordFilterStrategy);
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}
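As an aside, the ErrorHandlingDeserializer can also be wired purely through consumer properties. In that case, though, the function class is instantiated reflectively by the deserializer instead of being taken from the Spring context, so a function with injected dependencies, like the one used here, would not work that way. A sketch of the property-driven variant:
// Property-driven alternative (sketch): the delegate deserializer and the
// function are declared as properties; the function is created reflectively
// (a no-arg constructor is required), so it cannot receive Spring-injected
// dependencies such as the ResponseService.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedRequestDeserializationFunction.class);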
The purpose of the FailedRequestDeserializationFunction component is to allow recovery after such an exceptional situation and to send a failure response back.
@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(ResponseService responseService) {
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());
            responseService.send(Response.failure());
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}
If the same test is run again and a compliant but incorrect message is sent, the behavior changes.
2024-03-13T10:52:38.893+02:00 INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T10:52:38.895+02:00 INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService : Sending Response[id=5393b4a0-3849-4130-934b-671e43a2358f, result=FAILURE].
The only remaining case is that of an incorrect, non-compliant message, meaning the id is still empty, but the contextId differs from the expected one.
{
    "id": "",
    "contextId": "other context"
}
If the following test is run, nothing changes: unfortunately, the failed deserialization function still sends a failure response back, although the record filtering strategy should have filtered the message out, as the contextId is non-compliant.
@Test
void deserializationError_notCompliant() {
    kafkaTemplate.send(topic,
            String.format(template, "", "other context"));
}
2024-03-13T11:03:56.609+02:00 INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T11:03:56.610+02:00 INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService : Sending Response[id=b345f001-3543-46da-bc0f-17c63c20e32a, result=FAILURE].
The code for this second section is here: 2-filter-strategy-custom-deser.
Implement a Record Filter Strategy With Custom Deserialization – Correctly
The last part of this analysis provides a solution for this last use case.
Before moving on, let’s recall what currently happens in all possible cases:
Correct, Compliant Message
- Since the message is correct, the custom deserializer successfully unmarshals it
- The failed deserialization function is not invoked
- Since the message is compliant, the record filter strategy does not reject it
- The listener is invoked, it processes the request and sends a response back
Correct, Non-Compliant Message
- Since the message is correct, the custom deserializer successfully unmarshals it
- The failed deserialization function is not invoked
- Since the message is non-compliant, the record filter strategy rejects it
- The listener is not invoked
Incorrect, Compliant or Non-Compliant Message
- Since the message is incorrect, the custom deserializer throws an exception
- The failed deserialization function is invoked and sends a failure response back
- The record filter strategy is not invoked
- The listener is not invoked
In the case of a correct message, the consumer application behaves correctly, irrespective of the message's compliance.
In the case of an incorrect message, a failure response is sent back irrespective of the message's compliance, which means the consumer behaves correctly only for compliant messages.
For incorrect, non-compliant messages it should act as follows:
- Since the message is incorrect, the custom deserializer throws an exception
- The failed deserialization function is invoked and sends a failure response back only if the message is compliant
- The record filter strategy is not invoked
- The listener is not invoked
At first glance, in order to cover this last use case as well, only the FailedRequestDeserializationFunction needs to be enhanced to also check the message compliance. Basically, before sending the response, the same check as the one in CustomRecordFilterStrategy must be added. To avoid repetition, some refactoring is done first.
To isolate the compliance check, a separate component in charge of it is created.
@Component
public class RequestFilterStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(RequestFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    public boolean filter(String contextId) {
        boolean discard = !this.contextId.equals(contextId);
        LOG.info("Request is{} compliant.", discard ? "n't" : "");
        return discard;
    }
}
Then, the component is injected into both CustomRecordFilterStrategy and FailedRequestDeserializationFunction, which are refactored as follows.
@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private final RequestFilterStrategy requestFilterStrategy;

    public CustomRecordFilterStrategy(RequestFilterStrategy requestFilterStrategy) {
        this.requestFilterStrategy = requestFilterStrategy;
    }

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        return requestFilterStrategy.filter(consumerRecord.value().contextId());
    }
}
@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final RequestFilterStrategy requestFilterStrategy;
    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(RequestFilterStrategy requestFilterStrategy,
            ResponseService responseService) {
        this.requestFilterStrategy = requestFilterStrategy;
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            if (!requestFilterStrategy.filter(deserializationEx.getContextId())) {
                responseService.send(Response.failure());
            }
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}
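For deserializationEx.getContextId() to compile, RequestDeserializationException must now carry the context identifier parsed before the failure. The article does not show this change, but a consistent sketch would be:
public class RequestDeserializationException extends RuntimeException {

    private final String contextId;

    public RequestDeserializationException(String message, String contextId) {
        super(message);
        this.contextId = contextId;
    }

    public String getContextId() {
        return contextId;
    }
}
In CustomRequestDeserializer, the throw then becomes new RequestDeserializationException("'id' is required", contextId), which works because the contextId field is read before the id is validated.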
To check the behavior, the last test is run again.
@Test
void deserializationError_notCompliant() {
    kafkaTemplate.send(topic,
            String.format(template, "", "other context"));
}
The output clearly shows that for incorrect, non-compliant messages, no response is sent anymore.
2024-03-13T15:05:56.432+02:00 INFO 17916 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T15:05:56.432+02:00 INFO 17916 --- [ntainer#0-0-C-1] c.h.r.listener.RequestFilterStrategy : Request isn't compliant.
The code for the enhanced solution is here: 3-filter-strategy-custom-deser-covered.
Resources
1. Redpanda Quickstart
2. Spring for Apache Kafka Reference
3. Acting Soon on Kafka Deserialization Errors