Spring Cloud Stream: Binding Kafka and Using EmbeddedKafkaRule in Tests
Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration that helps in creating event-driven or message-driven microservices.
Overview
In this article, we'll introduce the main concepts and constructs of Spring Cloud Stream with some simple test examples based on EmbeddedKafkaRule and MessageCollector.
Getting Started
Dependencies & Configuration
To get started, we'll need to add the Spring Cloud Starter Stream with the Kafka broker Gradle dependency to our build.gradle:
dependencies {
    implementation(kotlin("stdlib"))
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
    testImplementation("org.springframework.kafka:spring-kafka-test:$springKafkaTestVersion")
}
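The snippet above references the version placeholders $kotlinCoroutinesVersion and $springKafkaTestVersion, and the Kafka binder starter is usually versioned through the Spring Cloud BOM. Below is a minimal sketch of how this might look in build.gradle.kts, assuming the io.spring.dependency-management plugin is applied; the version numbers are illustrative, not from the article:

// Illustrative versions only; align them with your Spring Boot/Cloud release train.
val kotlinCoroutinesVersion = "1.3.9"
val springKafkaTestVersion = "2.5.5.RELEASE"

dependencyManagement {
    imports {
        // Lets spring-cloud-starter-stream-kafka resolve without an explicit version.
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR8")
    }
}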
The Spring Cloud Stream project needs to be configured with the Kafka broker URL, topic, and other binder configurations. Below is an example of a configuration for the application.yaml:
spring:
  application:
    name: cloud-stream-binding-kafka-app
  cloud:
    stream:
      kafka:
        binder:
          brokers: 0.0.0.0:8080
          configuration:
            auto-offset-reset: latest
      bindings:
        customChannel: #Channel name
          destination: 0.0.0.0:8080 #Destination to which the message is sent (topic)
          group: input-group-N
          contentType: application/json
          consumer:
            max-attempts: 1
            autoCommitOffset: true
            autoCommitOnError: false
Constructs
This is a simple Spring Cloud Stream-based service that binds the channels declared above (SpringCloudStreamBindingKafkaApp.kt):
@SpringBootApplication
@EnableBinding(ProducerBinding::class)
class SpringCloudStreamBindingKafkaApp

fun main(args: Array<String>) {
    SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)
}
The annotation @EnableBinding configures the service to bind input and output channels.
Now let's see the main concepts:
Bindings: a collection of interfaces that identify the input and output channels declaratively
Binder: the messaging-middleware implementation, such as Kafka or another message broker
Channel: represents the communication pipe between the messaging middleware and the application
StreamListeners: message-handling methods in beans that are automatically invoked on a message from the channel, after the MessageConverter performs serialization/deserialization between middleware-specific events and domain object types ("POJOs")
Message Schemas: used for serialization and deserialization of messages; these schemas can be statically read from a location or loaded dynamically
We will need at least one producer and one consumer to test message send and receive operations. Below is sample code for a producer and a consumer in their simplest form, developed using Spring Cloud Stream.
Producer
There is a producer bean that will send messages to a Kafka topic (ProducerBinding.kt):
interface ProducerBinding {
    // BINDING_TARGET_NAME is assumed to match the binding configured in application.yaml
    companion object { const val BINDING_TARGET_NAME = "customChannel" }

    @Output(BINDING_TARGET_NAME)
    fun messageChannel(): MessageChannel
}
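For illustration, here is a hedged sketch (not from the article) of how an application bean could publish through this binding; the MessageSender name is hypothetical:

@Service
class MessageSender(private val producerBinding: ProducerBinding) {

    fun send(payload: Map<String, Any?>) {
        // Build a Spring Message and send it to the bound channel;
        // the Kafka binder delivers it to the configured destination.
        producerBinding.messageChannel().send(MessageBuilder.withPayload(payload).build())
    }
}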
Consumer
A consumer bean will listen to a Kafka topic and receive messages (ConsumerBinding.kt):
interface ConsumerBinding {

    companion object {
        const val BINDING_TARGET_NAME = "customChannel"
    }

    @Input(BINDING_TARGET_NAME)
    fun messageChannel(): MessageChannel
}
And the message handler itself (Consumer.kt):
@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {

    @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
    fun process(
        message: Map<String, Any?>,
        @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
    ) {
        messageService.consume(message)
    }
}
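The MessageService used by the consumer is not shown in the article; a minimal hypothetical sketch consistent with how Consumer (and the tests below) use it could look like this:

// Hypothetical: an interface keeps the service easy to mock in ConsumerTest.
interface MessageService {
    fun consume(message: Map<String, Any?>)
}

@Service
class LoggingMessageService : MessageService {
    override fun consume(message: Map<String, Any?>) {
        // Application-specific handling of the payload; here we simply log it.
        println("Received message: $message")
    }
}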
For testing, we will need a Kafka broker with a topic. We will use an embedded Kafka server provided by spring-kafka-test.
Functional Testing Using MessageCollector
The test-support module provides a binder implementation that allows interaction with channels and retrieval of messages via MessageCollector. We send a message to the producer binding's message channel and then receive it as the payload (ProducerTest.kt):
@RunWith(SpringRunner::class)
@SpringBootTest
class ProducerTest {

    @Autowired
    lateinit var producerBinding: ProducerBinding

    @Autowired
    lateinit var messageCollector: MessageCollector

    @Test
    fun `should produce somePayload to channel`() {
        // ARRANGE
        val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)

        // ACT
        producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
        val payload = messageCollector.forChannel(producerBinding.messageChannel())
            .poll()
            .payload

        // ASSERT
        val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
        assertTrue(request.entries.stream().allMatch { re ->
            re.value == payloadAsMap[re.key.toString()]
        })

        messageCollector.forChannel(producerBinding.messageChannel()).clear()
    }
}
Embedded Kafka Broker Testing
We use the @ClassRule annotation to create this Kafka broker. The rule starts the Kafka and ZooKeeper servers on a random port before the tests run and shuts them down after they complete. The embedded Kafka broker eliminates the need for a real Kafka and ZooKeeper instance while the test is running (ConsumerTest.kt):
"test") (
exclude = [TestSupportBinderAutoConfiguration::class]) (
ProducerBinding::class) (
class ConsumerTest {
lateinit var producerBinding: ProducerBinding
lateinit var objectMapper: ObjectMapper
lateinit var messageService: MessageService
companion object {
var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
}
fun `should consume via txConsumer process`() {
// ACT
val request = mapOf(1 to "foo", 2 to "bar")
producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
.setHeader("someHeaderName", "someHeaderValue")
.build())
// ASSERT
val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
runBlocking {
delay(20)
verify(messageService).consume(requestAsMap)
}
}
}
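One wiring detail not shown above is how the Kafka binder learns the embedded broker's address. A hedged sketch of one option, assuming the JUnit 4 lifecycle used here, is to set the standard spring.cloud.stream.kafka.binder.brokers property from the rule before the Spring context starts:

companion object {
    @ClassRule
    @JvmField
    val embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")

    @BeforeClass
    @JvmStatic
    fun pointBinderAtEmbeddedKafka() {
        // brokersAsString comes from the EmbeddedKafkaBroker started by the rule.
        System.setProperty(
            "spring.cloud.stream.kafka.binder.brokers",
            embeddedKafka.embeddedKafka.brokersAsString
        )
    }
}

Alternatively, the test profile's application.yaml can reference the spring.embedded.kafka.brokers property exposed by the embedded broker.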
Conclusion
In this tutorial, we demonstrated the main concepts of Spring Cloud Stream, showed how to use it with Kafka, and walked through JUnit testing based on EmbeddedKafkaRule and MessageCollector.
You can find the complete source code here.