Kafka With Spring Cloud Stream
Everything you need to get started with Kafka's Spring Cloud Stream.
Join the DZone community and get the full member experience.
Join For FreeThis post gives a step-by-step tutorial to enable messaging in a microservice using Kafka with Spring Cloud Stream.
Spring Cloud Stream is a framework under the umbrella project Spring Cloud, which enables developers to build event-driven microservices with messaging systems like Kafka and RabbitMQ.
Asynchronous messaging systems are always an important part of any modern enterprise software solution. The evolution of microservices has shortened the time-to-market for any software product, but this is not possible without the necessary tools and frameworks.
You may also like: Event-Driven Microservices Using Spring Cloud Stream and RabbitMQ.
Spring Cloud Stream is a framework built on top of Spring Integration. It integrates with Spring Boot seamlessly to build efficient microservices in less time to connect with shared messaging systems. Spring Cloud Stream provides multiple binder implementations such as Kafka, RabbitMQ and various others. The details are provided here.
Here is a step-by-step tutorial on building a simple microservice application based on Spring Boot and uses Spring Cloud Stream to connect with a Kafka instance.
Getting Started
Install Kafka and create a topic. I am using a Kafka broker running on my local windows machine for this demonstration, but it can be an installation on a Unix machine as well. Steps for Kafka installation on windows machine are provided here.
Create a Spring Boot starter project either using STS IDE or Spring Initializr. I am providing the pom.xml for reference.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.techwording</groupId>
<artifactId>spring-cloud-stream-kafka-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-cloud-stream-kafka-example</name>
<description>Demo project for Spring Cloud Stream and Kafka</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR3</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
The Spring Cloud Stream project needs to be configured with the Kafka broker URL, topic, and other binder configurations. Below is an example of configuration for the application.
spring:
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers:
- localhost:9092
bindings:
input:
binder: kafka
destination: test
content-type: text/plain
group: input-group-1
output:
binder: kafka
destination: test
group: output-group-1
content-type: text/plain
We will need at least one producer and a consumer to test the message and send and receive operations. Below is the sample code for a producer and consumer in its simplest form, developed using Spring Cloud Stream.
package com.techwording.scs;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding(Source.class)
public class Producer {
private Source mySource;
public Producer(Source mySource) {
super();
this.mySource = mySource;
}
public Source getMysource() {
return mySource;
}
public void setMysource(Source mysource) {
mySource = mySource;
}
}
package com.techwording.scs;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;
@EnableBinding(Sink.class)
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@StreamListener(target = Sink.INPUT)
public void consume(String message) {
logger.info("recieved a string message : " + message);
}
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chat'")
public void handle(@Payload ChatMessage message) {
final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
.withZone(ZoneId.systemDefault());
final String time = df.format(Instant.ofEpochMilli(message.getTime()));
logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
}
}
We will also create a Rest Controller class, which will accept the message over HTTP and pass it to the producer. This is just to make the testing convenient.
package com.techwording.scs;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class Controller {
private Producer producer;
public Controller(Producer producer) {
super();
this.producer = producer;
}
// get the message as a complex type via HTTP, publish it to broker using spring cloud stream
@RequestMapping(value = "/sendMessage/complexType", method = RequestMethod.POST)
public String publishMessageComplextType(@RequestBody ChatMessage payload) {
payload.setTime(System.currentTimeMillis());
producer.getMysource()
.output()
.send(MessageBuilder.withPayload(payload)
.setHeader("type", "chat")
.build());
return "success";
}
// get the String message via HTTP, publish it to broker using spring cloud stream
@RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
public String publishMessageString(@RequestBody String payload) {
// send message to channel
producer.getMysource()
.output()
.send(MessageBuilder.withPayload(payload)
.setHeader("type", "string")
.build());
return "success";
}
}
Run the below maven commands to build and run this project.
mvn clean install
mvn spring-boot:run
Hit the POST endpoint /sendMessage/string
and check the application console logs. Here is an example output the application produced when I hit this endpoint with message "hello" in the rest body.
2019-10-01 14:37:22.764 INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer : received a string message : {"contents":"hello","time":1569920841187}
Hit the POST endpoint /sendMessage/complexType
and check the application console logs.
2019-10-01 14:37:22.773 INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer : received a complex message : [2:37:21 PM]: hello
The annotation @EnableBinding
takes one or more interfaces as parameters. In this example, we have used Sink and Source interfaces, which declare input and output channels, respectively. You can also define your own interfaces for this purpose.
@StreamListener
annotation is a convenient way provided by Spring Cloud Stream for content-based routing. It works based on a pub-sub model, and every @StreamListener
receives its own copy of the message.
I have used two stream listeners in this project — one for consuming plain string messages and another one for messages with a complex type, ChatMessage. The producer sends messages attached with a header "type" with a logical value and consumer can apply conditions to filter messages using @StreamListener
.
You can find the complete project here.
Further Reading
Opinions expressed by DZone contributors are their own.
Comments