How To Fix Kafka Consumers Due To Corrupted Log Files or Missing Log Files
This article explores recovering Kafka Brokers from corrupted data storage and recovering Kafka consumers due to missing log files.
Join the DZone community and get the full member experience.
Join For FreeRed Hat AMQ streams is a massively scalable, distributed, and high-performance data streaming platform based on the Apache Kafka project. It offers a distributed backbone that allows microservices and other applications to share data with high throughput and low latency.
Red Hat AMQ streams deployed on open shift with persistence storage. Spring Kafka clients deployed as producers and consumers.
The Kafka consumer part of the consumer group suddenly stopped consuming messages.
Kafka Broker Logs
Kafka broker-0
ERROR [ReplicaManager broker=0] Error processing append operation on partition __consumer_offsets-13 (kafka.server.ReplicaManager) [data-plane-kafka-request-handler-2]
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-13
INFO [GroupCoordinator 0]: Preparing to rebalance group group-id in state PreparingRebalance with old generation 118964 (__consumer_offsets-13) (reason: Error COORDINATOR_NOT_AVAILABLE when storing group assignment during SyncGroup (member: consumer-group-id-x-aaa-bbb-ccc)) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-2]
Kafka broker-2
ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition __consumer_offsets-13 at offset xxxxx (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-0]
org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition __consumer_offsets-13 at offset xxxxx (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-0]
org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
This happens when some partitions from the _consumer_offsets
have less than two min in-sync replicas, in this case, consumers can not commit offsets. This is what we observed on the above broker-0 logs. One can describe the topic to check min in-sync replicas. For example:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic __consumer_offsets-13
Topic: __consumer_offsets Partition: 13 Leader: 0 Replicas: 0,1,2 Isr: 0 <--------- problematic one
Topic: __consumer_offsets Partition: 14 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0
Topic: __consumer_offsets Partition: 15 Leader: 1 Replicas: 1,0,2 Isr: 1,2,0
- The logs from broker-2 logs indicate why broker-2 is not able to get in sync for that partition and is not able to fetch offset xxxxx from
__consumer_offset_13
on broker-0. - One should inspect/check what happened to that record using
kafka-dump-log.sh
tool to print that record. For example:
bin/kafka-dump-log.sh --files data/kafka/__consumer_offsets-13/000000000000000xxx.log --value-decoder-class "kafka.serializer.StringDecoder" --offsets-decoder
----------------------------------------------------------------------------
---------------------------------------------------------------------------
-------------------------------------------------------------------------------
baseOffset: xxxxx lastOffset: xxxxx count: 0 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 24 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: yyyyyyy CreateTime: 111111111111 size: 137 magic: 2 compresscodec: none crc: 2xxxxxxx isvalid: false
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /home/kkakarla/Downloads/__consumer_offsets-13/0000000000000xxx.log.
- Looking at the above, it is clearly evident that the record is corrupted. Upon checking the entire
__consumer_offset_13
from broker-0. we found some .log files missing. - One can copy the
__consumer_offset_13
from Kafka broker-0 pod to local using the below command.
oc rsync my-cluster-kafka-0:/var/lib/kafka/data-0/kafka-log0/__consumer_offsets-13 tmp/log/__consumer_offsets-13
- From the logs, we observed the below error messages due to missing .log files from
__consumer_offsets_13
org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk. ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition SQL_xxx-10 at offset 12345 (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-0] org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk.
- We observed the below read-only error on Kafka storage mount volumes.
message: 'relabel failed /var/lib/kubelet/pods/xxxxxyyyyyzzzz/volumes/kubernetes.io~csi/pvc-abcdef/mount:
lsetxattr /var/lib/kubelet/pods/xxxxxyyyyyzzzz/volumes/kubernetes.io~csi/pvc-abcdef/mount/kafka-log0:
read-only file system'
- It is possible that the read-only issue prevented the creation of files that should have been created. As a result, the file may have been seemingly lost and corrupted.
Now the question is how one can fix the corrupted __consumer_offset_13
from Kafka broker-0.
If other Kafka brokers Kafka-broker-1 and Kafka-broker-2 have log files in good condition. There is a way to restore __consumer-offset-13
using logs of Kafka-broker-1 or Kafka-broker-2
- Stopping all the spring consumer applications
- Configuring
unclean.leader.election.enable=true
in the Kafka CR yaml (oc edit Kafka -o yaml) unclean.leader.election.enable=true
is the configuration to fix__consumer-offset-13
in Kafka-0 using the Kafka-1 and Kafka-2 log files.- unclean.leader.election.enable (default: false)
- Indicates whether to enable replicas not in the ISR set to be elected as a leader as a last resort, even though doing so may result in data loss
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
...
spec:
kafka:
...
config:
unclean.leader.election.enable: true
- Wait for about three minutes, checking whether
unclean.leader.election.enable
is set to true for each of the Kafka brokers, using the following command - Please adjust the
{BROKER_ID}
for each broker id(0, 1, 2) and execute repeatedly.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name ${BROKER_ID} --describe --all | grep unclean.leader.election.enable
- It is possible that the Strimzi cluster operator does not set
unclean.leader.election.enable=true
due to a lack of ISRs - If
unclean.leader.election.enable=tru
e could not be set by the strimzi cluster operator], we should set it manually. Check again after executing the following commands. please adjust the{BROKER_ID}
for each broker id(0, 1, 2) and execute repeatedly.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name ${BROKER_ID} --alter --add-config unclean.leader.election.enable=true
- Just to be sure, checking that the kafka-0 pod can be restarted using "
oc delete pod <kafka-0 pod name>
" - If the broker does not start here related to other than
__consumer-offset-13
, there may be other problems. - Logging in the kafka-0 pod, using "
oc debug <kafka-0 pod name>
" - After logging in the kafka-0 pod, deleting the
__consumer-offset-13(/var/lib/kafka/data-<JBOD_ID>/kafka-log<BROKER_ID>/__consumer_offsets-13)
directory in the kafka-0 pod. - Logging off from the kafka-0 pod
- Restarting kafka-0 pod, using "
oc delete pod <kafka-0 pod name>
" - Wait for about 3 minutes
- If everything goes smooth then
__consumer-offset-13 of Kafka-0
may recover from kafka-1 and kafka-2 logs automatically at this point.
If the above solution fails for some reason, then one can try the below workaround:
- The safe workaround to avoid the use of
__consumer-offsets-13
by changing the consumer group name(group.xxx) of the client spring application. - The partition number of
__consumer_offsets
is determined from the hash value of the consumer group name(group.xxx) and the max number of__consumer_offsets
partition. - For your information, the following is a small Java program to identify the partition number of the
__consumer_offsets
. If you usegroup-xxx
it will be 13. - The partition number of your new consumer group name should not be 13. For example, if you use
group-zzz
as the new consumer group name, the partition number should be some other integer no. - One can extract a list of offsets of the
consumer group(group.id=group-xxx)
from the log files of Kafka-broker 1 and Kafka-broker 2
This is the Java program. to find the partition number of the __consumer_offsets
public static void main(String[] args) {
int maxConsumerOffsetsTopicPartition = 50; // __consumer_offsets topic partition num: default 50
String groupId = "group-xxx"; // corrupted consumer group
int consumerOffsetsPartitionNum = Math.abs(groupId.hashCode()) % maxConsumerOffsetsTopicPartition;
System.out.println(consumerOffsetsPartitionNum);
}
If one would like to reset the offset for that new consumer group, please use the following commands.
# reset to earliest
oc exec -it ${KAFKA_CLUSTER}-kafka-0 -c kafka -- env - bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-earliest --group ${CONSUMER_GROUP_NAME} --topic ${TOPIC_NAME} --execute
# reset to latest
oc exec -it ${KAFKA_CLUSTER}-kafka-0 -c kafka -- env - bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-latest --group ${CONSUMER_GROUP_NAME} --topic ${TOPIC_NAME} --execute
# reset to specific offset
oc exec -it ${KAFKA_CLUSTER}-kafka-0 -c kafka -- env - bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-offset <OFFSET_VALUE> --group ${CONSUMER_GROUP_NAME} --topic ${TOPIC_NAME}:<PARTITION_NUMBERS> --execute
After changing the consumer group name in the above workaround, one should delete the __consumer-offsets-13
log files manually for cleanup.
Follow the steps mentioned in the section "How one can fix the corrupted __consumer_offset_13
from Kafka broker-0."
Opinions expressed by DZone contributors are their own.
Comments