JMS Transaction Propagation Over Multiple Threads Using Spring
Learn how to propagate a JMS transaction over multiple threads, which brings benefits such as better use of the IO threads that initially poll messages from a JMS queue.
Java Message Service (JMS) is an API for creating, sending, receiving, and reading messages. JMS provides loosely coupled, reliable, and asynchronous communication. Hence, JMS is widely used in enterprise applications to integrate multiple systems.
When it comes to integration, it is paramount to guarantee the reliability of the overall solution so that no message passing through it is lost. This is where JMS transactionality comes in handy: you can read a message transactionally from a JMS queue and, after performing all the required logic on the message, either commit or roll back the transaction. On commit, the message is removed from the source queue; on rollback, the message remains in the JMS queue for a subsequent retry.
In order to connect to a JMS queue, we need a JMS connection factory, and Spring binds the transaction of the message to that connection factory. Hence, a single transaction cannot span multiple JMS queues that are not accessed through the same connection factory. Without any further ado, let’s dig into the code.
First, we need a Java class that holds the required thread local, as shown below. If you are not familiar with thread locals, the best starting point is the Java documentation for ThreadLocal.
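import org.springframework.transaction.TransactionStatus;

public class TransactionInformation {
    // Holds the status of the transaction for the message handled on the current thread.
    public static final ThreadLocal<TransactionStatus> txnStatus = new ThreadLocal<>();
}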
The txnStatus thread local is used to store the current status of the transaction. If we evaluate the thread local at runtime, for example in a debugger, we can inspect the contents of the TransactionStatus object.
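To see why the transaction state has to be copied explicitly, it helps to confirm that a thread-local value is invisible to other threads. The following is a minimal, JDK-only sketch; the class name ThreadLocalVisibilityDemo is hypothetical, and a String stands in for the Spring TransactionStatus object.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: shows that a ThreadLocal value is visible solely to the thread that set it.
class ThreadLocalVisibilityDemo {
    // Stand-in for TransactionInformation.txnStatus; a String replaces TransactionStatus.
    static final ThreadLocal<String> txnStatus = new ThreadLocal<>();

    // Sets the value on the calling thread, then reads it from a newly started thread.
    static String readFromOtherThread() {
        txnStatus.set("active");
        AtomicReference<String> seen = new AtomicReference<>("unset");
        Thread worker = new Thread(() -> seen.set(txnStatus.get()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            txnStatus.remove(); // leave the calling thread clean
        }
        return seen.get(); // the worker never saw "active"
    }
}
```

Calling ThreadLocalVisibilityDemo.readFromOtherThread() returns null, because the worker thread has its own, empty copy of the thread local.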
In order for a transaction to be initiated, we need to wrap the connection factory with the Spring JmsTransactionManager. Further, we need to initialize the txnStatus thread local by overriding the doReceiveAndExecute() method in a subclass of Spring's polling listener container, for example org.springframework.jms.listener.DefaultMessageListenerContainer. A sample implementation would be as follows.
@Override
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException {
    // Remember the transaction status on the IO thread before the message is processed.
    if (status != null) {
        TransactionInformation.txnStatus.set(status);
    }
    return super.doReceiveAndExecute(invoker, session, consumer, status);
}
After that, we need to copy the thread locals from one thread to another. This is the tricky part. We could use reflection to copy the thread locals directly, but in this case we can use the JMS message as a context object and attach the thread-local values to it from the initial IO thread. All the important thread locals that need to be copied reside in the org.springframework.transaction.support.TransactionSynchronizationManager class, and the resources thread local in that class plays the key role here. Its value can be inspected at runtime.
That value is a map in which the key is our connection factory and the value is a resource holder that includes the JMS session information. Using the code segment below, we can copy the resource map and unbind the resources from the current thread.
// Copy first: unbinding while iterating over the live resource map can fail
// with a ConcurrentModificationException when more than one resource is bound.
Map<Object, Object> savedResources = new HashMap<>(TransactionSynchronizationManager.getResourceMap());
for (Object key : savedResources.keySet()) {
    TransactionSynchronizationManager.unbindResource(key);
}
The savedResources map can be attached as a property to the JMS message. Later, when the message is processed by a new thread, we can retrieve the map and invoke the bindResource() method of the TransactionSynchronizationManager class for each key-value pair in it, which sets the thread local for the new thread. After that, the new thread is part of the initial JMS transaction.
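The detach, hand over, and re-bind sequence can be sketched end to end as below. This is a JDK-only illustration, not the article's implementation: ResourceRegistry is a hypothetical stand-in whose method names mirror bindResource(), unbindResource(), getResource(), and getResourceMap() on TransactionSynchronizationManager, and TransactionHandoff is likewise invented for the example. Because properties on a standard javax.jms.Message are limited to primitive wrappers and strings, the sketch hands the map to the worker task directly; a framework with its own message context object could carry it there instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified stand-in for Spring's TransactionSynchronizationManager (resource map only).
class ResourceRegistry {
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) { RESOURCES.get().put(key, value); }
    static Object unbindResource(Object key) { return RESOURCES.get().remove(key); }
    static Object getResource(Object key) { return RESOURCES.get().get(key); }
    // Returns a copy, so callers can unbind while iterating over the result.
    static Map<Object, Object> getResourceMap() { return new HashMap<>(RESOURCES.get()); }
}

class TransactionHandoff {
    // Detaches every resource from the calling thread and returns them as a map.
    static Map<Object, Object> detachFromCurrentThread() {
        Map<Object, Object> saved = ResourceRegistry.getResourceMap();
        saved.keySet().forEach(ResourceRegistry::unbindResource);
        return saved;
    }

    // Runs on the "IO thread": hands the saved resources to a worker thread and
    // returns what the worker sees for the given key after re-binding.
    static Object processOnWorker(Object key, Object holder) {
        ResourceRegistry.bindResource(key, holder); // in real code the listener container does this
        Map<Object, Object> saved = detachFromCurrentThread();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<Object> result = worker.submit(() -> {
                saved.forEach(ResourceRegistry::bindResource); // re-attach on the worker thread
                try {
                    return ResourceRegistry.getResource(key); // business logic would run here
                } finally {
                    saved.keySet().forEach(ResourceRegistry::unbindResource);
                }
            });
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }
}
```

In this sketch the worker sees the same holder object that was bound on the calling thread, while the calling thread no longer has anything bound for that key.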
One salient point to remember is that it is vital to clear the thread locals at the end of the thread's work if the threads are reused. For example, when using a thread pool, the thread locals must be cleared before the thread is returned to the pool. Otherwise, stale values leak into the next task that runs on that thread and cause unexpected behavior in JMS transactionality.
One of the biggest advantages of propagating a JMS transaction over multiple threads is that it optimizes the usage of the IO threads that initially poll messages from a JMS queue. Usually, the IO thread has to execute all the logic related to a JMS message until the transaction is committed or rolled back. If the intermediate logic involves time-consuming computations, it negatively affects the message throughput of the system. By propagating the transaction, we delegate the computation logic to a new thread while preserving the transactionality of the message flow.
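One way to make the cleanup hard to forget is to wrap each pooled task so that the thread local is always removed in a finally block. The following JDK-only sketch assumes a hypothetical PooledThreadCleanup class and uses a String thread local as a stand-in for the transaction-related thread locals discussed above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PooledThreadCleanup {
    // Stand-in for any transaction-related thread local.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Wraps a task so the thread local is set before it runs and always removed afterwards.
    static Runnable withContext(String value, Runnable task) {
        return () -> {
            CONTEXT.set(value);
            try {
                task.run();
            } finally {
                CONTEXT.remove(); // runs even if the task throws
            }
        };
    }

    // Runs one wrapped task, then reports what the next task on the same pooled thread sees.
    static String valueSeenByNextTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // one thread, so it is reused
        try {
            pool.submit(withContext("txn-1", () -> { })).get();
            Callable<String> probe = CONTEXT::get;
            return pool.submit(probe).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With the wrapper in place, the second task finds no leftover value on the reused thread. Without the finally block it would observe "txn-1" from the previous task.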
It seems like a lot to digest at first, but the good thing is that AdroitLogic’s Project-X framework provides all of this functionality out of the box and you can try it out by yourself using UltraStudio without writing a single line of code. There is a sample project which explains the JMS transactionality and you can create and run the project directly within UltraStudio.
Published at DZone with permission of Sajith Dilshan. See the original article here.