Capturing Acknowledgement in Kafka Streaming With RecordMetadata
In this article, explore RecordMetadata and its attributes, and discover how it can be leveraged in various use cases.
Kafka is a powerful streaming platform used for building real-time data streaming applications. When a message is published to a Kafka topic, the broker can return metadata about that message. This metadata is exposed through Kafka's built-in RecordMetadata class and can serve as an acknowledgment on which to build a guaranteed message delivery mechanism. This article explores RecordMetadata, its attributes, and how it can be leveraged in various use cases.
What Is RecordMetadata?
Before delving into the details of the RecordMetadata class, let's establish a few key Kafka concepts. In Kafka, a producer sends streams of data to a broker, which receives, stores, and serves messages to consumers; the Kafka Consumer API then lets applications read those streams from the cluster. When a producer publishes a message or payload to a Kafka topic, the broker processes the message and returns a response to the producer. That response includes a RecordMetadata instance acknowledging the record, which contains details such as the topic name, partition number, offset, timestamp, and more.
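As a minimal sketch of that request/response cycle (assuming a broker at localhost:9092 and a topic named my-topic-name), the snippet below sends one record synchronously and reads the acknowledged RecordMetadata:
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    ProducerRecord<String, String> record = new ProducerRecord<>("my-topic-name", "key", "value");
    // send() returns a Future<RecordMetadata>; get() blocks until the broker acknowledges
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Acknowledged at " + metadata.topic() + "-" + metadata.partition()
            + ", offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}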
RecordMetadata Attributes
- Topic: The name of the topic to which the message was delivered.
- Partition: A topic can be divided into partitions to handle data volume; the partition number indicates where within the topic the message was stored.
- Offset: A unique, sequential identifier for the record within its partition, which pinpoints the exact location of the message.
- Timestamp: The timestamp of the record, assigned either by the producer (CreateTime) or by the broker on arrival (LogAppendTime), depending on the topic configuration.
- Serialized key and value size: The size, in bytes, of the message's serialized key and value, useful for tracking payload and storage footprint.
- Checksum: A checksum of the record that brokers use to detect corruption during storage or transmission. Note that the checksum accessor was deprecated and later removed from RecordMetadata in newer Kafka client versions.
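A minimal sketch of reading these attributes in a send callback (checksum() is omitted since it is no longer available in recent client versions):
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        // Print every attribute the broker acknowledged for this record
        System.out.println("topic: " + metadata.topic());
        System.out.println("partition: " + metadata.partition());
        System.out.println("offset: " + metadata.offset());
        System.out.println("timestamp: " + metadata.timestamp());
        System.out.println("serialized key size (bytes): " + metadata.serializedKeySize());
        System.out.println("serialized value size (bytes): " + metadata.serializedValueSize());
    }
});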
Use Cases for RecordMetadata
The RecordMetadata class provides essential information that can be used in various scenarios. Here are some common use cases:
1. Monitoring and Logging
In one of my past projects, all external data ingested into our ecosystem had to be routed through a Kafka broker into a data lake for monitoring, audit, and reporting purposes. The initial deployment went well, but we began noticing glitches in overall Kafka availability, mainly caused by network issues with the underlying cloud provider.
We leveraged RecordMetadata to log and monitor the details of produced records, capturing metadata such as the topic, partition, and offset to track each message flowing through the Kafka infrastructure. For successful deliveries, the topic, partition, and offset details were inserted into a database.
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic-name", "key", "value");
// The callback fires once the broker acknowledges (or rejects) the record
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("Record delivered to topic " + metadata.topic() +
                " partition number: " + metadata.partition() +
                " with offset details: " + metadata.offset());
        // Save the above info in the database for audit and monitoring
    } else {
        // Delivery failed; log the error for investigation
        exception.printStackTrace();
    }
});
2. Error Handling With a Retry Process
As mentioned above, network issues and broker downtime can cause message production to fail. By leveraging the RecordMetadata callback, producers can implement intelligent error handling and retry mechanisms so that every message is eventually published, fulfilling audit and regulatory requirements. For instance, if a message fails to be produced, the producer can log the failure and attempt to resend the message on the fly. If the issue persists, a separate process can pick the message up from the data store, based on a flag status saved in the database, and retry it later.
Example:
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Error publishing the message: " + exception.getMessage());
        // Implement retry logic here
        // Flag the message in the database if it is still failing after the Nth retry
    }
});
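One possible shape for that retry path is sketched below; MAX_RETRIES and the markForRetry database-flag helper are hypothetical names for this example, not part of the Kafka API:
private static final int MAX_RETRIES = 3; // assumed application-level retry budget

void sendWithRetry(KafkaProducer<String, String> producer,
                   ProducerRecord<String, String> record, int attempt) {
    producer.send(record, (metadata, exception) -> {
        if (exception == null) {
            System.out.println("Delivered on attempt " + attempt +
                    " at offset " + metadata.offset());
        } else if (attempt < MAX_RETRIES) {
            sendWithRetry(producer, record, attempt + 1); // resend on the fly
        } else {
            markForRetry(record); // hypothetical helper: set the retry flag in the database
        }
    });
}
Note that the producer client already retries transient failures internally (governed by the retries and delivery.timeout.ms settings), so an application-level path like this acts as a safety net for messages the client ultimately gives up on.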
3. Performance Metrics
Performance metrics can be derived from RecordMetadata attributes. By evaluating them, developers can write a few lines of code to measure the latency and throughput of their Kafka message delivery. This information is important for optimizing performance and adhering to SLAs; in high-latency scenarios, preventive measures can be implemented to contain the issue locally and reduce the overall blast radius.
Example:
long initialTime = System.currentTimeMillis();
producer.send(record, (metadata, exception) -> {
    // Elapsed time from send() to broker acknowledgment
    long timeToDeliverMessage = System.currentTimeMillis() - initialTime;
    if (exception == null) {
        System.out.println("Message delivered successfully in " + timeToDeliverMessage + " ms");
    } else {
        // Log errors so failed sends are not silently dropped
        exception.printStackTrace();
    }
});
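Beyond hand-rolled timing, the Java client also exposes built-in producer metrics; the sketch below prints two standard ones (record-send-rate and request-latency-avg) via producer.metrics():
// Inspect built-in client metrics after some traffic has flowed
producer.metrics().forEach((metricName, metric) -> {
    String name = metricName.name();
    if (name.equals("record-send-rate") || name.equals("request-latency-avg")) {
        System.out.println(metricName.group() + " / " + name + " = " + metric.metricValue());
    }
});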
Conclusion
The RecordMetadata class exposes the broker's response to each produced message. Developers can use its attributes to implement monitoring and logging, add error handling capabilities, and derive performance metrics. Data producers can build on these attributes to create more reliable, efficient, guaranteed data streaming implementations that fulfill organizational needs.