How To Implement a Kafka Producer
We run through a quick tutorial on how to create and use Kafka producers for a big data project, and how to take advantage of Kafka's Java API.
This article deals with the ways to implement a Kafka producer.
A Kafka producer is an application that can act as a source of data in a Kafka cluster. A producer can publish messages on one or more Kafka topics.
So, how many ways are there to implement a Kafka producer? Well, there are a lot! But in this article, we shall walk you through two of them:
- Kafka Command Line Tools
- Kafka Producer Java API
Create a Kafka Producer Using the Command Line Interface
In addition to the APIs provided by Kafka for different programming languages, Kafka also provides tools to work with the basic components like producers, consumers, topics, etc., via the Command Line Interface.
Before we can start a Kafka producer, we should start a Kafka cluster and create a Kafka topic.
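Navigate to the root directory of the Kafka package and run the following commands, each in its own terminal because both stay in the foreground: first ZooKeeper, then the Kafka broker. These commands apply to ZooKeeper-based Kafka releases.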
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
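To create a Kafka topic named "sampleTopic", run the following command. On recent Kafka releases, where kafka-topics.sh no longer accepts --zookeeper, pass --bootstrap-server localhost:9092 instead.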
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic
Finally, to create a Kafka producer, run the following:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sampleTopic
The producer connects to the broker that is running on localhost and listening on port 9092, and posts its messages to the topic "sampleTopic". Recent Kafka releases accept --bootstrap-server in place of the older --broker-list option.
When you run the above shell script, a console prompt appears.
We can start sending messages to the Kafka cluster from this prompt. By default, each line typed is sent as one message and published to the Kafka topic "sampleTopic".
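The line-per-message behaviour of the console can be approximated in a few lines of Java. The sketch below is not Kafka code: LineForwarder and its sink parameter are hypothetical names, and the sink stands in for whatever actually sends a message (for example, a call to a real producer). It only illustrates how each input line is handed over as a separate message.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Hypothetical helper, not part of Kafka: forwards each input line to a sink.
final class LineForwarder {

    // Reads the input line by line and hands every line to the sink as one message.
    // Returns the number of lines forwarded.
    static int forward(Reader input, Consumer<String> sink) {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(input)) {
            String line;
            while ((line = reader.readLine()) != null) {
                sink.accept(line);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        // Stand-in sink that only prints; a real program would send to Kafka here.
        forward(new InputStreamReader(System.in),
                line -> System.out.println("would send: " + line));
    }
}
```

Keeping the sink as a plain Consumer means the reading loop can be exercised without a broker and later pointed at a real producer.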
Create a Kafka Producer Using Java
Kafka provides a Java API. Using this, we can write a Java program that acts as a Kafka producer.
To create a Kafka producer using Java, we need to write two Java files.
- SampleProducer.java: A thread that acts as a producer and contains configuration information.
- KafkaProducerDemo.java: Contains the main method that creates and starts the SampleProducer thread.
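The configuration information that SampleProducer carries comes down to a small set of producer properties. As a minimal sketch, the hypothetical helper below (the class and method names are illustrative and not part of the Kafka API) builds them with plain java.util.Properties, using the same keys and serializer class names as the SampleProducer listing. The resulting object is what would be handed to the KafkaProducer constructor.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka) that assembles producer settings.
final class ProducerConfigSketch {

    static Properties producerProperties(String host, int port, String clientId) {
        Properties properties = new Properties();
        // Address of at least one broker, used to discover the rest of the cluster.
        properties.put("bootstrap.servers", host + ":" + port);
        // Logical name the producer reports to the broker.
        properties.put("client.id", clientId);
        // Keys are Integers and values are Strings, so pick matching serializers.
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = producerProperties("localhost", 9092, "SampleProducer");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```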
SampleProducer.java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Producer Example in Apache Kafka
 * @author www.tutorialkart.com
 */
public class SampleProducer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";

    public SampleProducer(String topic, boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    // handle the exception
                }
            }
            ++messageNo;
        }
    }
}
class DemoCallBack implements Callback {
    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * onCompletion method will be called when the record sent to the Kafka Server has been acknowledged.
     *
     * @param metadata The metadata contains the partition and offset of the record. Null if an error occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                            "), " +
                            "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}
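Messages can be sent either synchronously or asynchronously. In synchronous mode, the producer calls get() on the Future returned by send() and blocks until the broker acknowledges the record. In asynchronous mode, send() returns immediately, and the DemoCallBack class is invoked once the record has been acknowledged or the send has failed.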
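The difference between the two modes can be tried out without a running cluster. The sketch below is an analogy only: fakeSend is a hypothetical stand-in that completes a CompletableFuture with a made-up offset, whereas the real KafkaProducer.send() returns a Future of RecordMetadata. The synchronous path blocks on get(), while the asynchronous path registers a callback and returns at once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

// Analogy only: no Kafka classes are used here.
final class SendModesSketch {

    // Hypothetical stand-in for a send: completes on another thread with the given offset.
    static CompletableFuture<Long> fakeSend(String message, long offset) {
        return CompletableFuture.supplyAsync(() -> offset);
    }

    // Synchronous style: block until the acknowledgement arrives, then return it.
    static long sendSync(String message, long offset) {
        try {
            return fakeSend(message, offset).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Asynchronous style: return immediately; the callback runs on completion
    // with either the offset or the error, and null for the other argument.
    static CompletableFuture<Long> sendAsync(String message, long offset,
                                             BiConsumer<Long, Throwable> callback) {
        return fakeSend(message, offset).whenComplete(callback);
    }

    public static void main(String[] args) {
        System.out.println("sync offset: " + sendSync("Message_1", 0L));
        sendAsync("Message_2", 1L,
                (ack, error) -> System.out.println("async offset: " + ack))
                .join(); // wait only so the demo does not exit before the callback runs
    }
}
```

The callback's two arguments mirror the shape of onCompletion in DemoCallBack: one of them carries the outcome and the other is null.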
KafkaProducerDemo.java
public class KafkaProducerDemo {
    public static final String TOPIC = "testTopic";

    public static void main(String[] args) {
        boolean isAsync = false;
        SampleProducer producerThread = new SampleProducer(TOPIC, isAsync);
        // start the producer
        producerThread.start();
    }
}
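In the above example, a synchronous Kafka producer is created and then started using Thread.start(). Set isAsync to true to send asynchronously instead. Note that the demo publishes to "testTopic", not the "sampleTopic" created earlier; create that topic first or change the TOPIC constant, unless the broker is configured to create topics automatically.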
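The demo hard-codes the send mode. One possible variation, shown purely as a sketch, is to pick it from the first command-line argument. ModeArgs and its isAsync method are hypothetical names, and the convention that "async" selects asynchronous sending while anything else keeps the synchronous default is an assumption of this example, not part of the original program.

```java
// Hypothetical helper: derives the send mode from command-line arguments.
final class ModeArgs {

    // Returns true only when the first argument is "async" (case-insensitive);
    // no argument, or any other value, keeps the synchronous default.
    static boolean isAsync(String[] args) {
        return args.length > 0 && "async".equalsIgnoreCase(args[0].trim());
    }

    public static void main(String[] args) {
        System.out.println(isAsync(args) ? "asynchronous" : "synchronous");
    }
}
```

The result would then be passed to new SampleProducer(TOPIC, isAsync) in place of the hard-coded false.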
Run KafkaProducerDemo.java. The console prints one line per acknowledged message:
Sent message: (1, Message_1)
Sent message: (2, Message_2)
Sent message: (3, Message_3)
Sent message: (4, Message_4)
Sent message: (5, Message_5)
Sent message: (6, Message_6)
Sent message: (7, Message_7)
Sent message: (8, Message_8)
Sent message: (9, Message_9)
Sent message: (10, Message_10)
Sent message: (11, Message_11)
Sent message: (12, Message_12)
The messages are published synchronously to the Kafka topic.