Handling Schema Versioning and Updates in Event Streaming Platforms Without Schema Registries
Learn how to handle schema versioning and updates in Kafka and other event streaming platforms by using custom deserializers instead of a schema registry.
In real life, change is constant. As businesses evolve, the technology systems supporting them must also evolve. In many event-driven systems today, event streaming platforms like Kafka, Kinesis, and Event Hubs are crucial components facilitating communication between different technology systems and services. As these systems and services change, the schema of event streaming platform messages needs to be updated. The most common way to address this problem is by using schema registries like Confluent Schema Registry, AWS Glue Schema Registry, and Azure Schema Registry. However, in this article, I am going to discuss a simple solution that does not use any of these schema registries.
Although I will use Kafka as an example in this article, this strategy can be applied to any other event streaming platform or messaging queue.
A Bit About Kafka, Messages, and Message Schemas
Before diving into the solution, let's quickly refresh our understanding of basic Kafka terminology. Similar concepts exist in other streaming platforms and messaging queues.
Kafka
Apache Kafka is an open-source stream-processing platform designed for handling real-time data feeds. It acts as a distributed publish-subscribe messaging system, providing scalability, fault tolerance, and durability through data replication and persistent storage.
- Kafka producer: A Kafka producer is a client application that sends messages to Kafka topics.
- Kafka consumer: A Kafka consumer is a client application that reads messages from Kafka topics.
- Kafka message: A Kafka message is a discrete unit of data sent from a producer to a Kafka topic. It consists of a key, a value, and optional metadata called headers. Messages are stored in a durable log and can be consumed by multiple consumers, ensuring reliable and scalable data processing.
- Kafka message schema: A Kafka message schema defines the structure and format of the data contained in Kafka messages. It specifies the fields, data types, and any constraints for the message contents, ensuring consistency and compatibility across producers and consumers. Common schema formats used with Kafka include Avro, JSON, and Protobuf. By using schemas, Kafka enables efficient data serialization, validation, and evolution, facilitating reliable and structured data exchange in a streaming architecture.
How To Solve It
Note: This solution assumes that the reader has a basic knowledge of Java and Kafka, and already has an existing Kafka setup including producers and consumers. The focus will be on handling schema version updates within this context.
To solve this, you will need to do two things. First, define a versionId field in your message schema; this is required to identify different versions of messages. Second, define a custom deserializer in your consumer that deserializes incoming messages according to their version.
Let's understand this with an example. Suppose you are building a pipeline for processing food delivery orders.
Here is how sample order data looks for version 1.0:
{
  "versionId": "1.0",
  "orderId": "order-123",
  "orderDateTime": "2024-07-29T05:23:41Z",
  "value": "100.00",
  "valueCurrency": "USD"
}
Here is the corresponding Order POJO, written as a Kotlin data class:
data class Order(
    val versionId: String,
    val orderId: String,
    val orderDateTime: String,
    val value: String,
    val valueCurrency: String
)
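The producer side already exists in your setup (per the note above), but for context, here is a rough sketch of how a version 1.0 order might be published as JSON. It assumes plain Jackson serialization and string-serialized values; the topic name orders matches the consumer shown later.

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    }

    val order = Order("1.0", "order-123", "2024-07-29T05:23:41Z", "100.00", "USD")

    KafkaProducer<String, String>(props).use { producer ->
        // The versionId travels inside every message so consumers can branch on it
        val json = ObjectMapper().writeValueAsString(order)
        producer.send(ProducerRecord("orders", order.orderId, json))
    }
}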
Now, let's say you want to update the message schema to change the value field to amount and valueCurrency to amountCurrency. This is a breaking change.
The updated order data will look like this. Note that we have to bump the version to 2.0.
{
  "versionId": "2.0",
  "orderId": "order-123",
  "orderDateTime": "2024-07-29T05:23:41Z",
  "amount": "100.00",
  "amountCurrency": "USD"
}
To handle this, we need to introduce a custom deserializer for the Order class.
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.JsonDeserializer
import com.fasterxml.jackson.databind.JsonNode

class OrderDeserializer : JsonDeserializer<Order>() {
    override fun deserialize(p: JsonParser, ctxt: DeserializationContext): Order {
        val node: JsonNode = p.codec.readTree(p)

        // Fields that exist in every version
        val versionId = node.get("versionId").asText()
        val orderId = node.get("orderId").asText()
        val orderDateTime = node.get("orderDateTime").asText()

        // Map version-specific field names onto the current Order fields
        val amount = when (versionId) {
            "1.0" -> node.get("value").asText()
            "2.0" -> node.get("amount").asText()
            else -> throw IllegalArgumentException("Unknown version: $versionId")
        }
        val amountCurrency = when (versionId) {
            "1.0" -> node.get("valueCurrency").asText()
            "2.0" -> node.get("amountCurrency").asText()
            else -> throw IllegalArgumentException("Unknown version: $versionId")
        }

        return Order(versionId, orderId, orderDateTime, amount, amountCurrency)
    }
}
Here is the updated Order class:
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

@JsonDeserialize(using = OrderDeserializer::class)
data class Order(
    val versionId: String,
    val orderId: String,
    val orderDateTime: String,
    val amount: String,
    val amountCurrency: String
)
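One wiring detail before the consumer: Kafka's VALUE_DESERIALIZER_CLASS_CONFIG expects a class implementing org.apache.kafka.common.serialization.Deserializer, whereas OrderDeserializer above is a Jackson deserializer. A thin bridge class can connect the two; here is a minimal sketch (the name OrderKafkaDeserializer is just a placeholder):

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.serialization.Deserializer

class OrderKafkaDeserializer : Deserializer<Order> {
    // The @JsonDeserialize annotation on Order routes parsing through OrderDeserializer
    private val objectMapper = ObjectMapper()

    override fun deserialize(topic: String?, data: ByteArray?): Order? {
        return data?.let { objectMapper.readValue(it, Order::class.java) }
    }
}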
Here is sample code for the Kafka consumer:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        // Kafka-level deserializer that delegates to the Jackson OrderDeserializer (see bridge class above)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderKafkaDeserializer::class.java.name)
    }

    val consumer = KafkaConsumer<String, Order>(props)
    consumer.subscribe(listOf("orders"))

    try {
        while (true) {
            val records = consumer.poll(Duration.ofMillis(100))
            for (record in records) {
                // record.value() is already an Order, regardless of the message's schema version
                val order = record.value()
                println("Received Order: $order")
            }
        }
    } finally {
        consumer.close()
    }
}
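You can also exercise the version handling without a running broker, for example in a unit test, by pushing both sample payloads through a plain Jackson ObjectMapper. A minimal sketch:

import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    val mapper = ObjectMapper()

    // Version 1.0 payload: "value"/"valueCurrency" should land in amount/amountCurrency
    val v1 = """{"versionId":"1.0","orderId":"order-123","orderDateTime":"2024-07-29T05:23:41Z","value":"100.00","valueCurrency":"USD"}"""
    // Version 2.0 payload already uses the new field names
    val v2 = """{"versionId":"2.0","orderId":"order-123","orderDateTime":"2024-07-29T05:23:41Z","amount":"100.00","amountCurrency":"USD"}"""

    // Both versions deserialize into the same Order shape via OrderDeserializer
    println(mapper.readValue(v1, Order::class.java))
    println(mapper.readValue(v2, Order::class.java))
}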
Some Worthy Considerations
- Schema changes should be backward compatible: Ensure that any changes to your schema are backward compatible to avoid breaking existing consumers; the sketch after this list shows one way to read newly added fields defensively.
- Deploy consumer code changes first: Always deploy the updated consumer code before producing new message versions to handle schema changes gracefully.
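For additive, non-breaking changes you often do not need a new version branch at all: fields introduced in a newer schema version can be read defensively so that older payloads, which lack them, still deserialize. A small sketch inside OrderDeserializer (deliveryNotes is a hypothetical field, not part of the schemas above):

// Inside OrderDeserializer.deserialize(): tolerate a field that only newer payloads carry
// by falling back to a default when it is absent ("deliveryNotes" is purely illustrative)
val deliveryNotes: String = node.get("deliveryNotes")?.asText() ?: ""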
Conclusion
There are various schema registries available to handle schema updates for event streaming platforms. However, if your service does not integrate with one, the versioning and custom deserializer approach shown here lets you manage breaking schema changes in Kafka or any other event streaming platform.