Kafka Consumer Delivery Semantics
Everything you need to know about consumer delivery semantics.
This article is a continuation of part 1, Kafka technical overview, part 2, Kafka producer overview, part 3, Kafka producer delivery semantics, and part 4, Kafka consumer overview. Let's look at the different consumer configurations and consumer delivery semantics.
Subscribe
To read records from a Kafka topic, create an instance of a Kafka consumer and subscribe to one or more Kafka topics. You can also subscribe to topics matching a regular expression, for example, myTopic.*.
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
// group.id and the deserializers are required for the consumer to start.
props.put("group.id", "myConsumerGroup");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribing by regular expression takes a Pattern, not a plain string.
consumer.subscribe(Pattern.compile("myTopic.*"));
Poll Method
Consumers read data from Kafka by polling for new data. The poll method takes care of all coordination, such as partition rebalancing, heartbeats, and data fetching. When auto-commit is set to true, the poll method not only reads data but also commits the offsets from the previous poll and then reads the next batch of records; a minimal poll loop is sketched below.
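A minimal sketch of a poll loop, continuing from the consumer created above; the 100 ms timeout and the printed fields are illustrative choices, and poll(Duration) assumes a 2.x+ Java client:
while (true) {
    // With enable.auto.commit=true, this call also commits the offsets
    // returned by the previous poll before fetching the next batch.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}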
Consumer Configurations
Kafka consumer behavior is configurable through the following properties. These properties are passed as key-value pairs when the consumer instance is created.
Enable.auto.commit
This defines how offsets are committed to Kafka. By default, enable.auto.commit is set to true. When this property is set to true, you may also want to set how frequently offsets should be committed using auto.commit.interval.ms. By default, auto.commit.interval.ms is set to 5,000 ms (5 seconds). When enable.auto.commit is set to true, the consumer delivery semantic is "at most once," and commits are async. Key points (a configuration sketch follows the list):
- enable.auto.commit = true (default).
- auto.commit.interval.ms = 5,000 ms (default).
- At most once delivery semantic.
- Commits are async when enable.auto.commit is true.
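A sketch of the auto-commit configuration; the values shown are the defaults, written out explicitly for illustration:
props.put("enable.auto.commit", "true");      // default: commit offsets automatically
props.put("auto.commit.interval.ms", "5000"); // default: commit every 5 seconds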
Partition.assignment.strategy
In the previous article, Kafka consumer overview, we learned that consumers in a consumer group are assigned different partitions. The partitions are assigned to consumers based on the partition.assignment.strategy property. PartitionAssignor is the class that defines the required interface for an assignment strategy. Kafka ships with RangeAssignor and RoundRobinAssignor, supporting the Range and Round Robin strategies, respectively.
Range strategy: In the range strategy, partitions are assigned to consumers in contiguous ranges. For example, with two topics of seven partitions each, consumed by two consumers, the range strategy assigns the first four partitions (0-3) of both topics to the first consumer and the remaining three partitions (4-6) of both topics to the second consumer. The partitions are unevenly assigned, with the first consumer processing eight partitions and the second consumer processing only six. By default, partition.assignment.strategy is set to RangeAssignor.
Round-robin strategy: In the round-robin strategy, partitions are assigned to consumers one by one in a round-robin fashion, resulting in an even distribution of partitions across consumers. For example, with two topics of seven partitions each, consumed by two consumers, the round-robin strategy assigns four partitions (0, 2, 4, 6) of the first topic and three partitions (1, 3, 5) of the second topic to the first consumer, and three partitions (1, 3, 5) of the first topic and four partitions (0, 2, 4, 6) of the second topic to the second consumer. Key points (a configuration sketch follows the list):
- partition.assignment.strategy decides how partitions are assigned to consumers.
- Range strategy (RangeAssignor) is the default.
- Range strategy may result in an uneven assignment.
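Switching to the round-robin assignor is a one-line change; the fully qualified class name below is the one shipped with the Java client:
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");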
Fetch.min.bytes
Defines the minimum number of bytes required to send data from Kafka to the consumer. When the consumer polls for data and the minimum number of bytes is not yet available, Kafka waits until the pre-defined size is reached and then sends the data. The default value is 1 byte. Increasing fetch.min.bytes reduces load on both the consumer and the broker, increasing both latency and throughput. When the messages are many and small, resulting in higher CPU consumption, it's better to increase the fetch.min.bytes value.
Fetch.max.wait.ms
Defines the maximum time to wait before sending data from Kafka to the consumer. While fetch.min.bytes controls the minimum bytes required, that minimum may not be reached for a long time; fetch.max.wait.ms bounds how long Kafka waits before sending the data anyway. The default value of fetch.max.wait.ms is 500 ms (0.5 seconds). Increasing this value increases latency and throughput, so define both fetch.min.bytes and fetch.max.wait.ms based on your SLA; a sketch of both settings follows.
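A sketch of tuning both fetch settings together; the values are illustrative, not recommendations:
// Respond to a fetch once 64 KB is available, but never wait longer than 300 ms.
props.put("fetch.min.bytes", "65536");
props.put("fetch.max.wait.ms", "300");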
Session.timeout.ms
Defines how long a consumer can be out of contact with the broker. While heartbeat.interval.ms defines how often the consumer sends heartbeats to the group coordinator, session.timeout.ms defines how long the consumer can go without sending one before it is considered lost. When the session times out, the consumer is considered lost and a rebalance is triggered.
To avoid this from happening often, it's better to set session.timeout.ms to about three times heartbeat.interval.ms. By setting a higher session timeout, you can avoid unwanted rebalancing and the other overheads associated with it.
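A sketch of the roughly 3x relationship; these values happen to match older client defaults but are shown here purely for illustration:
props.put("heartbeat.interval.ms", "3000"); // heartbeat every 3 seconds
props.put("session.timeout.ms", "10000");   // considered lost after ~3 missed heartbeats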
Max.partition.fetch.bytes
Defines the maximum bytes per partition sent from the broker to the consumer. By default, the value is set to 1 MB. The broker's maximum message size (message.max.bytes) and max.partition.fetch.bytes together determine the memory required per consumer to receive messages.
Max.poll.records
Defines the maximum number of records returned by a single poll() call. This helps control the number of records to be processed per poll; a sketch covering this and the previous property follows.
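A sketch of both sizing properties; the record cap is an illustrative choice, and the fetch size shown is the default:
props.put("max.partition.fetch.bytes", "1048576"); // 1 MB per partition (default)
props.put("max.poll.records", "100");              // at most 100 records per poll()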
Auto.offset.reset
When reading from the broker for the first time, Kafka may not have any committed offset value; this property defines where to start reading. You can set it to "earliest" or "latest": "earliest" reads all messages from the beginning, while "latest" reads only new messages produced after the consumer subscribed to the topic. The default value of auto.offset.reset is "latest."
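For example, to replay a topic from the beginning when the group has no committed offset:
// Only takes effect when no committed offset exists for a partition.
props.put("auto.offset.reset", "earliest");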
Delivery Semantics
As stated in the earlier article, Kafka producer delivery semantics, there are three delivery semantics, namely at most once, at least once, and exactly once. When data is consumed from Kafka by a consumer group/consumer, only "at least once" and "at most once" semantics are supported. You can still achieve output similar to exactly once by choosing a suitable data store that writes by a unique key; for example, any key-value store, an RDBMS (primary key), Elasticsearch, or any other store that supports idempotent writes.
At Most Once
In at most once delivery semantics, a message should be delivered at most one time; it's acceptable to lose a message rather than deliver it twice. Applications adopting at most once semantics can easily achieve higher throughput and lower latency. By default, Kafka consumers use "at most once" delivery semantics, as "enable.auto.commit" is true.
If the consumer fails after messages are committed as read but before processing them, the unprocessed messages are lost and will not be read again. Partition rebalancing will result in another consumer reading messages from the last committed offset. Messages are read in batches, and some or all of the messages in a batch might be unprocessed yet still committed as processed.
At Least Once
In at least once delivery semantics, it is acceptable to deliver a message more than once, but no message should be lost. The consumer ensures that all messages are read and processed, even though it may result in message duplication. This is the most preferred semantic of all. Applications adopting at least once semantics may have moderate throughput and moderate latency. By setting the "enable.auto.commit" value to "false," you can commit manually after the messages are processed, as sketched below.
If the consumer fails before processing messages, the unprocessed messages are not lost, as the offsets were not committed as read. Partition rebalancing will result in another consumer reading the same messages again from the last committed offset, resulting in duplicate messages. Messages are read in batches, and some or all of the messages in a batch might be processed again, but no messages will be lost.
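A minimal sketch of at-least-once consumption; processRecord is a hypothetical application method standing in for your processing logic:
props.put("enable.auto.commit", "false"); // set before creating the consumer

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // hypothetical application logic
    }
    // Commit only after the whole batch is processed; a crash before this
    // line means the batch is re-read (duplicated), never lost.
    consumer.commitSync();
}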
Exactly Once
In exactly once delivery semantics, a message must be delivered only once and no message should be lost. This is the most difficult delivery semantic of all. Applications adopting exactly once semantics may have lower throughput and higher latency compared to the other two semantics. As stated earlier, you can still achieve output similar to exactly once by choosing a suitable data store that writes by a unique key: for example, any key-value store, an RDBMS (primary key), Elasticsearch, or any other store that supports idempotent writes.
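A sketch of the idempotent-write idea, using an in-memory java.util.HashMap as a stand-in for such a keyed store; replaying a duplicate record overwrites the same key, so the end state matches exactly once:
// Continuing from the at-least-once loop above: upsert by unique message key.
// Writing the same key twice leaves exactly one entry, so duplicates collapse.
Map<String, String> keyedStore = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
    keyedStore.put(record.key(), record.value());
}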
Summary
Configure the Kafka consumer to achieve the desired performance and delivery semantics based on the following properties:
- Enable.auto.commit.
- Partition.assignment.strategy.
- Fetch.max.wait.ms.
- Fetch.min.bytes.
- Session.timeout.ms.
- Max.partition.fetch.bytes.
- Max.poll.records.
- Auto.offset.reset.
The Kafka consumer supports only at most once and at least once delivery semantics.