How to Create a Spring Cloud Stream Binder From Scratch
Learn how to develop a custom Spring Cloud Stream binder from scratch.
Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration for building event-driven microservices that communicate through one or more shared messaging systems.
The core Spring Cloud Stream component is called “Binder,” a crucial abstraction that’s already been implemented for the most common messaging systems (e.g., Apache Kafka, Kafka Streams, Google Pub/Sub, RabbitMQ, Azure Event Hubs, and Azure Service Bus).
In this article, we’ll see in detail how to develop a custom Spring Cloud Stream binder from scratch.
Introduction
The official Spring Cloud Stream documentation already provides a very basic explanation of how to implement your own Spring Cloud Stream binder.
Here’s a brief excerpt from it about the Binder Service Provider Interface that must be implemented in order to create a custom binder:
The Binder SPI consists of a number of interfaces, out-of-the-box utility classes, and discovery strategies that provide a pluggable mechanism for connecting to external middleware. The key point of the SPI is the Binder interface, which is a strategy for connecting inputs and outputs to external middleware. The following listing shows the definition of the Binder interface:
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
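To make that contract a bit more tangible, here is a minimal, dependency-free sketch. Note that `MiniBinder`, `MiniBinding`, and `InMemoryBinder` are hypothetical, heavily simplified stand-ins that I made up for illustration (no properties, no consumer groups, and the producer side simply returns a sender); they are not Spring Cloud Stream types. The only idea it tries to show is that binding connects application code to a named destination and hands back something that can later be unbound.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a binding handle: lets the caller disconnect again.
interface MiniBinding {
    void unbind();
}

// Hypothetical, simplified stand-in for the binder strategy.
interface MiniBinder {
    // Connects a listener to a named destination and returns a handle to disconnect it.
    MiniBinding bindConsumer(String name, Consumer<String> inboundTarget);

    // Returns a sender that publishes payloads to the named destination.
    Consumer<String> bindProducer(String name);
}

// Toy "middleware": every destination is just an in-memory list of listeners.
class InMemoryBinder implements MiniBinder {
    private final Map<String, List<Consumer<String>>> destinations = new ConcurrentHashMap<>();

    @Override
    public MiniBinding bindConsumer(String name, Consumer<String> inboundTarget) {
        List<Consumer<String>> listeners =
                destinations.computeIfAbsent(name, key -> new CopyOnWriteArrayList<>());
        listeners.add(inboundTarget);
        return () -> listeners.remove(inboundTarget);
    }

    @Override
    public Consumer<String> bindProducer(String name) {
        // Looks up the listeners at send time, so late subscribers are included.
        return payload -> destinations
                .getOrDefault(name, new CopyOnWriteArrayList<>())
                .forEach(listener -> listener.accept(payload));
    }
}
```

With this toy, a consumer bound to "orders" receives whatever a producer bound to "orders" sends until `unbind()` is called. A real binder does the same job against actual middleware, which is why most of the work lies in the middleware-specific plumbing.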
And here’s one more documentation snippet that’s basically a micro-tutorial about developing Spring Cloud Stream binders:
A typical binder implementation consists of the following: a class that implements the Binder interface; a Spring @Configuration class that creates a bean of type Binder along with the middleware connection infrastructure; a META-INF/spring.binders file found on the classpath containing one or more binder definitions, as shown in the following example:
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
Although the documentation above is enough to get started, a more detailed guide with a practical example makes the process much easier to follow.
TL;DR: Just Gimme the Code
If you don't have the time to go through this detailed tutorial, you can jump to my demo on GitHub, which includes a custom file-based Spring Cloud Stream binder like the one shown below and a sample application that depends on it.
Developing the Custom Binder
Let’s get our hands dirty by developing a custom Spring Cloud Stream binder that consumes events by reading files and produces events by writing to files!
Create a new Maven project with a pom.xml file similar to the following, which includes the dependency on Spring Cloud Stream:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>spring-cloud-stream-custom-binder</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-stream-custom-binder</name>
    <description>A demo custom Spring Cloud Stream Binder</description>
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring-cloud.version>Hoxton.RC1</spring-cloud.version>
        <spring-boot.version>2.2.0.RELEASE</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>
</project>
Technically, we just need to provide our org.springframework.cloud.stream.binder.Binder implementation, but practically, the binder depends on two more components that we need to provide first: the ProvisioningProvider and the MessageProducer.
The ProvisioningProvider is responsible for provisioning consumer and producer destinations. It is particularly useful for converting the logical destinations declared in the application.yml or application.properties file into physical destination references (you could look Spring beans up by destination name, or simply trim the destination name as we do in the following snippet):
package com.example.springcloudstreamcustombinder.provisioners;

import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;

public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

    @Override
    public ProducerDestination provisionProducerDestination(
            final String name,
            final ProducerProperties properties) {
        return new FileMessageDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(
            final String name,
            final String group,
            final ConsumerProperties properties) {
        return new FileMessageDestination(name);
    }

    // A single type serves as both the producer and the consumer destination.
    private static class FileMessageDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        private FileMessageDestination(final String destination) {
            this.destination = destination;
        }

        // The physical destination is simply the trimmed logical name.
        @Override
        public String getName() {
            return destination.trim();
        }

        @Override
        public String getNameForPartition(int partition) {
            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
        }
    }
}
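If trimming the name feels too bare, the logical-to-physical mapping could also resolve the destination against a base directory. The following is only a sketch in plain Java: `FileDestinationResolver` and its base-directory idea are hypothetical names of mine, not part of the demo project or of Spring Cloud Stream.

```java
import java.nio.file.Path;

// Hypothetical helper: maps a logical destination name to a file under a base directory.
final class FileDestinationResolver {

    private final Path baseDirectory;

    FileDestinationResolver(Path baseDirectory) {
        this.baseDirectory = baseDirectory.toAbsolutePath().normalize();
    }

    Path resolve(String logicalName) {
        if (logicalName == null || logicalName.trim().isEmpty()) {
            throw new IllegalArgumentException("Destination name must not be blank");
        }
        Path resolved = baseDirectory.resolve(logicalName.trim()).normalize();
        // Reject names such as "../secrets" that would escape the base directory.
        if (!resolved.startsWith(baseDirectory)) {
            throw new IllegalArgumentException("Destination escapes base directory: " + logicalName);
        }
        return resolved;
    }
}
```

A destination's `getName()` could then return `resolver.resolve(name).toString()` instead of the trimmed name, which keeps every file the binder touches inside one known folder.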
The MessageProducer, despite what the name suggests, is responsible for consuming the events and handing them over as messages to the client application that is configured to consume such events.
Here is an example of a MessageProducer implementation that polls a file that matches the trimmed destination name and is located in the project path, while also archiving read messages and discarding consecutive identical messages:
package com.example.springcloudstreamcustombinder.producers;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import static java.nio.file.StandardOpenOption.*;
import static java.util.concurrent.TimeUnit.*;

public class FileMessageProducer extends MessageProducerSupport {

    public static final String ARCHIVE = "archive.txt";

    private final ConsumerDestination destination;
    private String previousPayload;

    public FileMessageProducer(ConsumerDestination destination) {
        this.destination = destination;
    }

    @Override
    public void doStart() {
        receive();
    }

    // Polls the destination file every 50 ms on a single background thread.
    // Note: an exception thrown by the task suppresses all subsequent polls.
    private void receive() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleWithFixedDelay(() -> {
            String payload = getPayload();
            if (payload != null) {
                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
                archiveMessage(payload);
                sendMessage(receivedMessage);
            }
        }, 0, 50, MILLISECONDS);
    }

    // Returns the last line of the file, or null if the file is empty
    // or the last line is unchanged since the previous poll.
    private String getPayload() {
        try {
            List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
            if (allLines.isEmpty()) {
                return null;
            }
            String currentPayload = allLines.get(allLines.size() - 1);
            if (!currentPayload.equals(previousPayload)) {
                previousPayload = currentPayload;
                return currentPayload;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }

    // Appends every consumed payload to the archive file.
    private void archiveMessage(String payload) {
        try {
            Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
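The "read the last line, skip it if unchanged" logic is easier to reason about (and to unit test) when isolated from Spring. Here is a minimal sketch of that idea in plain Java; `LastLinePoller` is a hypothetical helper name, and treating a missing or empty file as "nothing to consume" is my own assumption rather than something the producer above does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: yields the last line of a file only when it differs
// from the one returned by the previous poll. Intended for single-threaded use.
final class LastLinePoller {

    private final Path file;
    private String previousPayload;

    LastLinePoller(Path file) {
        this.file = file;
    }

    Optional<String> poll() {
        if (!Files.exists(file)) {
            return Optional.empty(); // nothing to consume yet
        }
        List<String> lines;
        try {
            lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (lines.isEmpty()) {
            return Optional.empty(); // an empty file has no last line
        }
        String current = lines.get(lines.size() - 1);
        if (current.equals(previousPayload)) {
            return Optional.empty(); // same payload as the previous poll: skip it
        }
        previousPayload = current;
        return Optional.of(current);
    }
}
```

Keep in mind that this approach, like the producer it mirrors, only ever looks at the last line: if two new lines are appended between two polls, the first one is never delivered, and writing the same line twice in a row produces a single message.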
Finally, with all the required components in place, we can implement our own Binder by extending the AbstractMessageChannelBinder class, providing the required constructors and overriding the inherited abstract methods:
package com.example.springcloudstreamcustombinder;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import com.example.springcloudstreamcustombinder.producers.FileMessageProducer;
import com.example.springcloudstreamcustombinder.provisioners.FileMessageBinderProvisioner;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import static java.nio.file.StandardOpenOption.*;

public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {

    public FileMessageBinder(
            String[] headersToEmbed,
            FileMessageBinderProvisioner provisioningProvider) {
        super(headersToEmbed, provisioningProvider);
    }

    // Producer side: appends each outgoing payload as a new line of the destination file.
    @Override
    protected MessageHandler createProducerMessageHandler(
            final ProducerDestination destination,
            final ProducerProperties producerProperties,
            final MessageChannel errorChannel) throws Exception {
        return message -> {
            String fileName = destination.getName();
            // The payload is expected to arrive here as a byte array.
            String payload = new String((byte[]) message.getPayload()) + "\n";
            try {
                Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    // Consumer side: delegates to the file-polling MessageProducer.
    @Override
    protected MessageProducer createConsumerEndpoint(
            final ConsumerDestination destination,
            final String group,
            final ConsumerProperties properties) throws Exception {
        return new FileMessageProducer(destination);
    }
}
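The producer side boils down to "append the payload as a new line, creating the file if needed". As a minimal sketch of just that step in plain Java, here is a hypothetical `LineAppender` helper (my own name, not part of the demo) that additionally pins the charset to UTF-8 and serializes writes within the JVM, two choices the handler above does not make:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

// Hypothetical helper: appends each payload to a file as one line.
final class LineAppender {

    private final Path file;

    LineAppender(Path file) {
        this.file = file;
    }

    // synchronized so that concurrent callers in this JVM write whole lines in turn.
    synchronized void append(byte[] payload) {
        String line = new String(payload, StandardCharsets.UTF_8) + "\n";
        try {
            // CREATE makes the file on first use, APPEND keeps earlier lines intact.
            Files.write(file, line.getBytes(StandardCharsets.UTF_8), CREATE, APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Being explicit about the charset avoids depending on the platform default, which `new String(bytes)` and `getBytes()` fall back to when no charset is given.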
Last but not least, we need to provide the Spring Configuration for our binder as follows:
package com.example.springcloudstreamcustombinder.config;

import com.example.springcloudstreamcustombinder.FileMessageBinder;
import com.example.springcloudstreamcustombinder.provisioners.FileMessageBinderProvisioner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FileMessageBinderConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
        return new FileMessageBinderProvisioner();
    }

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
        return new FileMessageBinder(null, fileMessageBinderProvisioner);
    }
}
And the related src/main/resources/META-INF/spring.binders file with the binder name followed by the qualified name of the binder’s Spring Configuration:
myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration
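The trailing backslash in that file is a line continuation: as far as I can tell, spring.binders follows the standard Java properties syntax, so the two lines form a single "binder name to configuration class" entry. The sketch below checks that reading with plain `java.util.Properties`; `BinderDefinitions` is a hypothetical helper of mine, and it is not how Spring Cloud Stream itself necessarily loads the file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: parses spring.binders-style content into name -> configuration class.
final class BinderDefinitions {

    private BinderDefinitions() {
    }

    static Map<String, String> parse(String content) {
        Properties properties = new Properties();
        try {
            // Properties.load treats ':' as a key/value separator and a
            // trailing '\' as "this entry continues on the next line".
            properties.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> definitions = new TreeMap<>();
        for (String name : properties.stringPropertyNames()) {
            definitions.put(name, properties.getProperty(name).trim());
        }
        return definitions;
    }
}
```

Feeding it the two lines above yields a single entry whose key is myFileBinder and whose value is the fully qualified name of FileMessageBinderConfiguration, so a typo in either one is the first thing to check if the binder is not picked up.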
Congratulations! Your custom binder implementation is now complete and can be installed in your local Maven repository by running mvn clean install.
Testing the Custom Binder
Head over to start.spring.io and generate a Spring Boot 2.2.0 project with Cloud Stream as the only required dependency (or just click on this link instead, and generate the project from there).
Add your custom binder dependency to the pom.xml file, in the dependencies section:
<dependency>
<groupId>com.example</groupId>
<artifactId>spring-cloud-stream-custom-binder</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
Replace the src/main/resources/application.properties file with the following application.yml file, which enables logging for all events that are managed by Spring Cloud Stream:
logging:
  level:
    org.springframework.cloud.stream.messaging.DirectWithAttributesChannel: DEBUG
Replace the contents of the generated main application class under src/main/java (SpringCloudStreamCustomBinderDemoApplication.java), below its package declaration, with the following:
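import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding({Sink.class, Source.class})
public class SpringCloudStreamCustomBinderDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamCustomBinderDemoApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    @SendTo(Source.OUTPUT)
    public String handle(String message) {
        return String.format("Received: %s", message);
    }
}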
Finally, add a file named “input” to the main project directory and write something to it.
With this configuration in place, you can now launch the application by running mvn spring-boot:run, relying on the custom Spring Cloud Stream binder we just implemented to: keep consuming events from the “input” file you just created; write the processing results to an output file named “output”; and keep track of all previously read messages in the “archive.txt” file.
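To double-check what to expect in those three files, here is a Spring-free simulation of a single pass through the flow. It is only a sketch: `FileFlowSimulation` is a hypothetical class that mimics the described behavior (read the last input line, archive it, write the handled result), and it is no substitute for running the real application.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

// Hypothetical simulation of one input -> handler -> output pass, without Spring.
final class FileFlowSimulation {

    private FileFlowSimulation() {
    }

    // Mirrors the demo handler, which prefixes each message with "Received: ".
    static String handle(String message) {
        return String.format("Received: %s", message);
    }

    // Returns true if a payload was processed, false if the input file was empty.
    static boolean processOnce(Path input, Path output, Path archive) {
        try {
            List<String> lines = Files.readAllLines(input, StandardCharsets.UTF_8);
            if (lines.isEmpty()) {
                return false;
            }
            String payload = lines.get(lines.size() - 1);
            appendLine(archive, payload);          // what the consumer side archives
            appendLine(output, handle(payload));   // what the producer side writes
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void appendLine(Path file, String line) throws IOException {
        Files.write(file, (line + "\n").getBytes(StandardCharsets.UTF_8), CREATE, APPEND);
    }
}
```

So, if the last line of “input” is hello, you should find hello appended to “archive.txt” and Received: hello appended to “output”.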
Conclusion
The official Spring Cloud Stream reference documentation is quite helpful when it comes to implementing your own binder, but it’s not enough on its own.
That being said, creating your own binder implementation is almost effortless, even though it might look like a hard task at first. Knowing how to build a custom Spring Cloud Stream binder is also a niche but useful skill in the development field!
Moreover, knowing how Spring Cloud Stream binders work makes it easier to customize existing ones, not just to build new ones (which, in all honesty, is the less common scenario in most use cases).
Published at DZone with permission of Domenico Sibilio. See the original article here.