Spring Boot and Kafka Configuration Tuning
This article walks through a setup for configuration tuning in an isolated environment and determines the Spring Boot and Kafka configuration and best practices for moderate uses.
Summary
The goal of this exercise is to provide a setup for configuration tuning in an isolated environment and to determine the Spring Boot and Kafka configuration and best practices for moderate uses. The high-level observations are:
- Partition your Kafka topic and design your system to be stateless for higher concurrency. For most moderate use cases (we have 100,000 messages per hour), you won't need more than 10 partitions. As a rule of thumb, the benefit of higher concurrency is directly proportional to the time the consumer spends processing a single message.
- The default Spring Boot configurations are very reasonable for moderate uses. Avoid changing them without a proper root cause analysis.
- Although it differs from use case to use case, it is recommended to have the producer receive acknowledgment from at least the Kafka partition leader (acks=1) and to use manual acknowledgment on the consumer side.
- Size Kafka topics appropriately, by using small messages that are less than 0.5 MB, to avoid Kafka broker timeout errors. If required, chunk the large data into small messages, send them through a single partition, and reconstruct the message on the consumer side, as shown in the sketch after this list.
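As an illustration of that last point, here is a minimal, hypothetical chunking sketch. The MessageChunker class and the exact 0.5 MB limit are assumptions for illustration, not part of the project below: it splits a large payload into sub-0.5 MB pieces that can be sent through a single partition and re-assembled in order on the consumer side.

package learn.kafka.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original project.
public class MessageChunker {

    // Keep each chunk comfortably below the broker's message size limit.
    private static final int MAX_CHUNK_BYTES = 500_000;

    // Split a large payload into ordered chunks of at most MAX_CHUNK_BYTES.
    public static List<byte[]> chunk(byte[] payload) {
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += MAX_CHUNK_BYTES) {
            int end = Math.min(payload.length, offset + MAX_CHUNK_BYTES);
            chunks.add(Arrays.copyOfRange(payload, offset, end));
        }
        return chunks;
    }

    // Re-assemble chunks received in order from a single partition.
    public static byte[] reassemble(List<byte[]> chunks) {
        int total = chunks.stream().mapToInt(c -> c.length).sum();
        byte[] payload = new byte[total];
        int offset = 0;
        for (byte[] c : chunks) {
            System.arraycopy(c, 0, payload, offset, c.length);
            offset += c.length;
        }
        return payload;
    }
}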
Challenge
I recently got involved in an initiative where we had a need to hydrate two or more clusters of an In Memory Data Grid (IMDG) with transient data from a combination of data sources. [Please don’t ask me why? :)]
Given the fact that all IMDG clusters must always be in an identical state, the option of sourcing the data directly from each IMDG was out of the question for obvious reasons. I needed a common memory space that is accessible from, or can flow the data to, all IMDGs. In our infrastructure, Kafka was the only available system that was accessible from all IMDGs. Hence, it was decided to fill a Kafka topic once from all data sources, and then use it as a single data source to hydrate all IMDGs.
Purpose, Performance Needs, and Scope
Our performance needs are not very high. As of now, the system needs to move less than 40 GB of data (fewer than 100,000 messages). The purpose was to determine the impact of popular configurations like concurrency, single/batch mode, and auto/manual ack mode for the consumer; and concurrency, retries, and acks (none/1/all) for the producer.
Given my needs, and to time box this exercise, I didn't experiment much with tuning batch and custom buffer sizing in the consumer or producer. Also, all the experimenting was done within a single JVM; the configuration impact on multiple processes (horizontal scaling) with multiple Kafka topics and multiple partitions is out of scope for this exercise. I also didn't check whether compression makes a big difference. Kafka topics usually have a message size limit, and it should be considered, especially if messages are traveling a long distance.
PS: If someone has done the hard work and is willing to share observations on any configurations I missed, I'd be happy to hear about it.
Test Environment
All the tests below were performed using:
- Kafka 2.12-2.5.0 (https://kafka.apache.org/quickstart)
- Spring Boot 2.2.8.RELEASE
- spring-kafka (https://spring.io/projects/spring-kafka#overview)
- MacBook Pro; macOS Mojave 10.14.6; 16 GB RAM; 2.8 GHz Intel Core i7
Observations
As a baseline, I ran one million (1,000,000) custom messages, where each message is 1,000 bytes, with one topic and one default partition. The entire process was completed in ~45,217 milliseconds, which can vary depending on your computer specs.
Concurrency Observation: Increasing the consumer concurrency won't necessarily speed up the consumption of messages from Kafka topics. If your consumers are fast, keeping the concurrency low will yield better results, but if your consumer spends significant time processing each message, higher concurrency will improve the throughput. Keep in mind that higher concurrency is not directly proportional to higher throughput, so you will have to find the right balance.
Batch Observation: Within my setup, introducing batching (spring.kafka.listener.type: batch) with most of Spring Boot’s default settings didn’t make much of a difference in performance.
Consumer Manual Ack Observation: I didn't notice any performance impact of using manual ack, so I would recommend using it in all non-trivial scenarios. In case of consumer failure, it will help the consumer restart from where it left off.
Producer Ack and Retries Observation: I noticed considerable performance degradation when using acks=1 (confirmation by the leader) and almost none when using retries. Even with the additional cost, I would recommend that every non-trivial application leverage it.
Kafka Broker and message size: I have observed issues in terms of performance and broker timeouts with large message sizes. Using 0.5 MB turned out to be a good size for our volume. If message frequency is higher, try to keep the message size smaller. Also, size your topics and message retention appropriately. In my experiments, I am using a 5 GB topic (Kafka will roll over as soon as it reaches the defined limit) with one-day retention; a topic-creation example follows below.
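For reference, size and retention can be set per topic at creation time. This is a hedged example approximating the 5 GB / one-day setup described above; the exact values are illustrative, and note that retention.bytes applies per partition:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 --topic topic-1p \
  --config retention.bytes=5368709120 \
  --config retention.ms=86400000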
Overall: Spring Boot's default configuration is quite reasonable for any moderate use of Kafka. Based on my observations, the only tweaks I would recommend are to keep acks=1 on the producer side (to get the confirmation from the partition leader) and to commit manually on the consumer side.
If your consumer suddenly stops receiving traffic, you may consider tweaking the Kafka/Spring Boot consumer connectivity configurations like fetch.max.wait.ms, heartbeat.interval.ms, session.timeout.ms, and max.poll.interval.ms.
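For example, these can all be adjusted from application.yml. The values below are illustrative, not recommendations; fetch-max-wait and heartbeat-interval have first-class Spring Boot properties, while the other two go into the free-form properties map:

spring:
  kafka:
    consumer:
      fetch-max-wait: 500ms
      heartbeat-interval: 3s
      properties:
        session.timeout.ms: 30000
        max.poll.interval.ms: 300000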
Code Setup and Tips
To set up a simple Maven-based Spring Boot application, create a new Spring Boot project with the dependencies spring-boot-starter and spring-boot-starter-web. The package name should be “learn.kafka”, which will create the main class “learn.kafka.TestApp”. You should have the application running on port 8080. There are plenty of tutorials, and Spring Initializr, if you need help.
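If you are starting from scratch, a minimal sketch of the main class (assuming the package and class names above) looks like this:

package learn.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestApp {
    public static void main(String[] args) {
        SpringApplication.run(TestApp.class, args);
    }
}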
To download Kafka, follow the instructions on https://kafka.apache.org/quickstart to start ZooKeeper and Kafka Server on your local machine and create a topic using the following command.
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic-1p
Tip: if required, you can delete a topic using the following command.
./bin/kafka-topics.sh --delete --topic topic-1p --bootstrap-server localhost:9092
Once you have a basic Spring Boot application and Kafka ready to roll, it's time to add the producer and the consumer to the Spring Boot application. We will use the same Spring Boot application as both producer and consumer for this setup. You may choose to have two different Spring Boot applications as producer and consumer, respectively.
To add Kafka support to the “TestApp” application, open pom.xml and add the following dependency.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Add Lombok as a dependency. We will be leveraging Lombok for our CustomMessage class.
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
We have enabled Kafka support for our “TestApp” application. Now, let’s create a custom message class under the package “learn.kafka.model”.
package learn.kafka.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CustomMessage {
    private long index;     // message sequence number, set by the producer
    private String value;   // payload
    private Date startTime; // when the producer started
}
Being a minimalist, let's use the “TestApp” class to add both the producer and the consumer in the same class. Trust me! It's not a lot of code, thanks to Spring Boot's magic!
To add the consumer, add the consumer factory and consumer method as below.
@Bean
public ConsumerFactory<String, CustomMessage> consumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
Tips:
- The ConsumerFactory bean is required only because of “CustomMessage”. You don't have to create this bean explicitly if your value is of type String.
- DO NOT hard-code properties in the “ConsumerFactory” as many tutorials online do. Instead, build it using “kafkaProperties.buildConsumerProperties()” and enhance it based on your needs. This will allow you to control your consumer from application.yml.
Hot Tip: Most of the tutorials online create the ConcurrentKafkaListenerContainerFactory bean explicitly. You don't need to do so; Spring Boot will do it for you as soon as you specify the concurrency property for the consumer.
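For reference, that property is a one-liner in application.yml (it also appears in the full configuration later in this article):

spring:
  kafka:
    listener:
      concurrency: 10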
@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}")
public void singleMessageConsumerWithManualAck(CustomMessage message,
        Acknowledgment acknowledgment,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    // sample the first 1,000 messages to observe the partition assignment
    if (receivedMessagedCount.get() < 1000) log.info("Partition: {} receivedMessagedCount:{}", partition, receivedMessagedCount.get());
    consumeMessage(message);
    acknowledgment.acknowledge();
}
Tip: You don't have to specify a topic name here and could use a common property, but I decided to keep them separate because in most practical scenarios the producer and consumer will be two different applications.
private void consumeMessage(CustomMessage message) {
    // log to make sure that the consumer has started
    if (message.getIndex() == 0) log.info("Consumer started...");
    // keep the count of received messages
    receivedMessagedCount.getAndIncrement();
    // calculate the time taken since the producer started until now
    long diffInMillies = Math.abs(new Date().getTime() - message.getStartTime().getTime());
    if (receivedMessagedCount.get() == MESSAGE_COUNT) {
        log.info("receivedMessagedCount:{} Took: {} ms", receivedMessagedCount.get(), diffInMillies);
        log.info("Consumer finished.");
    }
    // the consumer has to do something, doesn't it?
    // Thread.sleep(20);
}
The actual consuming method has been separated out so I can reuse the same logic with a different setting like single vs batch mode.
And finally, add the below configuration in your application.yml.
spring:
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      ack-mode: manual_immediate
    consumer:
      group-id: test-group-1
      topic: topic-1p
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'learn.kafka.model'
Restart the application, and your consumer should be up and running. Please make sure ZooKeeper and Kafka Server are running on your local machine as mentioned above.
If your application is running, then add the producer to the same class by injecting the topic name, KafkaTemplate, and ProducerFactory into TestApp as below.
@Value("${spring.kafka.producer.topic}")
private String topic;

@Autowired
private KafkaTemplate<String, CustomMessage> kafkaTemplate;

@Bean
public KafkaTemplate<String, CustomMessage> kafkaTemplate(ProducerFactory<String, CustomMessage> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

@Bean
public ProducerFactory<String, CustomMessage> producerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
Special note: please add commons-lang3 to your dependencies; we will be using it to generate random strings of 1,000 characters.
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.10</version>
</dependency>
And add a method to send the messages to the Kafka topic.
private void sendMessages(Date startTime, long messageCount) {
    log.info("Producer started...");
    for (int i = 0; i < messageCount; i++) {
        // random 1,000-character alphanumeric payload
        String value = RandomStringUtils.random(1000, true, true);
        CustomMessage message = CustomMessage.builder().index(i).value(value).startTime(startTime).build();
        kafkaTemplate.send(topic, message);
    }
    log.info("Producer finished.");
}
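As a side note, kafkaTemplate.send() returns a ListenableFuture, so if you want to observe individual send failures while experimenting with acks and retries, you can attach a callback. This is a hedged sketch, not part of the measured setup above:

// optional: observe the result of each send
kafkaTemplate.send(topic, message).addCallback(
        result -> { /* sent; result.getRecordMetadata() carries partition and offset */ },
        ex -> log.error("Send failed", ex));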
And add the producer configuration in your application.yml. Your combined consumer and producer application.yml should look like this:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      ack-mode: manual_immediate
    producer:
      topic: topic-1p
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      acks: 1
    consumer:
      group-id: test-group-1
      topic: topic-1p
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'learn.kafka.model'
And add an “ApplicationRunner”, which will kick off the producer and consumer on application startup.
@Bean
ApplicationRunner runAdditionalClientCacheInitialization() {
    return args -> {
        final Date startTime = new Date();
        sendMessages(startTime, 1000000);
        // sendMessagesWithThread(startTime, 1000000, 10);
    };
}
Run your application; depending on your hardware, the time taken could differ for you. You should see the elapsed time in the console logs:
receivedMessagedCount:1000000 Took: 45217 ms
To add multiple consumer threads, modify your application properties file as below. Also, point the application to a topic with 10 partitions. Remember! The number of concurrent consumer threads must be the same as or less than the number of topic partitions; any extra threads will sit idle. You can create a new topic with 10 partitions using this command:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic topic-10p
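To verify the partition count, you can describe the topic:

bin/kafka-topics.sh --describe --topic topic-10p --bootstrap-server localhost:9092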
spring:
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      concurrency: 10
      ack-mode: manual_immediate
    producer:
      topic: topic-10p
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      acks: 1
    consumer:
      group-id: test-group-1
      topic: topic-10p
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'learn.kafka.model'
Run your application; depending on your hardware, the time taken could differ for you. You should see the elapsed time in the console logs:
receivedMessagedCount:1000000 Took: 69660 ms
To test the consumer's batch-based configuration, add the Kafka listener property to application.yml and add a new consumer method that accepts a list of CustomMessages.
spring:
  kafka:
    listener:
      type: batch
@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}")
public void batchConsumerWithManualAck(List<CustomMessage> messages,
        Acknowledgment acknowledgment,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    // sample different partitions, batch sizes, the overall message count, and message size
    if (receivedMessagedCount.get() < 1000) log.info("Partition: {} batchSize: {} receivedMessagedCount:{}", partition, messages.size(), receivedMessagedCount.get());
    // process the batch
    messages.forEach(this::consumeMessage);
    // manually acknowledge the whole batch once it has been processed
    acknowledgment.acknowledge();
}
Tip: Once you add the method, don't forget to comment out the “singleMessageConsumerWithManualAck” method.
Run your application; depending on your hardware, the time taken could differ for you. You should see the elapsed time in the console logs:
receivedMessagedCount:1000000 Took: 77055 ms
To test producer concurrency, simply add more threads using Spring Boot's task executor. KafkaTemplate is thread-safe and can be used by multiple threads.
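The taskExecutor used in the next snippet is never defined above; here is one hedged way to provide it (the bean name and pool sizes are assumptions for this test):

// Hypothetical executor setup; pool sizes are illustrative.
@Autowired
private ThreadPoolTaskExecutor taskExecutor;

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10); // one thread per producer task in this test
    executor.setMaxPoolSize(10);
    executor.initialize();
    return executor;
}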
private void sendMessagesWithThread(Date startTime, long totalMessages, int threads) {
    final long messagePerThread = totalMessages / threads;
    log.info("messagePerThread:{}", messagePerThread);
    // final CountDownLatch latch = new CountDownLatch(threads);
    for (int i = 0; i < threads; i++) {
        taskExecutor.execute(new Runnable() {
            public void run() {
                sendMessages(startTime, messagePerThread);
                // latch.countDown();
            }
        });
    }
}
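If you want the call to block until all producer threads finish (the commented-out latch above hints at this), here is a hedged sketch of a waiting variant; the method name is an assumption:

private void sendMessagesAndWait(Date startTime, long totalMessages, int threads) throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(threads);
    final long messagePerThread = totalMessages / threads;
    for (int i = 0; i < threads; i++) {
        taskExecutor.execute(() -> {
            sendMessages(startTime, messagePerThread);
            latch.countDown(); // signal that this thread's share is done
        });
    }
    latch.await(); // block until all producer threads have finished
    log.info("All producer threads finished.");
}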
And update the ApplicationRunner to use the method above.
@Bean
ApplicationRunner runAdditionalClientCacheInitialization() {
    return args -> {
        final Date startTime = new Date();
        // sendMessages(startTime, 1000000);
        sendMessagesWithThread(startTime, 1000000, 10);
    };
}
This is just the tip of the iceberg. I hope this setup will help you tune your Kafka configuration for moderate usages.