Messaging With Spring Boot and Azure Service Bus
Moving to the cloud? Working on a project using event-based messaging? Using Kotlin to do it? Time to dig into some beans on the Azure Service Bus.
Like many other companies, my employer is currently on a journey towards a cloud-based infrastructure. We have decided to go with Azure as our cloud provider, and my team and I are the lucky ones to be in the front row to deploy our project to the cloud. Our project consists primarily of Spring Boot based microservices written in Kotlin that are hosted on a Kubernetes cluster.
We recently had to implement features that benefit from event-based messaging using topics, which gave us the opportunity to dig into Azure Service Bus. Luckily, Microsoft has created a Spring Boot Starter package that can communicate with Azure Service Bus using JMS and the AMQP messaging protocol. How difficult can that be? Going from zero to something that connects and sends a couple of messages is quite easy. Still, I have discovered some pitfalls that aren't covered by the Azure Service Bus JMS Spring Boot Starter client library documentation and that should be avoided before the application hits the production environment.
Transactions
When an application consumes messages from a topic, it is most likely because those messages are important to that application. The application might need to react somehow, for example by saving something to a database or calling another application. If the application fails to process a message, maybe because the database or the other application is unable to respond, transactions often come in handy.
Unfortunately, when using the topicJmsListenerContainerFactory bean provided by the azure-spring-boot package (the package providing autoconfiguration of Spring JMS for Azure Service Bus), you are relying on the default configuration of the JmsAccessor base class provided by Spring Framework, where sessions are not transacted by default (sessionTransacted is false).
This, combined with the default acknowledge mode AUTO_ACKNOWLEDGE, leaves you without support for message re-delivery when delivery of a message or message group fails, despite having a “max delivery count” greater than 1 configured on the service bus subscription.
With a transacted session, the application completely controls message delivery by either committing the session (when the receiver function returns successfully) or rolling it back (when the receiver function throws an unhandled exception).
Enabling local transactions is not enough, though, if you need message re-delivery in case of an exception. The topicJmsListenerContainerFactory bean provided by the azure-spring-boot package creates listener container instances of the type DefaultMessageListenerContainer, which applies message acknowledgment before listener execution when the default AUTO_ACKNOWLEDGE mode is used. This means that if the message listener throws an exception during message receipt or message processing, the message is not handed back to the broker because the message has already been acknowledged.
Therefore, session acknowledge mode CLIENT_ACKNOWLEDGE should be used on the container factory, which provides re-delivery in case of an exception.
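To make the difference concrete, here is a toy in-memory model of the two behaviours described above. It is only a sketch: AckTiming, Outcome, and deliver are hypothetical names invented for this illustration, not part of Spring JMS or the Service Bus client, and the "broker" is reduced to a retry loop bounded by a max delivery count.

```kotlin
// Toy model only: these types are hypothetical and not part of Spring JMS or Azure Service Bus.
enum class AckTiming { BEFORE_LISTENER, AFTER_LISTENER }

data class Outcome(val deliveries: Int, val processed: Boolean, val deadLettered: Boolean)

// Simulates a broker delivering one message to a listener that may throw.
fun deliver(
    timing: AckTiming,
    maxDeliveryCount: Int,
    listener: (attempt: Int) -> Unit
): Outcome {
    var attempt = 0
    while (attempt < maxDeliveryCount) {
        attempt++
        if (timing == AckTiming.BEFORE_LISTENER) {
            // The message is settled before the listener runs, so a failure is never
            // reported to the broker and there is no second delivery.
            val ok = runCatching { listener(attempt) }.isSuccess
            return Outcome(attempt, processed = ok, deadLettered = false)
        }
        // The message is settled only after the listener returns; an exception
        // hands the message back and the loop models the re-delivery.
        if (runCatching { listener(attempt) }.isSuccess) {
            return Outcome(attempt, processed = true, deadLettered = false)
        }
    }
    // Every allowed delivery failed, so the message is dead-lettered.
    return Outcome(attempt, processed = false, deadLettered = true)
}

fun main() {
    // A listener whose dependency is down for the first two attempts.
    val flaky: (Int) -> Unit = { attempt -> if (attempt < 3) error("database unavailable") }

    println(deliver(AckTiming.BEFORE_LISTENER, maxDeliveryCount = 10, listener = flaky))
    // prints Outcome(deliveries=1, processed=false, deadLettered=false)
    println(deliver(AckTiming.AFTER_LISTENER, maxDeliveryCount = 10, listener = flaky))
    // prints Outcome(deliveries=3, processed=true, deadLettered=false)
}
```

In the first case the message is silently lost after a single delivery, no matter how high the max delivery count is. In the second case the same failure leads to re-delivery until the listener succeeds or the max delivery count is exhausted.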
Instead of using the topicJmsListenerContainerFactory bean provided by the azure-spring-boot package, we can define our own:
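import javax.jms.ConnectionFactory
import javax.jms.Session
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Primary
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.config.JmsListenerContainerFactory
import org.springframework.jms.listener.DefaultMessageListenerContainer

@Bean
@Primary
fun transactedListenerContainerFactory(
    connectionFactory: ConnectionFactory
): JmsListenerContainerFactory<DefaultMessageListenerContainer> {
    val listenerContainerFactory = DefaultJmsListenerContainerFactory()
    listenerContainerFactory.setConnectionFactory(connectionFactory)
    // Topic subscriptions on Service Bus are durable.
    listenerContainerFactory.setSubscriptionDurable(true)
    // Commit when the listener returns, roll back when it throws.
    listenerContainerFactory.setSessionTransacted(true)
    listenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
    return listenerContainerFactory
}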
We then need to change the containerFactory attribute of the @JmsListener annotation to the name of the JmsListenerContainerFactory bean we have just created:
@JmsListener(
    destination = "message-topic",
    subscription = "consumer-service",
    containerFactory = "transactedListenerContainerFactory"
)
fun receiveMessage(msg: TextMessage) {
    log.info("Message received: ${msg.text}")
}
Slow Consumers and the Default Prefetch Policy
After a lot of testing and bug fixing, we thought we had fixed the reason why many of the messages on the topic ended up in the dead-letter queue (DLQ). Unfortunately, many messages were still ending up there, but we didn't see any signs of errors or re-deliveries in the logs.
If we take a closer look at the javax.jms.ConnectionFactory bean provided by the azure-spring-boot package, we find that the bean definition relies heavily on the default JmsConnectionFactory implementation from the Apache Qpid library, which is completely fine in most cases. Still, because we had a slow consumer, the default prefetch policy was causing some trouble.
The default prefetch policy defines a prefetch size of 1000. This means that our application will always ask for chunks of up to 1000 messages to reduce the overhead of message delivery by avoiding unnecessary round trips to the Service Bus. Each chunk is then processed by the application, but a message is not acknowledged until it has been processed.
The late acknowledgment combined with the default message lock duration of 30 seconds (a property of the service bus subscription on Azure) can cause the messages to end up in the DLQ if the application fails to process and acknowledge all messages before the lock duration has expired.
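A rough back-of-the-envelope calculation shows why a slow consumer gets into trouble. The helper below is a sketch with a hypothetical name (maxSafePrefetch) and two simplifying assumptions that are not taken from any library: a single consumer processes the messages one after another, and the locks of all prefetched messages start counting when the chunk is fetched.

```kotlin
import java.time.Duration

// Hypothetical helper: estimates the largest prefetch size for which the last
// message of a chunk is still processed before its lock expires.
// Assumes sequential processing and locks that start when the chunk is fetched.
fun maxSafePrefetch(lockDuration: Duration, perMessage: Duration): Int {
    require(!perMessage.isZero && !perMessage.isNegative) { "perMessage must be positive" }
    val fits = lockDuration.toNanos() / perMessage.toNanos()
    return fits.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
}

fun main() {
    val lock = Duration.ofSeconds(30)
    println(maxSafePrefetch(lock, Duration.ofMillis(30)))  // prints 1000
    println(maxSafePrefetch(lock, Duration.ofMillis(250))) // prints 120
}
```

Under these assumptions, a prefetch size of 1000 with a 30-second lock only works if each message takes 30 ms or less on average. A consumer that needs 250 ms per message can safely hold about 120 messages at a time.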
Instead of using the default prefetch policy defined in the Apache Qpid library, we can define our own:
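import org.apache.qpid.jms.JmsConnectionFactory
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy
import org.apache.qpid.jms.policy.JmsPrefetchPolicy
import org.springframework.jms.connection.CachingConnectionFactory

@Bean
@Primary
fun transactedListenerContainerFactory(
    connectionFactory: ConnectionFactory
): JmsListenerContainerFactory<DefaultMessageListenerContainer> {
    // Unwrap the Qpid connection factory behind the caching wrapper and
    // replace its prefetch policy before any listener container is created.
    ((connectionFactory as CachingConnectionFactory).targetConnectionFactory as JmsConnectionFactory)
        .prefetchPolicy = prefetchPolicy()
    val listenerContainerFactory = DefaultJmsListenerContainerFactory()
    listenerContainerFactory.setConnectionFactory(connectionFactory)
    listenerContainerFactory.setSubscriptionDurable(true)
    listenerContainerFactory.setSessionTransacted(true)
    listenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
    return listenerContainerFactory
}

// Lowers the prefetch size from the default of 1000 to 100 for all destination types.
private fun prefetchPolicy(): JmsPrefetchPolicy {
    val prefetchPolicy = JmsDefaultPrefetchPolicy()
    prefetchPolicy.setAll(100)
    return prefetchPolicy
}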
Notice how the prefetchPolicy property is overwritten on the JmsConnectionFactory. By doing it this way, we still use the ConnectionFactory bean provided by the azure-spring-boot package and avoid having to supply the connection settings ourselves.
Conclusion
In this article, I described some of the pitfalls one can encounter when using the default JMS settings provided by the azure-spring-boot Java package, along with a possible solution to each of those.
The complete code examples used in this article are available on GitHub.