Producer Consumer With Kafka and Kotlin
Join the DZone community and get the full member experience.
Join For FreeIntroduction
In this article, we will develop a simple Spring Boot application using Kafka and Kotlin.
Let's get started. Visit https://start.spring.io and add the following dependencies:
xxxxxxxxxx
implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")
This demo makes use of Gradle as the build option. You can choose Maven as well.
Generate and download the project. Then, import this project into IntelliJ IDEA.
Download Apache Kafka
Download the latest version of Apache Kafka from its site and unzip it to a folder. I am using Windows 10 OS. So, you may run into some problem when starting Kafka. It is related to "too many lines encountered". This is because Kafka is appending a big folder structure as the name for its path. If this problem persists, you will have to rename the folder structure to a shorter one and start the application from 'Power Shell'
The following commands are used to start the Kafka:
xxxxxxxxxx
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties
You can see these two commands in '/bin/windows' folder.
In order to run Kafka, you need to start the Zookeeper service first. ZooKeeper is an Apache product that offers distributed configuration service.
Spring Boot Starter
First step is to create a class called KafkaDemoApplication.kt
using your IDE. When you created a project from the Spring Starter web site, this class will be created automatically.
Add the following lines of code:
xxxxxxxxxx
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@SpringBootApplication
class KafkaDemoApplication
fun main(args: Array<String>) {
runApplication<KafkaDemoApplication>(*args)
}
Producer
We can send the message to the topic in two ways, which are listed below.
Next, we need to develop a controller class, which is used to send and receive the message. Let call this class KafkaController.kt
. Then, add the following method:
xxxxxxxxxx
var kafkaTemplate:KafkaTemplate<String, String>? = null;
val topic:String = "test_topic"
@GetMapping("/send")
fun sendMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
var lf : ListenableFuture<SendResult<String, String>> = kafkaTemplate?.send(topic, message)!!
var sendResult: SendResult<String, String> = lf.get()
return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")
}
We are using KafkaTemplate
to send the message to a topic called test_topic
. This will return a ListenableFuture
object from which we can get the result of this action. This approach is the easiest one if you just want to send a message to a topic.
Another Method
The next method of sending a message to Kafka topic is to use the KafkaProducer
object. We will develop that piece of code.
xxxxxxxxxx
@GetMapping("/produce")
fun produceMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)
val map = mutableMapOf<String, String>()
map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["bootstrap.servers"] = "localhost:9092"
var producer = KafkaProducer<String, String>(map as Map<String, Any>?)
var future:Future<RecordMetadata> = producer?.send(producerRecord)!!
return ResponseEntity.ok(" message sent to " + future.get().topic());
}
It deserves some explanation.
We need to initialize the KafkaProduce
object with a Map that contains a key and value for serialization. In this example, we are dealing with string message so that we need to use only StringSerializer
.
Basically, a Serializer is an interface in Kafka which will convert a string to bytes. Apache Kafka has other serializers, such as ByteArraySerializer
, ByteSerializer
, FloatSerializer
, etc.
We specify the key and value of the map with the StringSerializer
.
xxxxxxxxxx
map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
The next value is the bootstrap server details that is required to communicate with the Kafka cluster.
xxxxxxxxxx
map["bootstrap.servers"] = "localhost:9092"
All these three attributes are necessary if we use KafkaProducer
Then, we need create a ProducerRecord
with the name of the topic and the message itself. This is what is achieved in this line.
xxxxxxxxxx
var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)
Now we can send our message to the topic using the following code:
xxxxxxxxxx
var future:Future<RecordMetadata> = producer?.send(producerRecord)!!
This operation will return a future with the name of the topic that is used to send the message.
Consumer
We have seen how to send a message to a topic. But we need to listen for the incoming message. In order to achieve this, we need a develop a listener so that we can consume the message.
Let's create a class called MessageConsumer.kt
and annotate with the @Service
annotation.
xxxxxxxxxx
@KafkaListener(topics= ["test_topic"], groupId = "test_id")
fun consume(message:String) :Unit {
println(" message received from topic : $message");
}
This method is used to listen for the message with the help of the @KafkaListener
annotation and prints the message on the console once it is available in the topic. But make sure you use the same topic name that is used to send the message to.
You can check the whole source code in my github link repository.
Opinions expressed by DZone contributors are their own.
Comments