Kafka Consumer Pooling
In this article, take a look at Kafka consumer pooling and see a use case.
Consumer pooling in Kafka? Do we ever need it? This is probably the first question that comes to mind. After all, the standard Kafka consumption pattern is to have consumers join a known consumer group, subscribe to one or more topics, and let the broker balance the topic partitions across the group.
Thus, consumer sessions are inherently long-running and sticky (I am using both terms loosely). Pooling, by contrast, usually implies repeated use of a resource for short-lived executions (JDBC connection pooling, for example).
Use Case
So let’s discuss a typical use case where consumer pooling was in fact required, for a data solution platform PoC I was working on.
We wanted to design a bi-directional, conversational communication channel using a request-reply pattern on a Kafka topic: a two-phase commit (2PC) protocol in which a manager component orchestrates consensus among worker components. While there are other solutions (ZooKeeper, etc.) for such distributed coordination, we chose Kafka as it was already part of the solution stack.
Request-Reply
A simple approach to a request-reply pattern would be: publish to a request topic with a correlationId as the key (or header), and listen on a reply topic, mapping response messages by correlationId. The response is then returned from this in-memory map in a wait-notify manner. A standard Kafka consumer listening on the reply topic would suffice here. However, there are two things to note:
- It must be ensured, by design, that the reply message is appended to a partition that is assigned to this consumer.
- The memory required by the map must be managed, and access to it must be thread-safe.
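The map-based variant described above can be sketched with plain JDK classes. This is only an illustration of the wait-notify idea, not code from the project: the class name PendingReplies and its methods are hypothetical, and the Kafka producer and reply-topic consumer are assumed to live elsewhere. It uses CompletableFuture.orTimeout, so it assumes Java 9 or later.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: correlates replies to pending requests via an in-memory map. */
class PendingReplies<V> {
    private final Map<String, CompletableFuture<V>> pending = new ConcurrentHashMap<>();

    /** Call before publishing the request, so that a fast reply cannot be missed. */
    CompletableFuture<V> register(String correlationId) {
        CompletableFuture<V> future = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, future) != null) {
            throw new IllegalStateException("duplicate correlationId: " + correlationId);
        }
        return future;
    }

    /** Call from the reply-topic consumer loop; returns false if nobody is waiting. */
    boolean complete(String correlationId, V reply) {
        CompletableFuture<V> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    /** Blocks until the reply arrives; join() throws CompletionException on timeout. */
    V await(String correlationId, CompletableFuture<V> future, long timeout, TimeUnit unit) {
        try {
            return future.orTimeout(timeout, unit).join();
        } finally {
            // Remove the entry even on timeout, so the map cannot grow without bound.
            pending.remove(correlationId, future);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The ConcurrentHashMap addresses the thread-safety concern, and the cleanup in the finally block addresses the memory concern. The partition-assignment concern remains and has to be solved by design.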
The approach we followed was to avoid using a map for storing results:
- Use a single topic for both request and reply.
- When a write is submitted (with correlationId as key), get the partition and offset it was published to.
- Get a consumer and assign it to that partition, seeking to the write offset.
- Poll for messages with the same correlationId as key.
Since the request and reply have the same correlationId as key, they will land on the same partition (assuming the default key-hash partitioning and an unchanged partition count). Since the reply is always published after the request, we can safely make the consumer seek to the write offset.
However, this leads to a different problem: we now need a forward-seeking consumer for every request-reply cycle. Our 2PC protocol has more than one request-reply cycle per invocation, and there will be concurrent invocations.
That requires a consumer for every 2PC session, and we would of course not want to create a new Kafka consumer on each request. Hence a reusable pool of Kafka consumers.
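To make the seek-and-scan reasoning concrete, here is a small in-memory model of a single partition. It deliberately does not use the Kafka client: PartitionLog and its methods are hypothetical stand-ins. In real code, the offset would come from the RecordMetadata returned by the producer's send, and the scan would be done with assign(), seek() and poll(). The sketch also assumes the request record itself has to be skipped when scanning from its own offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** In-memory stand-in for one topic partition: an append-only log of keyed records. */
class PartitionLog {
    static final class Entry {
        final long offset;
        final String key;
        final String value;

        Entry(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Appends a record and returns its offset, as a producer would learn after a send. */
    synchronized long append(String key, String value) {
        long offset = entries.size();
        entries.add(new Entry(offset, key, value));
        return offset;
    }

    /**
     * Scans forward from the request's offset and returns the first later record
     * with the same correlationId key. The request itself is skipped.
     */
    synchronized Optional<Entry> findReply(String correlationId, long requestOffset) {
        for (int i = (int) requestOffset + 1; i < entries.size(); i++) {
            Entry e = entries.get(i);
            if (e.key.equals(correlationId)) {
                return Optional.of(e);
            }
        }
        return Optional.empty();
    }
}
```

Because the reply can only be appended after the request, nothing before the request's offset ever needs to be read, which is why no result map is required.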
Apache Commons Pool2
The pooling is built on top of the Apache Commons Pool framework (commons-pool2), so some background on it is a prerequisite for the following sections.
I am not going into the details of how to use commons-pool, since plenty of articles cover that and I want to keep this article concise.
The implementation discussion mostly focuses on the Kafka consumer aspects that we need to keep in mind in order to create a pool.
Implementation Approach
We only allow assign() invocations and disallow subscribe() on the consumer, so that the pool keeps control over the arbitrary topic/partition/offset assigned.
This can be achieved by proxying the KafkaConsumer instance and bypassing the necessary methods:
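import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class ConsumerProxy<K, V> implements InvocationHandler {
    private final KafkaConsumer<K, V> instance;

    public ConsumerProxy(Map<String, Object> configs) {
        instance = new KafkaConsumer<>(configs);
        //.. more
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Borrowers must not close or unsubscribe a pooled consumer; the pool owns its lifecycle.
        if (method.getName().equals("close") || method.getName().equals("unsubscribe")) {
            return null; // both methods are void, so the return value is ignored
        }
        // .. more
        try {
            return method.invoke(instance, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // surface the consumer's own exception, not the reflection wrapper
        }
    }
}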
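The interception technique can be tried in isolation, without a Kafka dependency. The sketch below uses a hypothetical Resource interface as a stand-in for the consumer, and the names RealResource and PooledProxies are likewise made up for the example. It only shows how a java.lang.reflect.Proxy can swallow close() while delegating everything else.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

/** Hypothetical stand-in for the consumer interface; only what the demo needs. */
interface Resource {
    String read();
    void close();
}

class RealResource implements Resource {
    boolean closed = false;

    public String read() {
        return closed ? "closed" : "data";
    }

    public void close() {
        closed = true;
    }
}

class PooledProxies {
    /** Wraps target so that close() becomes a no-op while every other call is delegated. */
    static Resource guard(Resource target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("close")) {
                return null; // swallowed: only the pool may really close the target
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return (Resource) Proxy.newProxyInstance(
                Resource.class.getClassLoader(), new Class<?>[] { Resource.class }, handler);
    }
}
```

A borrower that calls close() on the guarded instance leaves the underlying object open, so the pool can hand it out again.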
The PooledObjectFactory implementation should return this proxied instance.
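import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;

class ConsumerPoolFactory<K, V> extends BasePooledObjectFactory<PoolableConsumer<K, V>> {
    private static final AtomicInteger n = new AtomicInteger();
    private final Map<String, Object> consumerProperties = new HashMap<>();

    @SuppressWarnings("unchecked")
    public PoolableConsumer<K, V> create() throws Exception {
        Map<String, Object> props = new HashMap<>(consumerProperties);
        // groupPrefix is not declared in this excerpt; presumably a field of the full class
        String groupId = groupPrefix;
        if (props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
            groupId = props.get(ConsumerConfig.GROUP_ID_CONFIG).toString();
        }
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "__" + n.getAndIncrement());
        return (PoolableConsumer<K, V>) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[] { PoolableConsumer.class }, new ConsumerProxy<K, V>(props));
    }
    // wrap(...) and the other factory callbacks are omitted in this excerpt
}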
And the PoolableConsumer interface extends the Kafka Consumer interface by adding commons-pool callback methods. This may not be strictly necessary, however, and a decorator-based proxying can also be used.
import org.apache.kafka.clients.consumer.Consumer;

interface PoolableConsumer<K, V> extends Consumer<K, V> {
    void destroyProxy();
    boolean validateProxy();
    void activateProxy();
    void passivateProxy();
}
To keep the consumer connections alive, we pause() the consumer on pool check-in and start a scheduled background thread that invokes the poll() method periodically (needed at least prior to v0.10.1.0; refer to the relevant KIP).
On pool checkout, we resume() the consumer and then unsubscribe() to clear the last assignment.
// in the invoke() method of the ConsumerProxy
if (method.getName().equals("activateProxy")) {
    if (wasPassivated) {
        if (future != null) { // null when the heartbeat thread is disabled
            future.cancel(true);
        }
        instance.resume(instance.paused());
        instance.unsubscribe(); // clears the previous manual assignment
        wasPassivated = false;
    }
    return null;
}
if (method.getName().equals("passivateProxy")) {
    instance.pause(instance.assignment());
    if (heartBeatThreadIsEnabled) {
        // all assigned partitions are paused, so this poll() fetches no records
        future = timerThread.scheduleWithFixedDelay(() -> instance.poll(Duration.ofMillis(100)), 1000, 1000, TimeUnit.MILLISECONDS);
    }
    wasPassivated = true;
    return null;
}
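The passivate/activate toggle can be looked at separately from Kafka. The following is a minimal sketch using only the JDK scheduler: IdleKeepAlive is a hypothetical name, and the heartbeat Runnable stands in for the periodic poll() on the paused consumer. It mirrors the fragment above in starting the task on check-in and cancelling it on checkout.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: runs a keep-alive task only while the pooled object is idle. */
class IdleKeepAlive {
    private final ScheduledExecutorService timer;
    private final Runnable heartbeat;
    private final long periodMillis;
    private ScheduledFuture<?> future; // non-null only while passivated

    IdleKeepAlive(ScheduledExecutorService timer, Runnable heartbeat, long periodMillis) {
        this.timer = timer;
        this.heartbeat = heartbeat;
        this.periodMillis = periodMillis;
    }

    /** Pool check-in: start the periodic heartbeat (idempotent). */
    synchronized void passivate() {
        if (future == null) {
            future = timer.scheduleWithFixedDelay(
                    heartbeat, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Pool checkout: stop the heartbeat before the borrower uses the object. */
    synchronized void activate() {
        if (future != null) {
            future.cancel(true);
            future = null;
        }
    }

    synchronized boolean isRunning() {
        return future != null;
    }
}
```

Keeping the ScheduledFuture null outside the passivated state makes both callbacks safe to call repeatedly, which matters because the pool may validate and re-passivate idle instances.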
After checking out a consumer from the pool, we need to assign the new topic/partitions to the consumer. This is part of the ObjectPool implementation.
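import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class KafkaConsumerPool<K, V> extends GenericObjectPool<PoolableConsumer<K, V>> {
    // Minimal constructor, assumed here so that the excerpt compiles
    public KafkaConsumerPool(PooledObjectFactory<PoolableConsumer<K, V>> factory) {
        super(factory);
    }

    /**
     * Acquire a consumer positioned at the given offsets.
     * @param maxwait maximum time to wait for an idle consumer
     * @param unit time unit of maxwait
     * @param topicPartitionOffset partitions to assign, mapped to the offset to seek to
     * @return a consumer assigned to the given partitions
     * @throws Exception if no consumer could be borrowed in time
     */
    public Consumer<K, V> acquire(long maxwait, TimeUnit unit, Map<TopicPartition, Long> topicPartitionOffset) throws Exception {
        PoolableConsumer<K, V> consumer = borrowObject(unit.toMillis(maxwait));
        consumer.assign(topicPartitionOffset.keySet());
        topicPartitionOffset.forEach((tp, offset) -> consumer.seek(tp, offset));
        return consumer;
    }

    /**
     * Acquire a consumer positioned at the latest (end) offset.
     * @param maxwait maximum time to wait for an idle consumer
     * @param unit time unit of maxwait
     * @param topicPartitions partitions to assign
     * @return a consumer assigned to the given partitions
     * @throws Exception if no consumer could be borrowed in time
     */
    public Consumer<K, V> acquireLatest(long maxwait, TimeUnit unit, TopicPartition... topicPartitions) throws Exception {
        PoolableConsumer<K, V> consumer = borrowObject(unit.toMillis(maxwait));
        List<TopicPartition> partitionList = Arrays.asList(topicPartitions);
        consumer.assign(partitionList);
        consumer.seekToEnd(partitionList);
        return consumer;
    }
}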
The consumer should be closed only when the instance is evicted from the pool. For stale checking on checkout, we can invoke some metadata method on the consumer.
// in the invoke() method of the proxy
if (method.getName().equals("destroyProxy")) {
    instance.close();
    return null;
}
if (method.getName().equals("validateProxy")) {
    try {
        // a metadata round trip; a failure marks the consumer as stale
        instance.listTopics(Duration.ofMillis(1000));
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}
That's all for now. The complete implementation can be found at the following GitHub link. Please feel free to share your candid comments. Thank you!