Synchronized Mule Batch Processing
This tutorial will teach you the steps to set up synchronized batch processing in a Mule application to send batch reporting figures back to the flow.
Join the DZone community and get the full member experience.
Join For FreeAs we know, Mule Batch always runs in an async manner. But sometimes, it is required to send batch reporting figures (i.e. number of successes/failed records etc.) to the flow back for further processing.
Objective
Our objective is to create a simple REST API which invokes a batch process and sends the batch statics in the api response.
Steps
1. Create a new mavenized Mule project in Anypoint Studio. Now add the following dependency to your pom.xml:
<dependency>
<groupId>com.mulesoft.munit.utils</groupId>
<artifactId>munit-synchronize-module</artifactId>
<version>1.0.0</version>
</dependency>
2. Create a flow, Flow1, by dragging and dropping an HTTP connector into your configuration.xml. Configure the endpoint with basic details.
3. Drag and drop a batch scope into your configuration XML and put 2-3 steps into it.
4. To get the data back from the batch on-complete step, we'll have to bring a VM endpoint into the picture (session variables, flow variables, etc will not work in this case). We will push batch metrics like number of success records , number of failed records, etc to a VM and will access the same VM from the main flow.
5. Now create another flow, Flow2. From this flow, we'll invoke the batch process by putting a "batch execute" in it.
6. From the Flow1, call Flow 2 with flow-ref. Now, to make this call as a synchronized one, cover your flow-ref in run-and-wait scope. Now your batch becomes synchronized; your main flow (Flow1) will wait for the batch to complete.
7. Later in Flow1, add a Mule requestor. This Mule requestor will call the VM endpoint and fetch the batch statics from the VM queue.
8. Now you can send your batch status in the API response. Please refer to the final XML file below:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:mulerequester="http://www.mulesoft.org/schema/mule/mulerequester" xmlns:synchronize="http://www.mulesoft.org/schema/mule/synchronize"
xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/synchronize http://www.mulesoft.org/schema/mule/synchronize/current/mule-synchronize.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/mulerequester http://www.mulesoft.org/schema/mule/mulerequester/current/mule-mulerequester.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">
<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8088" doc:name="HTTP Listener Configuration"/>
<vm:connector name="VM1" validateConnections="true" doc:name="VM"/>
<vm:endpoint exchange-pattern="one-way" path="test-queue-data" connector-ref="VM1" name="VM" doc:name="VM"/>
<flow name="flow1">
<http:listener config-ref="HTTP_Listener_Configuration" path="/execute" doc:name="HTTP"/>
<dw:transform-message doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
[
{name:'dpk'},
{name:"mht"}
]]]></dw:set-payload>
</dw:transform-message>
<synchronize:run-and-wait timeout="16000" doc:name="Synchronize">
<flow-ref name="flow2" doc:name="flow2"/>
</synchronize:run-and-wait>
<mulerequester:request resource="VM" doc:name="Mule Requester"/>
<json:object-to-json-transformer doc:name="Object to JSON"/>
</flow>
<flow name="flow2">
<batch:execute name="testbatchBatch" doc:name="testbatchBatch"/>
</flow>
<batch:job name="testbatchBatch" max-failed-records="3">
<batch:process-records>
<batch:step name="Batch_Step">
<logger message="1" level="INFO" doc:name="Logger"/>
</batch:step>
<batch:step name="Batch_Step1">
<logger message="2" level="INFO" doc:name="Logger"/>
<scripting:transformer doc:name="Groovy">
<scripting:script engine="Groovy"><![CDATA[Thread.sleep(6000);]]></scripting:script>
</scripting:transformer>
</batch:step>
<batch:step name="Batch_Step2">
<logger message="3" level="INFO" doc:name="Logger"/>
<scripting:transformer doc:name="Groovy">
<scripting:script engine="Groovy"><![CDATA[throw new java.lang.Exception("Test");]]></scripting:script>
</scripting:transformer>
</batch:step>
</batch:process-records>
<batch:on-complete>
<set-payload value="#[payload.successfulRecords]
#[payload.failedRecords]" doc:name="Set Payload"/>
<vm:outbound-endpoint exchange-pattern="one-way" path="test-queue-data" connector-ref="VM1" doc:name="VM"/>
</batch:on-complete>
</batch:job>
</mule>
Opinions expressed by DZone contributors are their own.
Comments