Acting Soon on Kafka Deserialization Errors
Event-driven architectures excel at performance, scalability, evolvability, and fault tolerance, providing a good level of abstraction and elasticity.
Event-driven architectures have been successfully used for quite some time by many organizations in various business cases. They excel at performance, scalability, evolvability, and fault tolerance, providing a good level of abstraction and elasticity. These strengths make them good choices when applications need real or near-real-time reactiveness.
In terms of implementations, for standard messaging, ActiveMQ and RabbitMQ are good candidates, while for data streaming, platforms such as Apache Kafka and Redpanda are more suitable. Usually, when developers and architects need to opt for one of these two directions, they analyze and weigh aspects such as the message payload, the flow and usage of data, the throughput, and the solution topology. As the discussion around these aspects can get quite broad and complex, it is beyond the scope of this article.
Conceptually, event-driven architectures involve at least three main actors: message producers, message brokers, and message consumers. Briefly, the purpose is to allow the producers and the consumers to communicate in a decoupled and asynchronous way, which is accomplished with the help of the previously mentioned message brokers. In the optimistic scenario, a producer creates a message and publishes it to a topic owned by the broker, from which the consumer reads it, deals with it, and, out of courtesy, provides a response back. Messages are serialized (marshaled) by producers when sent to topics and deserialized (unmarshaled) by consumers when received from topics.
This article focuses on the situation in which a consumer experiences issues when deserializing a received message, and provides a way to act further on such failures. A few examples of such actions may include constructing a default message or sending feedback back to the message broker. Developers are creative enough to decide on this behavior, depending on the particular use cases implemented.
Setup
- Java 21
- Maven 3.9.2
- Spring Boot 3.1.5
- Redpanda message broker running in Docker (image version 23.2.15)
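On the application side, the key dependency is spring-kafka, which brings in both the Kafka clients and the Spring support used throughout this article. A minimal sketch of the pom.xml entry, assuming the version is managed by the Spring Boot parent:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>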
Redpanda is a lightweight message broker, chosen for this proof of concept to give readers the opportunity to experiment with an option other than the widely used Kafka. As Redpanda is Kafka-compatible, the development and configuration of the producers and consumers do not need to change at all when moving from one provider to the other.
According to Redpanda documentation, the Docker support applies only to development and testing. For the purpose of this project, this is more than enough; thus, a single Redpanda message broker is set up to run in Docker.
See Resource 1 at the conclusion of this article for details on how to accomplish the minimal setup.
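For orientation only, a single-node broker can be launched with a command along the lines of the sketch below, adapted from the Redpanda Quickstart; the exact flags and the image tag are assumptions and may differ between versions:
>docker run -d --name redpanda-0 -p 19092:19092 \
  docker.redpanda.com/redpandadata/redpanda:v23.2.15 \
  redpanda start --smp 1 --mode dev-container \
  --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092 \
  --advertise-kafka-addr internal://redpanda-0:9092,external://localhost:19092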
Once up and running, a topic called minifig is created with the following command:
>docker exec -it redpanda-0 rpk topic create minifig
TOPIC STATUS
minifig OK
If the cluster is inspected, one may observe that a topic with one partition and one replica was created.
>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421
BROKERS
=======
ID HOST PORT
0* redpanda-0 9092
TOPICS
======
NAME PARTITIONS REPLICAS
__consumer_offsets 3 1
_schemas 1 1
minifig 1 1
Implementation
The flow is straightforward: the producer sends a request to the configured topic, from which the consumer reads it, as soon as it is able to.
A request represents a mini-figure that is simplistically modeled by the following record:
public record Minifig(String id,
                      Size size,
                      String name) {

    public Minifig(Size size, String name) {
        this(UUID.randomUUID().toString(), size, name);
    }

    public enum Size {
        SMALL, MEDIUM, BIG
    }
}
id is the unique identifier of the Minifig, which has a certain name and is of a certain size – small, medium, or big.
For configuring a producer and a consumer, at least these properties are needed (in the application.properties file):
# the path to the message broker
broker.url=localhost:19092
# the name of the broker topic
topic.minifig=minifig
# the unique string that identifies the consumer group of the consumer
topic.minifig.group.id=group-0
For sending messages, the producer needs a KafkaTemplate instance.
@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // the value is intentionally sent as a plain string, to ease sending non-compliant payloads later
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(producerFactory);
    }
}
One may observe in the producer configuration that a StringSerializer was chosen for marshaling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here was intentional, to increase the experimental flexibility on the consumer side (as we will see later). Just as a reminder, the interest in this proof of concept is to act on the encountered deserialization errors.
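For reference, a producer relying on spring-kafka's JsonSerializer could look like the sketch below (the bean name minifigKafkaTemplate is a made-up example); note that such a setup would make it harder to intentionally publish malformed payloads from the tests later on:
@Bean
public KafkaTemplate<String, Minifig> minifigKafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // marshals Minifig objects to JSON, enforcing the contract on the producer side
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}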
Once the messages reach the minifig topic, a consumer is configured to pick them up.
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${topic.minifig.group.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

        DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props);

        ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}
The KafkaListenerContainerFactory interface is responsible for creating the listener container for a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.
@Component
public class MinifigListener {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigListener.class);

    @KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
    public void onReceive(@Payload Minifig minifig) {
        LOG.info("New minifig received - {}.", minifig);
    }
}
Its functionality is trivial. It only logs the messages read from the minifig topic, destined for the configured consumer group.
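Should record metadata also be of interest, the listener could alternatively receive the whole ConsumerRecord, a possible sketch being:
@KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
public void onReceive(ConsumerRecord<String, Minifig> record) {
    // besides the deserialized payload, the partition and the offset are available as well
    LOG.info("New minifig received - {} (partition {}, offset {}).",
            record.value(), record.partition(), record.offset());
}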
If the application is started, provided the message broker is up, the listener is ready to receive messages.
INFO 10008 --- [main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group-0-1, groupId=group-0] Subscribed to topic(s): minifig
INFO 10008 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group-0-1, groupId=group-0] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-0-1, groupId=group-0] Discovered group coordinator localhost:19092 (id: 2147483647 rack: null)
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-0-1, groupId=group-0] Found no committed offset for partition minifig-0
INFO 10008 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group-0: partitions assigned: [minifig-0]
In order to check the integration, the following simple test is used. Since a Minifig is expected by the listener, a compliant message template was created for convenience.
@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), Minifig.Size.SMALL, "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}
When running the test, a "compliant" message is sent to the broker, and as expected, it is successfully picked up by the local consumer.
INFO 10008 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener : New minifig received - Minifig[id=0c75b9e4-511a-48b3-a984-404d2fc1d47b, size=SMALL, name=Spider-Man].
Redpanda Console can be helpful in observing what is happening at the broker level, particularly what is flowing through the minifig topic.
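Alternatively, the records flowing through the topic can be tailed directly from the command line with rpk, assuming the same container name as before:
>docker exec -it redpanda-0 rpk topic consume minifig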
In scenarios such as the one above, messages are sent from the producer to the consumer via the message broker, as planned.
Recover on Deserialization Failures
In the particular case of this proof of concept, it is assumed the size of a mini-figure can be SMALL, MEDIUM, or BIG, in line with the defined Size enum. In case the producer sends a mini-figure of an unknown size, one that deviates from the agreed contract, the messages are basically rejected by the listener, as the payload cannot be deserialized.
To simulate this, the following test is run.
@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_non_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), "Unknown", "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}
The message reaches the topic, but not the MinifigListener#onReceive() method. As expected, the error appeared when the payload was being unmarshaled. The cause can be spotted by looking deep down the stack trace.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data from topic [minifig]
Caused by: com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
at [Source: (byte[])"{"id":"fbc86874-55ac-4313-bbbb-0ed99341825a","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
One other aspect is that the consumer keeps trying to read the same message over and over. This is unfortunate, at least from the consumer's point of view, as the error logs accumulate.
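As a side note, the DefaultErrorHandler already set on the factory can be configured with a back-off in order to limit the retries of a failed record, a sketch being (FixedBackOff comes from org.springframework.util.backoff):
// retry a failed record twice, one second apart, before giving up
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
However, this only covers exceptions thrown from the listener itself; a deserialization failure happens before the record ever reaches it, hence a different mechanism is needed.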
In order to get past such situations, the JsonDeserializer used for unmarshaling the payload value is wrapped in an ErrorHandlingDeserializer, becoming its actual delegate. Moreover, the ErrorHandlingDeserializer has a failedDeserializationFunction member that, according to its Javadoc, provides an alternative mechanism when the deserialization fails.
The new consumer configuration looks as below:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

    // the JsonDeserializer is wrapped in an ErrorHandlingDeserializer, acting as its delegate
    JsonDeserializer<Minifig> jsonDeserializer = new JsonDeserializer<>(Minifig.class);
    ErrorHandlingDeserializer<Minifig> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
    valueDeserializer.setFailedDeserializationFunction(new MinifigFailedDeserializationFunction());

    // the deserializer instances passed explicitly here take precedence over the *_DESERIALIZER_CLASS_CONFIG properties above
    DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), valueDeserializer);

    ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(defaultFactory);
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}
The failedDeserializationFunction used here is simplistic, but the reason for this is to prove its utility.
public class MinifigFailedDeserializationFunction implements Function<FailedDeserializationInfo, Minifig> {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigFailedDeserializationFunction.class);

    @Override
    public Minifig apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception exception = failedDeserializationInfo.getException();
        LOG.info("Error deserializing minifig - {}", exception.getCause().getMessage());

        // fall back to a default mini-figure, in line with the listener log output below
        return new Minifig(null, Minifig.Size.SMALL, "Undefined");
    }
}
The FailedDeserializationInfo entity (the Function#apply() input) is constructed during the recovery from the deserialization exception, and it encapsulates various pieces of information (here, the exception is the one leveraged).
Since the output of the apply() method is the actual deserialization result, one may return either null or whatever is suitable, depending on the aimed behavior.
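For instance, if null were returned instead of a default instance, the listener would have to tolerate null payloads, a possible sketch (assuming a null value marks a message to be discarded) being:
@KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
public void onReceive(@Payload(required = false) Minifig minifig) {
    // a null payload means the ErrorHandlingDeserializer could not unmarshal the message
    if (minifig == null) {
        LOG.warn("Received a message that could not be deserialized, discarding it.");
        return;
    }

    LOG.info("New minifig received - {}.", minifig);
}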
If the send_non_compliant() test is run again, the deserialization exception is handled and a default value is returned. Further, the MinifigListener is invoked and has the opportunity to deal with it.
INFO 30160 --- [ntainer#0-0-C-1] e.l.MinifigFailedDeserializationFunction : Error deserializing minifig - Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
at [Source: (byte[])"{"id":"f35a77bf-29e5-4f5c-b5de-cc674f22029f","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
INFO 30160 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener : New minifig received - Minifig[id=null, size=SMALL, name=Undefined].
Conclusion
Configuring Kafka producers and consumers, and fine-tuning them in order to achieve the desired performance with the message broker in use, is not always straightforward. Controlling each step of the communication is by all means desirable and, moreover, acting fast in unknown situations helps deliver robust and easy-to-maintain solutions. This post focused on the deserialization issues that might appear at the Kafka consumer level and provided a way of having a backup plan when dealing with non-compliant payloads.
Sample Code
Resources
- Redpanda Quickstart
- Spring for Apache Kafka
- The featured image was taken at Zoo Brasov, Romania
Published at DZone with permission of Horatiu Dan.