Modify Kafka Topic Partitions Count and An Example of Partition Reassignment in Strimzi
I started learning how we can increase the number of partitions of a Kafka topic and how we can reassign topic partitions to different brokers in a Kafka cluster.
Hi,
I started learning how we can increase the number of partitions of a Kafka topic and how we can reassign topic partitions to different brokers in a Kafka cluster. If you have gone through my previous articles on Strimzi, you already know how to set up Strimzi. It would be even better if you also follow the Strimzi documentation, especially the section on partition reassignment.
As always, I am using minikube on Fedora 34 to create and test the Kafka cluster set up with Strimzi.
So let's start the fun!
Modifying Kafka Topic Partitions
1. Create a Kafka topic using the KafkaTopic resource.
# Note: the topic has one partition and one replica.
$ cat kafka-topic-2.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
$ kubectl apply -f kafka-topic-2.yaml
kafkatopic.kafka.strimzi.io/test-topic created
$ kubectl logs -f my-cluster-entity-operator-7594496dff-gl889 -c topic-operator
2021-07-03 08:12:55,02874 INFO [vert.x-eventloop-thread-1] TopicOperator:1493 - Reconciliation #1894(periodic kafka __consumer_offsets) KafkaTopic(kafka/consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a): Success reconciling KafkaTopic kafka/consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a
2021-07-03 08:14:52,03119 INFO [OkHttp https://10.96.0.1/...] K8sTopicWatcher:56 - Reconciliation #1906(kube +test-topic) KafkaTopic(kafka/test-topic): event ADDED on resource test-topic generation=1, labels={strimzi.io/cluster=my-cluster}
2021-07-03 08:14:52,05804 INFO [vert.x-eventloop-thread-1] TopicOperator:576 - Reconciliation #1912(kube +test-topic) KafkaTopic(kafka/test-topic): Reconciling topic test-topic, k8sTopic:nonnull, kafkaTopic:null, privateTopic:null
2021-07-03 08:14:52,12949 INFO [ZkClient-EventThread-19-localhost:2181] ZkTopicsWatcher:126 - Topics deleted from ZK for watch 2: []
2021-07-03 08:14:52,13314 INFO [ZkClient-EventThread-19-localhost:2181] ZkTopicsWatcher:142 - Topics created in ZK for watch 2: [test-topic]
2021-07-03 08:14:52,36533 INFO [vert.x-eventloop-thread-1] TopicOperator:743 - Reconciliation #1931(kube +test-topic) KafkaTopic(kafka/test-topic): All three topics are identical
2021-07-03 08:14:52,38932 INFO [kubernetes-ops-pool-18] CrdOperator:113 - Reconciliation #1935(kube +test-topic) KafkaTopic(kafka/test-topic): Status of KafkaTopic test-topic in namespace kafka has been updated
2021-07-03 08:14:52,39159 INFO [vert.x-eventloop-thread-1] K8sTopicWatcher:60 - Reconciliation #1938(kube +test-topic) KafkaTopic(kafka/test-topic): Success processing event ADDED on resource test-topic with labels {strimzi.io/cluster=my-cluster}
2021-07-03 08:14:53,41798 INFO [vert.x-eventloop-thread-1] TopicOperator:576 - Reconciliation #1946(/brokers/topics 2:+test-topic) KafkaTopic(kafka/test-topic): Reconciling topic test-topic, k8sTopic:nonnull, kafkaTopic:nonnull, privateTopic:nonnull
2021-07-03 08:14:53,42050 INFO [vert.x-eventloop-thread-1] TopicOperator:743 - Reconciliation #1951(/brokers/topics 2:+test-topic) KafkaTopic(kafka/test-topic): All three topics are identical
$ kubectl exec my-cluster-kafka-0 -it -- /opt/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 0.0.0.0:9092
Topic: test-topic TopicId: r7CTwaQrTauPMboAGF2QtA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=7200000,message.format.version=2.8-IV1
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
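As a quick sanity check (not part of the original flow), we can produce a keyed message and read it back with the console tools bundled in the Kafka image. This is a minimal sketch; the property names assume the Kafka 2.8 console tools shipped in the Strimzi image above.
# Optional sanity check: produce a keyed message and read it back.
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic --property "parse.key=true" --property "key.separator=:"
>user1:hello
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --property print.key=true --max-messages 1
user1	hello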
2. Increase the topic partition count. Edit the manifest, apply it again with kubectl apply -f kafka-topic-2.yaml, and watch the logs: topic-related details are logged in the topic-operator container of the entity-operator pod.
# Note: we just increased the partition count by one.
$ cat kafka-topic-2.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 2
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
$ kubectl logs -f my-cluster-entity-operator-7594496dff-gl889 -c topic-operator
2021-07-03 08:24:16,29567 INFO [OkHttp https://10.96.0.1/...] K8sTopicWatcher:56 - Reconciliation #2485(kube =test-topic) KafkaTopic(kafka/test-topic): event MODIFIED on resource test-topic generation=2, labels={strimzi.io/cluster=my-cluster}
2021-07-03 08:24:16,35845 INFO [vert.x-eventloop-thread-1] TopicOperator:576 - Reconciliation #2491(kube =test-topic) KafkaTopic(kafka/test-topic): Reconciling topic test-topic, k8sTopic:nonnull, kafkaTopic:nonnull, privateTopic:nonnull
2021-07-03 08:24:16,40524 INFO [ZkClient-EventThread-19-localhost:2181] ZkTopicWatcher:23 - Reconciliation #2501(/brokers/topics =test-topic) KafkaTopic(kafka/test-topic): Partitions change
2021-07-03 08:24:16,53249 INFO [kubernetes-ops-pool-13] CrdOperator:113 - Reconciliation #2510(kube =test-topic) KafkaTopic(kafka/test-topic): Status of KafkaTopic test-topic in namespace kafka has been updated
2021-07-03 08:24:16,53912 INFO [vert.x-eventloop-thread-0] K8sTopicWatcher:60 - Reconciliation #2513(kube =test-topic) KafkaTopic(kafka/test-topic): Success processing event MODIFIED on resource test-topic with labels {strimzi.io/cluster=my-cluster}
2021-07-03 08:24:16,57426 INFO [vert.x-eventloop-thread-1] TopicOperator:902 - Reconciliation #2520(/brokers/topics =test-topic) KafkaTopic(kafka/test-topic): Topic test-topic partitions changed to 1
2021-07-03 08:24:16,65636 INFO [vert.x-eventloop-thread-1] TopicOperator:576 - Reconciliation #2521(/brokers/topics =test-topic) KafkaTopic(kafka/test-topic): Reconciling topic test-topic, k8sTopic:nonnull, kafkaTopic:nonnull, privateTopic:nonnull
2021-07-03 08:24:16,65686 ERROR [vert.x-eventloop-thread-1] TopicOperator:750 - Reconciliation #2526(/brokers/topics =test-topic) KafkaTopic(kafka/test-topic): Number of partitions cannot be decreased
2021-07-03 08:24:16,67914 INFO [kubernetes-ops-pool-15] CrdOperator:113 - Reconciliation #2533(/brokers/topics =test-topic) KafkaTopic(kafka/test-topic): Status of KafkaTopic test-topic in namespace kafka has been updated
2021-07-03 08:24:16,69024 INFO [vert.x-eventloop-thread-1] TopicOperator:131 - Reconciliation #2535(/brokers/topics =test-topic) KafkaTopic(kafka/test-topic): Number of partitions cannot be decreased
2021-07-03 08:24:16,70069 INFO [vert.x-eventloop-thread-1] ZkTopicWatcher:26 - Reconciliation #2537(/brokers/topics =test-topic) KafkaTopic(kafka/test-topic): Reconciliation result due to topic partitions change on topic test-topic: Future{cause=Number of partitions cannot be decreased}
$ kubectl exec my-cluster-kafka-0 -it -- /opt/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 0.0.0.0:9092
Topic: test-topic TopicId: r7CTwaQrTauPMboAGF2QtA PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=7200000,message.format.version=2.8-IV1
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
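For reference, plain Kafka can make the same change directly with kafka-topics.sh --alter; this is just a sketch of the non-Strimzi equivalent. With the Topic Operator running, editing the KafkaTopic resource as above is the recommended route, and keep in mind that adding partitions changes which partition keyed messages hash to.
# Plain-Kafka equivalent (sketch); prefer the KafkaTopic resource under Strimzi.
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test-topic --partitions 2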
3. We cannot decrease the partition count. Let's try by setting partitions back to 1 and applying the manifest:
$ cat kafka-topic-2.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
$ kubectl logs -f my-cluster-entity-operator-7594496dff-gl889 -c topic-operator
2021-07-03 15:37:08,49624 INFO [kubernetes-ops-pool-14] CrdOperator:113 - Reconciliation #16050(kube =test-topic) KafkaTopic(kafka/test-topic): Status of KafkaTopic test-topic in namespace kafka has been updated
2021-07-03 15:37:08,50325 ERROR [vert.x-eventloop-thread-1] K8sTopicWatcher:69 - Reconciliation #16054(kube =test-topic) KafkaTopic(kafka/test-topic): Failure processing KafkaTopic watch event MODIFIED on resource test-topic with labels {strimzi.io/cluster=my-cluster}: Number of partitions cannot be decreased
io.strimzi.operator.topic.PartitionDecreaseException: Number of partitions cannot be decreased
at io.strimzi.operator.topic.TopicOperator.update3Way(TopicOperator.java:753) ~[io.strimzi.topic-operator-0.24.0.jar:0.24.0]
at io.strimzi.operator.topic.TopicOperator.reconcile(TopicOperator.java:640) ~[io.strimzi.topic-operator-0.24.0.jar:0.24.0]
at io.strimzi.operator.topic.TopicOperator.lambda$reconcileOnResourceChange$32(TopicOperator.java:1162) ~[io.strimzi.topic-operator-0.24.0.jar:0.24.0]
at io.vertx.core.impl.future.Composition.onSuccess(Composition.java:38) ~[io.vertx.vertx-core-4.1.0.jar:4.1.0]
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62) ~[io.vertx.vertx-core-4.1.0.jar:4.1.0]
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179) ~[io.vertx.vertx-core-4.1.0.jar:4.1.0]
at io.vertx.core.impl.future.CompositeFutureImpl.trySucceed(CompositeFutureImpl.java:163) ~[io.vertx.vertx-core-4.1.0.jar:4.1.0]
at io.vertx.core.impl.future.CompositeFutureImpl.lambda$all$0(CompositeFutureImpl.java:38) ~[io.vertx.vertx-core-4.1.0.jar:4.1.0]
-----
2021-07-03 15:37:08,50918 WARN [vert.x-eventloop-thread-1] TopicOperator:134 - Reconciliation #16058(kube =test-topic) KafkaTopic(kafka/test-topic): Failure processing KafkaTopic watch event MODIFIED on resource test-topic with labels {strimzi.io/cluster=my-cluster}: Number of partitions cannot be decreased
$ kubectl exec my-cluster-kafka-0 -it -- /opt/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 0.0.0.0:9092
Topic: test-topic TopicId: r7CTwaQrTauPMboAGF2QtA PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=7200000,message.format.version=2.8-IV1
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
$ kubectl describe kt test-topic |grep -A 7 Status
Status:
Conditions:
Last Transition Time: 2021-07-03T15:39:03.114076Z
Message: Number of partitions cannot be decreased
Reason: PartitionDecreaseException
Status: True
Type: NotReady
Observed Generation: 5
Topic Name: test-topic
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning <unknown> io.strimzi.operator.topic.TopicOperator Failure processing KafkaTopic watch event MODIFIED on resource test-topic with labels {strimzi.io/cluster=my-cluster}: Number of partitions cannot be decreased
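Instead of grepping the describe output, we can read the condition straight from the resource status with a jsonpath query; a small sketch, which should print something like the line below:
$ kubectl get kt test-topic -o jsonpath='{.status.conditions[0].type}: {.status.conditions[0].message}{"\n"}'
NotReady: Number of partitions cannot be decreased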
4. This leaves the topic in a NotReady state, which we can see by describing the topic as in step 3.
So let's set the partition count back to 2.
$ cat kafka-topic-2.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 2
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
$ kubectl apply -f kafka-topic-2.yaml
kafkatopic.kafka.strimzi.io/test-topic configured
# The topic is back in the Ready state.
$ kubectl describe kt test-topic |grep -A 7 Status
Status:
Conditions:
Last Transition Time: 2021-07-03T15:46:06.563274Z
Status: True
Type: Ready
Observed Generation: 6
Topic Name: test-topic
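If this is part of a script, kubectl wait can block until the condition flips instead of polling the describe output; a minimal sketch:
# Block until the KafkaTopic reports Ready (sketch).
$ kubectl wait kt/test-topic --for=condition=Ready --timeout=60s
kafkatopic.kafka.strimzi.io/test-topic condition met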
5. Can we increase replicas similarly? No, we can't. Let's try increasing the replica count for the topic.
$ cat kafka-topic-2.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 2
  replicas: 2
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
$ kubectl apply -f kafka-topic-2.yaml
kafkatopic.kafka.strimzi.io/test-topic configured
$ kubectl logs -f my-cluster-entity-operator-7594496dff-gl889 -c topic-operator
2021-07-03 15:58:12,37363 INFO [kubernetes-ops-pool-19] CrdOperator:113 - Reconciliation #17284(kube =test-topic) KafkaTopic(kafka/test-topic): Status of KafkaTopic test-topic in namespace kafka has been updated
2021-07-03 15:58:12,38067 ERROR [vert.x-eventloop-thread-1] K8sTopicWatcher:69 - Reconciliation #17287(kube =test-topic) KafkaTopic(kafka/test-topic): Failure processing KafkaTopic watch event MODIFIED on resource test-topic with labels {strimzi.io/cluster=my-cluster}: Changing 'spec.replicas' is not supported. This KafkaTopic's 'spec.replicas' should be reverted to 1 and then the replication should be changed directly in Kafka.
io.strimzi.operator.topic.ReplicationFactorChangeException: Changing 'spec.replicas' is not supported. This KafkaTopic's 'spec.replicas' should be reverted to 1 and then the replication should be changed directly in Kafka.
at io.strimzi.operator.topic.TopicOperator.update3Way(TopicOperator.java:759) ~[io.strimzi.topic-operator-0.24.0.jar:0.24.0]
at io.strimzi.operator.topic.TopicOperator.reconcile(TopicOperator.java:640) ~[io.strimzi.topic-operator-0.24.0.jar:0.24.0]
at io.strimzi.operator.topic.TopicOperator.lambda$reconcileOnResourceChange$32(TopicOperator.java:1162) ~[io.strimzi.topic-operator-0.24.0.jar:0.24.0]
at io.vertx.core.impl.future.Composition.onSuccess(Composition.java:38) ~[io.vertx.vertx-core-4.1.0.jar:4.1.0]
$ kubectl describe kt test-topic |grep -A 3 Status
Status:
Conditions:
Last Transition Time: 2021-07-03T15:58:12.358974Z
Message: Changing 'spec.replicas' is not supported. This KafkaTopic's 'spec.replicas' should be reverted to 1 and then the replication should be changed directly in Kafka.
--
Status: True
Type: NotReady
Observed Generation: 7
Topic Name: test-topic
# Revert the change so that the topic is back in the Ready state.
$ cat kafka-topic-2.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 2
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
$ kubectl apply -f kafka-topic-2.yaml
kafkatopic.kafka.strimzi.io/test-topic configured
$ kubectl describe kt test-topic |grep -A 3 Status
Status:
Conditions:
Last Transition Time: 2021-07-03T16:04:22.379061Z
Status: True
Type: Ready
Observed Generation: 8
Topic Name: test-topic
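As the error message says, replication has to be changed directly in Kafka. The standard approach is to hand kafka-reassign-partitions.sh (covered in step 6) a reassignment file that lists more replicas per partition. This is a sketch only; the file name and broker IDs here are illustrative:
$ cat increase-rf.json
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[0,1]},{"topic":"test-topic","partition":1,"replicas":[1,2]}]}
$ cat increase-rf.json | kubectl exec -c kafka my-cluster-kafka-0 -i -- /bin/bash -c 'cat > /tmp/increase-rf.json'
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/increase-rf.json --execute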
6. Replicas can be reassigned (and the replication factor increased or decreased) with the help of Kafka's kafka-reassign-partitions.sh script.
We might want to reassign partitions to balance storage load, or when adding or removing volumes. The kafka-reassign-partitions.sh script is run in three modes: --generate, --execute, and --verify.
Note: To avoid data loss while removing volumes, we would have to move all partitions off the volumes being removed first.
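As an aside, if every topic had to move (for example, before removing a volume), the topics file could be generated instead of hand-written. A sketch, assuming jq is installed on the machine running kubectl:
# Build a topics.json covering every topic in the cluster (sketch; needs local jq).
$ kubectl exec my-cluster-kafka-0 -c kafka -- bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | jq -R -s '{version: 1, topics: [split("\n")[] | select(length > 0) | {topic: .}]}' > topics.json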
$ cat topics.json
{ "topics": [ { "topic" : "test-topic"}], "version":1}
$ cat topics.json | kubectl exec -c kafka my-cluster-kafka-0 -i -- \
/bin/bash -c \
'cat > /tmp/topics.json'
$ kubectl exec -c kafka my-cluster-kafka-0 -it -- /bin/bash
$ more /tmp/topics.json
{ "topics": [ { "topic" : "test-topic"}], "version":1}
$ exit
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file /tmp/topics.json --broker-list 0,2 --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[1],"log_dirs":["any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[0],"log_dirs":["any"]}]}
# Save the proposed assignment (and the current one, for rollback) locally, then copy the proposal into the pod.
$ cat reassignment.json
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[0],"log_dirs":["any"]}]}
$ cat rollback.json
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[1],"log_dirs":["any"]}]}
$ cat reassignment.json | kubectl exec -c kafka my-cluster-kafka-0 -i -- /bin/bash -c 'cat > /tmp/reassignment.json'
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[1],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for test-topic-0,test-topic-1
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassignment.json --verify
Status of partition reassignment:
Reassignment of partition test-topic-0 is complete.
Reassignment of partition test-topic-1 is complete.
Clearing broker-level throttles on brokers 0,1,2
Clearing topic-level throttles on topic test-topic
$ kubectl exec my-cluster-kafka-0 -it -- /opt/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 0.0.0.0:9092
Topic: test-topic TopicId: r7CTwaQrTauPMboAGF2QtA PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=7200000,message.format.version=2.8-IV1
Topic: test-topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
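On a busy cluster, replica movement can saturate the network. kafka-reassign-partitions.sh accepts a --throttle flag (in bytes per second) with --execute, and a subsequent --verify run clears the throttle, as the "Clearing broker-level throttles" lines above show. A sketch:
# Re-run the reassignment with replication traffic capped at ~10 MB/s (sketch).
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassignment.json --execute --throttle 10000000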
7. If we want to roll back to the previous replica assignment, we can reuse the "Current partition replica assignment" JSON printed during --execute mode, which we saved as rollback.json in step 6.
$ cat rollback.json
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[1],"log_dirs":["any"]}]}
$ cat rollback.json | kubectl exec -c kafka my-cluster-kafka-0 -i -- /bin/bash -c 'cat > /tmp/rollback.json'
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/rollback.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for test-topic-0,test-topic-1
$ kubectl exec my-cluster-kafka-0 -it -- /opt/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 0.0.0.0:9092
Topic: test-topic TopicId: r7CTwaQrTauPMboAGF2QtA PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=7200000,message.format.version=2.8-IV1
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
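A reassignment that is still in flight can also be cancelled outright instead of rolled back afterwards; newer Kafka releases (including the 2.8 tooling used here) support a --cancel option that reverts the pending moves described in the JSON file. A sketch:
# Cancel a still-running reassignment (sketch).
$ kubectl exec my-cluster-kafka-0 -c kafka -it -- bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassignment.json --cancel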
Conclusion
That's it! I hope you find this article helpful. Whether you liked it or not, please leave a comment so that I can write better next time.