Message Chunk Splitter and Aggregator With MuleSoft
The message chunk splitter splits a message into chunks of bytes whose size is defined by the messageSize attribute. The splitter, by contrast, splits a message into parts defined by a MEL expression.
The message chunk splitter allows you to split a single message into a number of fixed-length messages that are sent to the same message processor. It splits the message into small chunks according to the messageSize attribute that you configure for the router. The message is first converted into a byte array, and that array is then split into smaller chunks.
The message chunk aggregator combines two or more messages into a single message by matching messages that share a given correlation ID.
Message Splitter vs. Message Chunk Splitter
The message chunk splitter splits the message into chunks of bytes whose size you define with the messageSize attribute. The splitter, by contrast, splits the message into parts that you define with a Mule Expression Language (MEL) expression.
For more details on Splitter, please go through another one of my articles, Splitter and Collection Aggregator With MuleSoft.
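The difference between the two routers can be sketched side by side in flow XML. This is a minimal illustration rather than part of the walkthrough: the flow names are hypothetical, the logger messages are placeholders, and the xpath3 MEL function is assumed to be available (Mule 3.6 or later with the XML module on the classpath).

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">

    <!-- Hypothetical flow: the splitter uses a MEL expression to decide the parts.
         Here each Employee element of the payload becomes its own message. -->
    <flow name="splitByExpressionFlow">
        <splitter expression="#[xpath3('//Employee', payload, 'NODESET')]"/>
        <logger level="INFO" message="Received one Employee node"/>
    </flow>

    <!-- Hypothetical flow: the message chunk splitter ignores the XML structure
         and cuts the payload bytes into parts of at most 512 bytes. -->
    <flow name="splitBySizeFlow">
        <message-chunk-splitter messageSize="512"/>
        <logger level="INFO" message="Received one chunk of bytes"/>
    </flow>
</mule>
```

With the splitter, every outgoing message is a meaningful unit (one employee record). With the message chunk splitter, a chunk boundary can fall in the middle of an element, so the chunks are only useful once they are reassembled.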
Now, we will walk through how the message chunk splitter and aggregator can be implemented.
Place the HTTP listener into the canvas and click to open the Properties console.
Click the green + and configure as follows:
Host: localhost.
Port: 8081.
Method: POST.
Path: chunksplitter.
Place the message chunk splitter component in the message processor region after the HTTP listener.
Field | Description | Default value |
Display name | Used to provide a unique name for the message chunk splitter in your application. | Message chunk splitter |
Enable correlation | Specifies whether Mule should give outgoing messages a correlation ID. Options are: ALWAYS, NEVER, and IF_NOT_SET. | |
Message info mapping | Optional. If this child element is not configured, | |
messageSize | Defines the size, in bytes, of the parts into which the message is split. | |
A message chunk splitter is useful if you have bandwidth problems (or size limitations) when using a particular transport.
Configure the splitter component as shown below.
Display name | Message chunk splitter |
Enable correlation | IF_NOT_SET |
Message size | 512 |
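In the XML view, the settings above would correspond roughly to the element below. This is a sketch that assumes Studio writes the Enable correlation field as the enableCorrelation attribute. The full configuration shown later in this article omits that attribute, which suggests that IF_NOT_SET is the default.

```xml
<!-- Sits inside a <flow>; the doc: prefix is the Studio documentation namespace declared on <mule>. -->
<!-- enableCorrelation is written out here only to mirror the table above. -->
<message-chunk-splitter enableCorrelation="IF_NOT_SET" messageSize="512" doc:name="Message Chunk Splitter"/>
```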
Finally, place the file connector at the end of the flow to save each split message as a file.
Testing the Application
You can use Postman to post the message. The listening URL will be http://localhost:8081/chunksplitter.
Sample input message:
<Employees>
<Employee>
<FirstName>Jitu</FirstName>
<LastName>Jain</LastName>
<Salary>100000</Salary>
</Employee>
<Employee>
<FirstName>Jospeh</FirstName>
<LastName>Adams</LastName>
<Salary>200000</Salary>
</Employee>
<Employee>
<FirstName>Brain</FirstName>
<LastName>Handscomb</LastName>
<Salary>300000</Salary>
</Employee>
<Employee>
<FirstName>Rajeev</FirstName>
<LastName>Thandani</LastName>
<Salary>500000</Salary>
</Employee>
<Employee>
<FirstName>Jitu</FirstName>
<LastName>Jain</LastName>
<Salary>100000</Salary>
</Employee>
<Employee>
<FirstName>Jospeh</FirstName>
<LastName>Adams</LastName>
<Salary>200000</Salary>
</Employee>
<Employee>
<FirstName>Steve</FirstName>
<LastName>Simon</LastName>
<Salary>300000</Salary>
</Employee>
<Employee>
<FirstName>Brain</FirstName>
<LastName>Handscomb</LastName>
<Salary>300000</Salary>
</Employee>
<Employee>
<FirstName>Rajeev</FirstName>
<LastName>Thandani</LastName>
<Salary>500000</Salary>
</Employee>
<Employee>
<FirstName>Jitu</FirstName>
<LastName>Jain</LastName>
<Salary>100000</Salary>
</Employee>
<Employee>
<FirstName>Jospeh</FirstName>
<LastName>Adams</LastName>
<Salary>200000</Salary>
</Employee>
<Employee>
<FirstName>Steve</FirstName>
<LastName>Simon</LastName>
<Salary>300000</Salary>
</Employee>
<Employee>
<FirstName>Brain</FirstName>
<LastName>Handscomb</LastName>
<Salary>300000</Salary>
</Employee>
<Employee>
<FirstName>Rajeev</FirstName>
<LastName>Thandani</LastName>
<Salary>500000</Salary>
</Employee>
</Employees>
Now, you can check the output location and verify that the message has been split into parts of 512 bytes each (the last part may be smaller).
Code:
<?xml version="1.0" encoding="UTF-8"?>
<mule
xmlns:json="http://www.mulesoft.org/schema/mule/json"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
    <flow name="splitterprojectFlow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/chunksplitter" allowedMethods="POST" doc:name="HTTP"/>
        <message-chunk-splitter messageSize="512" doc:name="Message Chunk Splitter"/>
        <file:outbound-endpoint path="src/test/resources/out" responseTimeout="10000" doc:name="File"/>
    </flow>
</mule>
Message Chunk Aggregator
After a splitter such as the message chunk splitter splits a message into parts, the message chunk aggregator router reassembles those parts back into a single message. The aggregator uses the message’s correlation ID to identify which parts belong to the same message.
It checks the group tag (correlation ID) of each message in a collection, selects all the messages whose group tag matches the specified value, and then combines those messages into a single message that is then sent to the next message processor in an application flow. This is particularly useful for re-assembling the segments of a long message that has been received as multiple messages, each one consisting of a segment of fixed length created and sent by the message chunk splitter.
Code:
<?xml version="1.0" encoding="UTF-8"?>
<mule
xmlns:json="http://www.mulesoft.org/schema/mule/json"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
    <flow name="splitterprojectFlow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/chunksplitter" allowedMethods="POST" doc:name="HTTP"/>
        <message-chunk-splitter messageSize="512" doc:name="Message Chunk Splitter"/>
        <message-chunk-aggregator failOnTimeout="true" doc:name="Message Chunk Aggregator">
            <expression-message-info-mapping messageIdExpression="#[message.id]" correlationIdExpression="#[message.correlationId]"/>
        </message-chunk-aggregator>
        <file:outbound-endpoint path="src/test/resources/out" responseTimeout="10000" doc:name="File"/>
    </flow>
</mule>
Now you know how the message chunk splitter and aggregator work in MuleSoft!