Using FIFO Queues to Order Messages in Mule 4
See how to use FIFO queues to order messages in Mule 4.
Objective
Very often we come across requirements to maintain the order of messages that flow through the integration layer. For example, if Source System A updates a record, it needs to be updated accordingly in Target System B, but when there are multiple consecutive updates in System A, those need to be updated in the Target System B in the correct order. In a distributed multi-threaded environment, there is always a chance of a race condition where an older record may overwrite a newer record.
Scenario
Suppose the status of an entity in Source System A, say #1234, is updated to “In-Progress” and subsequently to “Complete”. Source System A invokes an API implemented in Mule on every status update to sync the record with Target System B. Most of the time, we would prefer an asynchronous flow with a queue in between to sync the records between the two systems. If the API invokes a publisher flow that publishes the messages to a queue (say, in Anypoint MQ) and a subscriber flow picks up the messages and updates Target System B, then we cannot guarantee the order of the messages. In this scenario, there is always a chance that the “Complete” status gets overwritten by “In-Progress” in Target System B. The below diagram explains the scenario.
In Mule 4, concurrency on a flow (or on any component that supports it) is controlled by the maxConcurrency attribute, which sets the maximum number of simultaneous invocations the component can receive at any given time. By default, the subscriber flow does not have this attribute set, so the concurrency is determined by the Mule runtime and can be more than one at a time.
In this kind of situation, FIFO queues in Anypoint MQ are very useful. With a FIFO queue, Anypoint MQ ensures that messages are retrieved in the same order in which they were placed in the queue. FIFO queues do restrict the number of in-flight messages that are allowed (max 10).
Ref: https://docs.mulesoft.com/mq/mq-queues#fifoqueues
Now, to preserve the order of the messages in Target System B, the subscriber flow that consumes messages from the FIFO queue should have the maxConcurrency attribute set to 1. Otherwise, we will end up with parallel updates to the target system, which we don’t want.
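As a minimal sketch of that setting in isolation, the fragment below shows where maxConcurrency sits on a flow. The flow name is hypothetical and the body is a placeholder; only the attribute itself comes from the discussion above.

```xml
<!-- Hypothetical flow name, used only to illustrate the attribute. -->
<flow name="orderedSubscriberFlow" maxConcurrency="1">
    <!-- A message source (for example, an Anypoint MQ subscriber) would go here. -->
    <!-- With maxConcurrency="1", the flow handles one message at a time. -->
    <logger level="INFO" message="#[payload]"/>
</flow>
```

Leaving the attribute out would let the runtime decide how many messages are processed at once, which is the situation that can reorder updates.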
Example
The below example demonstrates the behavior.
Prerequisite:
1. Create a FIFO Queue in the Anypoint MQ.
Go to Anypoint Platform > MQ > Click the Create (+) button on the Top corner > Select FIFO Queue.
2. Create an Anypoint MQ Subscriber Configuration in your Mule Application
Here we have created a configuration with acknowledgment mode as MANUAL, which requires us to explicitly perform ACK to commit the messages in the MQ.
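Note: Configure the properties as per your Mule runtime environment. The flow snippets further down reference a configuration named Anypoint_MQ_Task_Notif_Config and a queue property named secure::blip.task.notif.queue; replace these with the names of your own configuration and FIFO queue.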
<anypoint-mq:default-subscriber-config
    name="Anypoint_MQ_FIFO_Subscriber_Config" doc:name="Anypoint MQ Default subscriber"
    doc:id="a509d1b7-5b60-4753-90dd-441a8cecc1df" acknowledgementTimeout="${fifo.test.queue.ackTimeout}"
    pollingTime="${fifo.test.queue.poolingTime}"
    maxRedelivery="${fifo.test.queue.redelivery}"
    acknowledgementMode="MANUAL">
    <anypoint-mq:connection url="${anypoint.mq.url}"
        clientId="${anypoint.mq.client.app.id}" clientSecret="${anypoint.mq.client.secret}">
    </anypoint-mq:connection>
</anypoint-mq:default-subscriber-config>
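The placeholders in the configuration above have to be defined somewhere in the application. One possible way, shown here only as a sketch, is to declare them as global properties in the Mule configuration. All values below are hypothetical, the time values are assumed to be in milliseconds, and the URL should be replaced with the one for your own Anypoint MQ region. In a real project the client ID and secret would normally come from a secured properties file rather than plain text.

```xml
<!-- Hypothetical values for illustration only; adjust them for your environment. -->
<!-- Time values are assumed to be in milliseconds; check your connector version. -->
<global-property name="fifo.test.queue.ackTimeout" value="30000"/>
<global-property name="fifo.test.queue.poolingTime" value="1000"/>
<global-property name="fifo.test.queue.redelivery" value="3"/>

<!-- Example region URL; use the URL of the region that hosts your queue. -->
<global-property name="anypoint.mq.url" value="https://mq-us-east-1.anypoint.mulesoft.com/api/v1"/>

<!-- Placeholder credentials of the Anypoint MQ client app. -->
<global-property name="anypoint.mq.client.app.id" value="your-client-app-id"/>
<global-property name="anypoint.mq.client.secret" value="your-client-app-secret"/>
```

Whatever values are chosen, the acknowledgement timeout should comfortably exceed the time the subscriber needs to process one message, otherwise the message may be redelivered before it is acknowledged.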
Publisher Flow:
The publisher flow is triggered by a Scheduler that runs every 1 second and publishes a JSON with Event Time into the FIFO queue.
<flow name="publisherFlow" doc:id="9b0900fc-ef0c-46ba-9afb-d91154d3875b" initialState="started" maxConcurrency="1">
    <scheduler doc:name="Every 1 Second" doc:id="c9f81db1-1075-4664-8c54-1b6e79c634a2">
        <scheduling-strategy>
            <fixed-frequency frequency="1" timeUnit="SECONDS"/>
        </scheduling-strategy>
    </scheduler>
    <set-variable value="#[now() as String]" doc:name="eventTime to Now" doc:id="967b32c2-ade7-4ae6-b658-a838776af4dc" variableName="eventTime"/>
    <set-payload value='#[%dw 2.0
output application/json
---
{
    "Event_Time" : vars.eventTime
}]' doc:name="JSON with EventTime" doc:id="4a70b175-bca7-4186-81de-9e7bedf48156" />
    <anypoint-mq:publish doc:name="To FIFO Queue" doc:id="5f5eb2c3-707b-42a6-a67c-b5a59e7f484d" config-ref="Anypoint_MQ_Task_Notif_Config" destination="${secure::blip.task.notif.queue}" properties="#[%dw 2.0
output application/json
---
{
    &quot;transactionId&quot; : vars.transactionId default uuid(),
    &quot;eventTime&quot; : payload.eventTime default (now() >> &quot;Australia/Sydney&quot;) as String {format: &quot;yyyy-MM-dd'T'HH:mm:ss&quot;}
}]"/>
    <logger level="INFO" doc:name="Finished Publishing" doc:id="9c60cfd6-0ac7-4ef5-9ad3-9af236263eb2" message='#["Published Event : " ++ vars.eventTime]'/>
</flow>
The output of the publisher flow shows messages are published at specific times:
Subscriber Flow:
The subscriber flow consumes the messages in the order in which they were published. The flow has the maxConcurrency attribute set to 1, so it does not process messages in parallel: only one message is handled at a time. A 2-second delay is added to the flow to block the thread for a short duration, simulating an update to a target system.
<flow name="subscriberFlow" doc:id="f6fe08d5-8f67-422d-8ef8-09b6a9073032" initialState="started" maxConcurrency="1">
    <anypoint-mq:subscriber doc:name="For FIFO Queue" doc:id="c326ecbb-f7cc-407c-9092-7e6c8d4c9ced" config-ref="Anypoint_MQ_Task_Notif_Config" destination="${secure::blip.task.notif.queue}">
        <reconnect-forever frequency="10000" />
    </anypoint-mq:subscriber>
    <set-variable value="#[attributes]" doc:name="Attribute" doc:id="3486e8cf-b48b-49a8-931c-0ad6ce2b3f26" variableName="attr"/>
    <logger level="INFO" doc:name="Payload" doc:id="1dfe0afd-894c-4c2d-91b3-c781a6bdfaca" message="#[payload]"/>
    <scripting:execute engine="groovy" doc:name="Execute Delay" doc:id="a5cf45c2-a66b-4ac3-adee-af1d7a94bb3f">
        <scripting:code>sleep(2000);
return payload;</scripting:code>
    </scripting:execute>
    <anypoint-mq:ack doc:name="For FIFO Queue" doc:id="7d3ce19c-17bf-4fda-9b6c-feaf642abd02" config-ref="Anypoint_MQ_Task_Notif_Config" messageContext="#[vars.attr]"/>
</flow>
Output of the subscriber flow shows messages are consumed in the same order in which they were published:
Conclusion
By using FIFO queues in Anypoint MQ and controlling the concurrency in the Mule flow, we can preserve the order of the messages in a Mule application. The solution works because we make the subscriber exclusive, allowing only one thread to process messages at a time, but an exclusive subscriber has some disadvantages. The overall throughput of the transactions is lower, and one message might block another even though the two have nothing to do with each other.