Take a Deep Dive Into the Kafka Producer API
This article introduces the Kafka Producer API and demonstrates how to construct a Kafka producer.
In this article, we are going to learn about the Kafka Producer API. If you are new to Kafka, I recommend that you first get a basic idea of how Kafka works from the Kafka quickstart guide.
There are many reasons an application might need to write messages to Kafka: recording metrics, storing log messages, buffering information before writing to a database, recording data that comes from sensors, and much more.
Producer Flow
We start producing messages to Kafka by creating a ProducerRecord, which must include the topic we want to send the record to and a value. Optionally, we can also specify a key and/or a partition. Once we send the ProducerRecord, the first thing the producer will do is serialize the key and value objects to byte arrays so they can be sent over the network. The data is then handed to a partitioner.
If we specified a partition in the ProducerRecord, the partitioner doesn’t do anything and simply returns the partition we specified. If we didn’t, the partitioner will choose a partition for us, usually based on the ProducerRecord key. Once a partition is selected, the producer knows which topic and partition the record will go to. It then adds the record to a batch of records that will be sent to the same topic and partition.
A separate thread is responsible for sending those batches of records to the appropriate Kafka brokers. When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it will return a RecordMetadata object with the topic, partition, and the offset of the record within the partition. If the broker failed to write the messages, it will return an error. When the producer receives an error, it may retry sending the message a few more times before giving up and returning an error.
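For reference, here is a small sketch of the three common ProducerRecord constructor forms, showing how much of this routing we can control ourselves. The topic and values mirror the examples used later in this article; the partition number 0 is just an illustration.
import org.apache.kafka.clients.producer.ProducerRecord;

// Topic and value only: the key is null and the partitioner chooses a partition for us.
ProducerRecord<String, String> noKey =
        new ProducerRecord<>("CustomerCountry", "France");

// Topic, key, and value: the partitioner usually hashes the key to pick a partition.
ProducerRecord<String, String> withKey =
        new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

// Topic, explicit partition, key, and value: the partitioner simply returns partition 0.
ProducerRecord<String, String> withPartition =
        new ProducerRecord<>("CustomerCountry", 0, "Precision Products", "France");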
Constructing a Kafka Producer
The first step in writing messages to Kafka is to create a producer object with the properties you want to pass to the producer. A Kafka producer has three mandatory properties:
1. bootstrap.servers
List of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster. This list doesn’t need to include all brokers since the producer will get more information after the initial connection.
2. key.serializer
The name of a class that will be used to serialize the keys of the records we will produce to Kafka. Kafka brokers expect byte arrays as the keys and values of messages. key.serializer should be set to the name of a class that implements the org.apache.kafka.common.serialization.Serializer interface; the producer will use this class to serialize the key object to a byte array (a minimal custom serializer is sketched after the configuration example below).
3. value.serializer
The name of a class that will be used to serialize the values of the records we will produce to Kafka. Just as you set key.serializer to the name of a class that serializes the message key object to a byte array, you set value.serializer to the name of a class that serializes the message value object.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(kafkaProps);
- We start with a Properties object.
- Since we plan on using strings for message key and value, we use the built-in StringSerializer.
- Here, we create a new producer by setting the appropriate key and value types and passing the Properties object.
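As mentioned above, a custom serializer only needs to implement the Serializer interface. Here is a minimal sketch of what that could look like; the class name SimpleStringSerializer is purely illustrative, and in practice the built-in StringSerializer already covers this case.
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical serializer that turns String values into UTF-8 byte arrays,
// roughly what the built-in StringSerializer does.
public class SimpleStringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
    // On older client versions, you may also need to override configure() and close().
}
You would then set key.serializer or value.serializer to the fully qualified class name, just as the snippet above does for StringSerializer.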
Start Sending Messages
Messages can be sent in three ways:
1. Fire-and-forget
We send a message to the server and don’t really care whether it arrives successfully or not. Most of the time, it will arrive successfully, since Kafka is highly available and the producer will retry sending messages automatically. However, some messages may be lost with this method.
ProducerRecord<String, String> record =
        new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    // Fire-and-forget: we ignore the returned Future, so only errors thrown
    // before the message is handed to the sender thread are caught here.
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}
2. Synchronous send
We send a message, the send() method returns a Future object, and we use get() to wait on the future and see if the send() was successful or not.
ProducerRecord<String, String> record =
        new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    // Blocking send: get() waits for the broker's reply and throws if the write failed.
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}
Here, we are using Future.get() to wait for a reply from Kafka.
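On success, get() returns the RecordMetadata object described earlier, so we can also inspect where the record was written. A minimal sketch, placed inside the same try/catch block as the synchronous send above:
import org.apache.kafka.clients.producer.RecordMetadata;

// get() returns RecordMetadata with the topic, partition, and offset of the record.
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Written to topic %s, partition %d, offset %d%n",
        metadata.topic(), metadata.partition(), metadata.offset());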
3. Asynchronous send
We call the send() method with a callback function, which gets triggered when the producer receives a response from the Kafka broker.
private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // e is non-null if the broker returned an error; recordMetadata is populated on success.
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record =
        new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
To use callbacks, you need a class that implements the org.apache.kafka.clients.producer.Callback interface, which has a single method, onCompletion().
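Because Callback has a single method, on Java 8 and later the same callback can also be written as a lambda. A brief sketch using the record defined above:
producer.send(record, (recordMetadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Record written to offset " + recordMetadata.offset());
    }
});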
Conclusion
Thank you for reading! If you like this article, please do show your appreciation by giving it a thumbs up. Share this article and feel free to give me suggestions on how I can improve my future posts to suit your needs. Follow me to get updates on different technologies. For any queries, feel free to contact me at jashan.goyal@knoldus.in.