Ways To Stop and Resume Your Kafka Producer/Consumer at Run-Time
In this blog, the reader will learn how to stop and resume a Kafka consumer or producer at runtime using two distinct methods.
Imagine you are running a Kafka cluster, and suddenly you need to perform maintenance on one of your Kafka consumers or producers. What do you do?
Kafka has become an indispensable building block for streaming data pipelines due to its high throughput, fault tolerance, and scalability, which make it an excellent option for processing large volumes of data in real time. Additionally, it offers the significant advantage of supporting several programming languages, including Java, Python, Kotlin, Rust, and others.
In this blog, we will discuss how to stop and resume a Kafka consumer or producer at runtime. We will explore two distinct methods: one uses REST service endpoints backed by Spring Integration's control bus, while the other uses Spring Boot Actuator endpoints.
Tech Stack
- Spring Boot
- Spring Integration
- Kafka Cluster (running in Docker; see the compose sketch after this list)
- Java 17 (or 8)
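If you don't already have a Kafka cluster running, a minimal single-broker Docker Compose file like the sketch below is enough for this demo. The image tags and listener settings here are assumptions; adjust them to your environment.
docker-compose.yml
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1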
Demo Scene
Let’s begin by creating a Kafka producer. Here I am using Spring Integration to create the Kafka producer. As I have mentioned in my previous blogs, Spring Integration is one of the most powerful modules Spring offers; it follows a message-driven approach backed by Enterprise Integration Patterns.
KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {

    private final KafkaProperties kafkaProperties;
    private final String kafkaTopic;

    public KafkaProducerConfig(KafkaProperties kafkaProperties, @Value("${app.topic-name}") String kafkaTopic) {
        this.kafkaProperties = kafkaProperties;
        this.kafkaTopic = kafkaTopic;
    }

    // Polls every 5 seconds and emits the current timestamp as the message payload
    @Bean
    public IntegrationFlow producerIntegrationFlow() {
        return IntegrationFlow.from(() -> new GenericMessage<>(""),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(5)))
                              .id("kafkaProducerBean"))
                .transform(message -> new Date().toString())
                .log()
                .channel("to-kafka-producer-template")
                .get();
    }

    // Sends everything arriving on the channel to the default Kafka topic
    @Bean
    public IntegrationFlow kafkaProducerTemplate(KafkaTemplate<?, ?> kafkaTemplate) {
        kafkaTemplate.setDefaultTopic(this.kafkaTopic);
        return IntegrationFlow.from("to-kafka-producer-template")
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate))
                .get();
    }
}
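Note the .id("kafkaProducerBean") on the polling endpoint: it registers that endpoint as a bean under this name, which is exactly the name we will reference later when stopping and resuming the producer through the control bus.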
application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
app:
  topic-name: demo-topic
Now let's create a simple streams processor by configuring the binders. I am using Spring Cloud Stream to create the streaming processor.
StreamConsumer.java
@Configuration
@Slf4j
public class StreamConsumer {

    // Functional consumer bound to the "myConsumer-in-0" binding configured below
    @Bean
    public Consumer<KStream<?, String>> myConsumer() {
        return input ->
                input.foreach((key, value) ->
                        log.debug("Key: {} Value: {}", key, value));
    }
}
application.yml
spring:
  application:
    name: processor-demo
  cloud:
    stream:
      bindings:
        myConsumer-in-0:
          destination: demo-topic
          binder: kstream
          group: processor-group
      kafka:
        streams:
          binder:
            brokers: localhost:9092
      binders:
        kstream:
          type: kstream
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: localhost:9092
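One thing worth noting before moving on: the consumer logs records at DEBUG level, and Spring Boot's default log level is INFO, so nothing will appear in the console until you raise the level for the consumer class. A minimal sketch, assuming the class lives in a package such as com.example (use your actual package name):
application.yml
logging:
  level:
    com.example.StreamConsumer: DEBUG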
With REST Service Endpoints…
Spring Integration allows us to control and monitor the messaging endpoints that we created. This can be done in two steps.
Step 1: Create a Control Bus Message Channel and Flow
KafkaProducerConfig.java (modify the producer's integration config)
// Add this channel to the integration config
@Bean
public MessageChannel controlChannel() {
    return MessageChannels.direct().get();
}
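A direct channel by itself does nothing with the command strings we are about to send; a control bus endpoint must be subscribed to it to evaluate them. A minimal sketch of that flow, added to the same config class:
// Subscribes a control bus to controlChannel so commands like "@kafkaProducerBean.stop()" are evaluated
@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlow.from("controlChannel")
            .controlBus()
            .get();
}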
Step 2: Send Commands to the Control Channel From a REST Controller
ProducerDemoController.java
@RestController
public class ProducerDemoController {

    private final MessageChannel controlChannel;

    public ProducerDemoController(@Qualifier("controlChannel") MessageChannel controlChannel) {
        this.controlChannel = controlChannel;
    }

    // Sends a control bus command that stops the polling endpoint by its id
    @GetMapping("/stopProducer")
    public void stopProducer() {
        controlChannel.send(new GenericMessage<>("@kafkaProducerBean.stop()"));
    }

    // Sends a control bus command that starts the polling endpoint again
    @GetMapping("/startProducer")
    public void startProducer() {
        controlChannel.send(new GenericMessage<>("@kafkaProducerBean.start()"));
    }
}
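A quick design note: GET is used here purely for demo convenience. Since both endpoints change state, POST would be the more idiomatic choice in a real service.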
Stop and Resume the Producer Through the REST Endpoints
http://localhost:8080/startProducer
http://localhost:8080/stopProducer
With Spring Boot’s Actuator Endpoints…
Add the block below to the consumer's application.yml to expose the bindings endpoint through Actuator (this assumes spring-boot-starter-actuator is on the classpath).
application.yml
management:
  endpoints:
    web:
      exposure:
        include:
          - bindings
Stop and Resume the Consumer Function Through the Actuator Endpoint
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST localhost:8282/actuator/bindings/myConsumer-in-0
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST localhost:8282/actuator/bindings/myConsumer-in-0
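You can also check a binding's current state at any time with a GET against the same endpoint; the response includes the binding's state along with its other details:
curl localhost:8282/actuator/bindings/myConsumer-in-0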
Summary
In this blog, we discussed how to stop and resume a Kafka consumer or producer at runtime using REST service endpoints (backed by Spring Integration's control bus) and Spring Boot Actuator's bindings endpoint. The ability to stop and resume Kafka producers and consumers is essential for maintaining the health of a Kafka cluster and ensuring the smooth operation of real-time data pipelines.
The source code can be found on my GitHub. Also, you can reach out to me on LinkedIn for any questions or suggestions.
That’s all for now. Happy Learning!