Kafka Streams: Tips on How to Decrease Rebalancing Impact for Real-Time Event Processing on Highly Loaded Topics
Need to reduce rebalancing time on a Kafka consumer group during deployment and understand the pitfalls of Kafka Streams? Read this article to learn about the factors that affect rebalance latency.
Overview
A Kafka rebalance happens when a new consumer either joins a consumer group or leaves it. It becomes dramatic during application deployment rollout, as multiple instances are restarted at the same time and rebalance latency increases significantly. During a rebalance, consumers stop processing messages for some period of time, and, as a result, events from a topic are processed with some delay. Some business cases can tolerate rebalancing, while others require real-time event processing, where delays of more than a few seconds are painful. Here we will try to figure out how to decrease rebalance impact for Kafka Streams clients (even though some tips will be useful for other Kafka consumer clients as well).
Let's look at one existing use case. We have a microservice with 45 application instances deployed into Kubernetes, with up-scaling and down-scaling configured based on CPU load. This service consumes a single topic using Kafka Streams (actually there are more consumed topics, but let's concentrate on a single one) with 180 partitions and traffic of 20,000 messages per second. We use the Kafka Streams configuration property num.stream.threads = 4, so a single app instance processes 4 partitions in 4 threads (45 instances with 4 threads each, which means each of the 180 partitions is processed by its own thread). As a result, a consumer handles around 110 messages per second from a single partition. In our case, processing of a single message takes around 5 milliseconds and the stream is stateless (processing is both CPU- and IO-intensive, with some calls into databases and REST calls to other microservices).
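For illustration, here is a minimal sketch of such a setup (the application id, bootstrap servers, and topic name are placeholders, not taken from our real service):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class EventProcessorApp {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-processor"); // also the consumer group id
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        // 4 stream threads per instance: 45 instances x 4 threads = 180 threads,
        // one per partition of the consumed topic
        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")          // hypothetical topic name
               .foreach(EventProcessorApp::process);      // stateless processing

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    private static void process(String key, String value) {
        // CPU- and IO-intensive work: database queries and REST calls to other services
    }
}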
During deployment rollout, we had delays in consuming events of more than 1 minute at the 99th percentile, and this definitely impacts business flow, as we need real-time processing. After understanding what was going on and tuning the configuration, we significantly decreased it, and now processing latency is up to 2-3 seconds at the 99th percentile (yes, we have metrics for that, and we will talk about them closer to the end of this article). So just by understanding and changing configuration, rebalance latency was decreased more than ten times, without any code changes. As of writing this article, we use the kafka-streams Maven dependency with version 2.8.0.
Tips on How to Decrease Rebalancing Impact
The following tips will allow you to significantly decrease rebalancing latency for your application.
1. Always Keep the Latest Version of Kafka Streams
Developers of Kafka Streams and the Kafka clients regularly improve the rebalancing protocol, and performance becomes better and better over time (thanks to the amazing Confluent team) with new, smarter, and more optimized logic. This can be seen by reviewing the release notes for each new version. A feature improvement worth highlighting is the incremental cooperative rebalancing protocol. With its introduction, streams no longer require all tasks to be revoked at the beginning of a rebalance (the "stop the world" effect). Instead, at the completion of the rebalance, only those tasks which are to be migrated to another consumer for overall load balance need to be closed and revoked. This significantly improves rebalance latency: the number of rebalances is now much higher, but each has a much shorter latency. Kafka Streams has this feature out of the box (since version 2.4.0, with some improvements in 2.6.0), with the default partition assignor StreamsPartitionAssignor.
2. Add a Kafka Streams Configuration Property
Add the Kafka Streams configuration property internal.leave.group.on.close = true to send a consumer leave group request on app shutdown. By default, Kafka Streams doesn't send this request on graceful shutdown, and, as a result, messages from the partitions that were assigned to the terminating app instance will not be processed until the session of that consumer expires (after session.timeout.ms), and only after expiration is a new rebalance triggered. By default, session.timeout.ms = 10000, so during a single app instance restart, messages from some partitions will not be processed for at least 10 seconds, which is painful for real-time requirements. The motivation for such default behavior is to reduce the number of rebalances (rebalance only on app start, not on app shutdown), which is especially relevant for heavyweight stateful streams.
If your microservice restarts super fast (in less than 10 seconds), then in conjunction with the static group membership feature, you really might benefit from this default behavior of not sending leave group requests. But in the majority of cases (including ours), an app instance restart takes longer than the default session timeout, and we still get two rebalances (one on session timeout and one on new instance start). So not sending the leave group request, even with static group membership, is not an option for real-time processing, as the requirements cannot tolerate a delay of 10 seconds or more.
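For reference, static group membership is enabled through the consumer property group.instance.id. A hypothetical sketch, continuing the properties object from the setup above (it assumes each instance can derive an identifier that is stable across restarts, e.g. a StatefulSet pod name exposed via an assumed POD_NAME environment variable):

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Stable per-instance id: a restart of the same instance that completes within
// the session timeout then does not trigger a rebalance at all.
properties.put(
        StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
        System.getenv("POD_NAME"));   // assumed environment variable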
In order to change the default behavior of not sending leave group requests, we should use the internal Kafka Streams config property internal.leave.group.on.close = true. The property should be added during Kafka Streams creation: new KafkaStreams(streamTopology, properties) (it goes into the second constructor argument, Properties). As the property is internal, we need to be careful and double-check before upgrading to a new version that the config is still there (the logic lives in the GroupRebalanceConfig and StreamsConfig classes). The property hasn't been made public, per the related discussion.
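Here is a short sketch of what this looks like in code (streamTopology and properties are the topology and config from your own setup; the property name is written as a plain string because, being internal, it has no public StreamsConfig constant):

// Send a leave group request on graceful shutdown, so the rebalance
// fires immediately instead of after session.timeout.ms expires.
properties.put("internal.leave.group.on.close", true);
KafkaStreams streams = new KafkaStreams(streamTopology, properties);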
In our case, adding this property significantly improved the situation during deployments for stateless streams. Also, keep in mind that the leave group request still might not be sent in some rare cases of non-graceful instance shutdown (e.g. the instance crashed unexpectedly); for that, you need to tune session.timeout.ms, which is described in the next tip.
3. Decrease Consumer Session Expiration
Decrease consumer session expiration by updating the configuration property session.timeout.ms. By default, Kafka Streams has a session expiration of 10 seconds (session.timeout.ms = 10000) and heartbeats to the consumer coordinator every 3 seconds (heartbeat.interval.ms = 3000). If an app instance shuts down without sending a leave group request (e.g. on non-graceful shutdown, or with internal.leave.group.on.close = false) or is simply unavailable (e.g. due to a network issue), the rebalance will happen only after 10 seconds, and we will have a delay in event processing.
We could try to decrease both session.timeout.ms and heartbeat.interval.ms in order to fire the rebalance earlier. A general recommendation is that session.timeout.ms should be no less than three times heartbeat.interval.ms. So, as an example, we could set session.timeout.ms = 6000 and heartbeat.interval.ms = 1500. Be careful with these settings, as they increase the probability of rebalances occurring on a daily basis, and consumers might hang in long rebalances, depending on network quality and stability (e.g. if we have multiple app instances and some of them send sync and join group requests too slowly). So you need to test what a reasonable session timeout value is for your infrastructure.
4. Decrease the Number of Simultaneously Restarted App Instances During Deployment Rollout
Using Kubernetes, we can control how many app instances are replaced by a new deployment at the same time. This is achievable using the properties max surge and max unavailable. Both configs have a default value of 25%, and values can be an absolute number (for example, 1 pod) or a percentage of desired pods. If we have tens of app instances, such a default configuration will roll out multiple new instances while multiple old instances are terminating. It means that multiple partitions will require reassignment to other app instances, multiple rebalances will be fired, and this leads to significant rebalance latency. The most preferable configuration for decreasing rebalance duration is max surge = 1 and max unavailable = 0. In that case, only a single new app instance starts at a time, and old instances terminate one by one, which leads to a smaller number of partitions that require reassignment. As a trade-off, the deployment process will be longer, but more stable, which is acceptable if our top priority is minimal delays in event processing.
The ideal case from a rebalance latency point of view is when we have N partitions and N app instances (each instance consumes only a single partition). Then deploying a new app version, or starting or shutting down a single instance, requires only a single partition to be revoked and reassigned. In reality, we don't need that many app instances; we could have N app instances and, let's say, 4 * N partitions together with the Kafka Streams configuration property num.stream.threads = 4 (so each partition is processed by a separate thread). In that case, rolling out a new instance requires 4 partitions to be reassigned to other consumers. Play around with the number of app instances and the deployment configuration, and see what is more suitable for you.
5. Increase the Number of Topic Partitions and App Instances With a Slight Excess
This tip somewhat correlates with the previous one. A higher number of partitions leads to lower throughput per partition, and with a higher number of app instances, the restart of a single one leads to a smaller Kafka lag during rebalancing.
Also, make sure that you don't have frequent up-scaling and down-scaling of app instances, as each scaling event triggers a rebalance. If you see several up-scalings and down-scalings per hour, the configured minimum number of instances is probably too low, and you need to increase it.
6. Split a Highly Loaded Topic Into a Few
For some use cases, it might be reasonable to split a single highly loaded topic into multiple topics. In that case, we will have smaller throughput per topic, and a rebalance will lead to a smaller Kafka lag on each topic. For other use cases, it's not an option (e.g. if we need to process events in a specific order per partition). Still, it might be preferable to just increase the number of partitions on the highly loaded topic; that may be enough for you.
7. Utilize Effective Monitoring and Alerts
This final tip will not decrease rebalance latency, but it will allow you to monitor your system and understand what is going on. Even though you can get all available metrics by invoking the metrics() method on the KafkaStreams object, it is more convenient to publish metrics to Prometheus using the Micrometer library and look at graphs in Grafana. For integrating Kafka Streams with Micrometer, you need a KafkaStreamsMicrometerListener bean:
@Bean
KafkaStreamsMicrometerListener kafkaStreamsMicrometerListener(MeterRegistry meterRegistry) {
    return new KafkaStreamsMicrometerListener(meterRegistry);
}
where MeterRegistry is from the micrometer-core dependency.
If you create Kafka Streams using StreamsBuilderFactoryBean from spring-kafka, then you need to add the listener to it: streamsBuilderFactoryBean.addListener(kafkaStreamsMicrometerListener);. And if you create KafkaStreams objects directly, then on each KafkaStreams object you need to invoke kafkaStreamsMicrometerListener.streamsAdded(beanId, kafkaStreams);, where beanId is any unique identifier per KafkaStreams object.
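A sketch of that direct (non-Spring) wiring, assuming meterRegistry is your already-configured Micrometer registry (e.g. a PrometheusMeterRegistry) and streamTopology and properties come from your own setup; the id string is an arbitrary placeholder:

// Register rebalance and other Kafka Streams metrics with Micrometer.
KafkaStreamsMicrometerListener listener = new KafkaStreamsMicrometerListener(meterRegistry);

KafkaStreams kafkaStreams = new KafkaStreams(streamTopology, properties);
listener.streamsAdded("event-processor-streams", kafkaStreams);   // beanId: any unique id
kafkaStreams.start();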
With the KafkaStreamsMicrometerListener configured, Kafka Streams provides multiple useful Prometheus metrics related to rebalancing. The most interesting of them are:
- kafka_consumer_coordinator_rebalance_rate_per_hour
- kafka_consumer_coordinator_failed_rebalance_rate_per_hour
- kafka_consumer_coordinator_rebalance_latency_avg
- kafka_consumer_coordinator_rebalance_latency_max
- kafka_consumer_coordinator_partition_revoked_latency_avg
- kafka_consumer_coordinator_partition_assigned_latency_avg
These metrics should be exported to Prometheus as the Gauge metric type.
In addition to the rebalance latency metrics, make sure that you monitor Kafka lag for each consumer group. With monitoring in place, you can experiment with a variety of configurations and see what works better for your use case. It's also nice to have automatic alerts on the Grafana graphs that notify you (e.g. via email) if some metric exceeds the maximum acceptable threshold.
Conclusion
In this article, we've reviewed ways to significantly improve rebalance latency in Kafka Streams by tuning configuration. You need to experiment with the provided tips for your specific use case (the outcome depends on a variety of circumstances, like the acceptable processing delay and whether the stream is stateless or stateful). Keeping the latest version of Kafka Streams in your application gives you improved performance and fixes for a variety of bugs from previous versions out of the box. Changing the default Kafka Streams properties and the deployment configuration might decrease your rebalance latency by more than ten times.