Mule ESB and Microsoft Azure Service Bus
This article outlines a few ways of integrating Mule ESB with the MS Azure Service Bus Queues.
Join the DZone community and get the full member experience.
Join For FreeIntroduction
This article outlines a few ways of integrating Mule ESB with the MS Azure Service Bus Queues.
At the time of compiling this article, the following are the MS Azure cloud offerings with respect to the queues:
1. Azure Storage Queues
- Azure Queue storage is a service for storing large numbers of messages that can be accessed from anywhere in the world via authenticated calls using HTTP or HTTPS
- It’s an “Azure Storage Infrastructure” cloud offering
2. Azure Service Bus*
- Microsoft Azure Service Bus is a reliable information delivery service. When two or more parties want to exchange information, they need a communication facilitator. Service Bus is a brokered, or third-party communication mechanism
- It’s an “Azure Integration Space” cloud offering
Since it is an “integration space” Azure cloud queue offering, here are few ways in which we can leverage Azure Service Bus with the Mule ESB for "Writing a message."
As always for queues, the following things are in play:
- Writing a message to the queue.
- Reading a message from the queue. (Queue readers can be independent of ESB. So this does not require Mule ESB as shown below)
3. Writing a Message to the Queue
- Using Azure Java SDK
- Using Azure Service Bus REST APIs
3. Using Mule Azure Service Bus Connector ( Not covered in this article)
4. Reading a Message From the Queue (Does Not Require Mule ESB at All)
- “Classic way” using Azure Java SDK — A Simple Java Class, which registers a callback
- Azure function Apps (“Uber Cool” way)
5. A Few Things to Consider
- Examples are compiled and executed in the Anypoint Studio Version 6.3.0 with Java 8
- May require free/pay-as-go/premium subscription for Azure Function Apps, Azure SQL Server, and Azure Service bus and queues
- The details for the following are omitted for brevity and out of scope for this article:
- How to create Function apps and configure them.
- How to procure and set-up Azure SQL Server
- How to create an Azure service bus
- Set up Azure Subscriptions and Resource groups
- How to write flows in Anypoint Studio and Mule flows, basics on how to use HTTP and Java components, and how to deploy flows in the Studio integrated Mule runtime.
Implementation — Writing a Message
1.1 Writing a Message — Using Azure Java SDK
How to: We can integrate using Mule “Java Component” in any flow.
High-level approach:
Add Azure Java SDK jars as a maven dependency
2 Types of Implementations:
Sync
Async (using Java Futures)
Mule Design View:
Configuration XML: (Leverage Mule Java Component — Sync)
<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
<flow name="mule-azure-service-bus-java-compFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/azure/messages/java" doc:name="HTTP" allowedMethods="POST"/>
<logger level="INFO" doc:name="Logger" message="Sending Message"/>
<component class="com.utility.message.azure.MessageSender" doc:name="Java"/>
<logger message="Send Message - Java - Success" level="INFO" doc:name="Logger"/>
</flow>
Configuration XML: (Leverage Mule Java Component — Async)
<flow name="mule-azure-service-bus-java-threaded">
<http:listener config-ref="HTTP_Listener_Configuration" path="/azure/messages/java/threaded" allowedMethods="POST" doc:name="HTTP"/>
<logger level="INFO" doc:name="Logger" message="Recieving Message"/>
<component class="com.utility.message.azure.MessageSenderThread" doc:name="Java"/>
<logger level="INFO" doc:name="Logger" message="Send Message - java threaded - Success"/>
</flow>
Source Code for Java Mule Component — Sync
package com.utility.message.azure;
import java.io.UnsupportedEncodingException;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.springframework.beans.factory.annotation.Value;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
public class MessageSender implements Callable {
@Value("${servicebus.connstring}")
private String connectionString;
@Value("${servicebus.queue}")
private String queueName;
@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
// Create a QueueClient instance for sending and then asynchronously send messages.
// Close the sender once the send operation is complete.
QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
sendMessage(sendClient,eventContext.getMessageAsString(),eventContext.getSession().getId());
return "true";
}
private void sendMessage(QueueClient sendClient,String msg,String msgID) throws UnsupportedEncodingException, InterruptedException, ServiceBusException {
Message message = new Message(msg.getBytes("UTF-8"));
message.setContentType("application/json");
message.setLabel(msgID);
message.setMessageId(msgID);
sendClient.send(message);
}
}
Source Code for Java Mule Component — Async
package com.utility.message.azure;
import java.io.UnsupportedEncodingException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.springframework.beans.factory.annotation.Value;
public class MessageSenderThread implements Callable {
@Value("${servicebus.connstring}")
private String connectionString;
@Value("${servicebus.queue}")
private String queueName;
@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
// Create a QueueClient instance for sending and then asynchronously send messages.
// Close the sender once the send operation is complete.
QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
this.sendMessagesAsync(sendClient,eventContext.getMessageAsString(),eventContext.getSession().getId()).thenRunAsync(() -> sendClient.closeAsync());
return "true";
}
CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient,String incomingMessage,String messageID) throws UnsupportedEncodingException {
List<CompletableFuture> tasks = new ArrayList<>();
final String messageId = "222";
Message message = new Message(incomingMessage.getBytes("UTF-8"));
message.setContentType("application/json");
message.setLabel(messageID);
message.setMessageId(messageId);
//message.setTimeToLive(Duration.ofMinutes(2));
System.out.printf("\nMessage sending: Id = %s", message.getMessageId());
tasks.add(
sendClient.sendAsync(message).thenRunAsync(() -> {
System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId());
}));
return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
}
}
Mule Application Properties (config.properties)
servicebus.connstring=Endpoint=sb://<your-queue-namespace>.servicebus.windows.net/;SharedAccessKeyName=<your-key-name>;SharedAccessKey=<your-key-value>
servicebus.queue=<your-queue-name>
servicebus.url=https://<your-queue-namespace>.servicebus.windows.net
servicebus.keyname=<your-key-name>
servicebus.accesskey=<your-key-value>
Substitute your own Azure Service Bus credentials in the following places:
<your-queue-namespace> , <your-key-name>, <your-key-value> and <your-queue-name>
With everything in place, run the above Mule Application.
Fire up your favorite rest client and hit the endpoint. In Postman, it may look like this.
Sync
Async
Check in the MS Azure Service Bus Portal.
1.2 Writing a Message — Using Azure Service Bus REST APIs
How to: We can integrate using mule “Http Connector” and "Java component" in any flow.
High-level approach:
Add Azure Java SDK jars as a maven dependency for generating Azure Tokens using Java class.
Call Azure Token Generator class from the mule Java Component.
Mule Design View:
Configuration XML: (Leverage Mule Java Component and Http Connector)
<flow name="mule-azure-service-bus-http">
<http:listener config-ref="HTTP_Listener_Configuration" path="/azure/messages/http" allowedMethods="POST" doc:name="HTTP"/>
<logger level="INFO" doc:name="Logger" message="Recieving Message"/>
<set-variable variableName="inputPayload" value="#[payload]" doc:name="Variable" encoding="UTF-8" mimeType="application/json"/>
<component class="com.utility.connectivity.azure.AzureTokenComponent" doc:name="Java"/>
<set-property propertyName="Authorization" value="#[payload]" doc:name="Set Auth Header"/>
<set-variable variableName="messageID" value="#[new java.util.Random().nextInt(100)]" doc:name="Variable"/>
<set-property propertyName="BrokerProperties" value="{"Label":#[flowVars.messageID],"TimeToLiveTimeSpan":""}" doc:name="Property"/>
<dw:transform-message doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
flowVars.inputPayload]]></dw:set-payload>
</dw:transform-message>
<http:request config-ref="HTTP_Request_Configuration" path="/testq/messages" method="POST" doc:name="HTTP"/>
<logger level="INFO" doc:name="Logger" message="Send Message - http - Success"/>
</flow>
Source Code for Java Azure Token Generator
package com.utility.message.azure.utility;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
public class AzureTokenGenerator {
public static String generateToken(String resourceUri, String keyName, String key)
{
long epoch = System.currentTimeMillis()/1000L;
int week = 60*60*24*7;
String expiry = Long.toString(epoch + week);
String sasToken = null;
try {
String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
String signature = getHMAC256(key, stringToSign);
sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") +"&sig=" +
URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry + "&skn=" + keyName;
System.out.println("SAS Token:"+sasToken);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return sasToken;
}
public static String getHMAC256(String key, String input) {
Mac sha256_HMAC = null;
String hash = null;
try {
sha256_HMAC = Mac.getInstance("HmacSHA256");
SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(), "HmacSHA256");
sha256_HMAC.init(secret_key);
java.util.Base64.Encoder encoder = Base64.getEncoder();
hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return hash;
}
public static void main(String args[]) {
generateToken("<your-queue-uri>","<your-key-name>","<your-key-value>");
}
}
To unit-test the java class, substitute your own Azure Service Bus credentials in the following places:
<your-queue-uri>, <your-key-name>, <your-key-value>
Source Code for Java Mule Component
package com.utility.connectivity.azure;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.springframework.beans.factory.annotation.Value;
import com.utility.message.azure.utility.AzureTokenGenerator;;
public class AzureTokenComponent implements Callable {
@Value("${servicebus.url}")
private String aurl;
@Value("${servicebus.keyname}")
private String akeyname;
@Value("${servicebus.accesskey}")
private String akey;
@Override
public String onCall(MuleEventContext eventContext) throws Exception {
// TODO Auto-generated method stub
String authHeader = AzureTokenGenerator.generateToken(aurl,akeyname,akey);
return authHeader;
}
}
With everything in place, run the above Mule Application.
Fire up your favorite rest client and hit the endpoint. In Postman, it may look like this.
Verify if the message is posted in the MS Azure Service Bus Portal.
Implementation — Reading a Message
1.1 “Classic Way” Using Azure Java SDK — A Java Class
How to: We can read queues using simple Java program which registers a callback.
The callback will run where there is a message on the queue.
High-level approach:
Add Azure Java SDK jars , Apache Commons CLI, SL4J libraries as maven dependencies.
Register a callback for the Azure queue client.
Source Code for Java Queue Reader
package com.azure.util.queue;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
public class AzureQueueReader {
static final Logger logger = LoggerFactory.getLogger(AzureQueueReader.class);
String ConnectionString = null;
String QueueName = null;
public void run() throws Exception {
// Create a QueueClient instance for receiving using the connection string builder
// We set the receive mode to "PeekLock", meaning the message is delivered
// under a lock and must be acknowledged ("completed") to be removed from the queue
QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
this.registerReceiver(receiveClient);
// wait for ENTER or 10 seconds elapsing
waitForEnter(10);
// shut down receiver to close the receive loop
receiveClient.close();
}
void registerReceiver(QueueClient queueClient) throws Exception {
// register the RegisterMessageHandler callback
queueClient.registerMessageHandler(new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
// receives message is passed to callback
// if (message.getLabel() != null &&
// message.getContentType() != null) {
byte[] body = message.getBody();
String msgBody = new String(body);
System.out.printf(
"\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
"\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tLabel = \"%s\", \n\t\t\t\t\t\t Body = \"%s\"",
message.getMessageId(),
message.getSequenceNumber(),
message.getEnqueuedTimeUtc(),
message.getExpiresAtUtc(),
message.getContentType(),
message.getLabel(),
msgBody
);
// }
return CompletableFuture.completedFuture(null);
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
}
},
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, true, Duration.ofMinutes(1)));
}
public static void main(String[] args) {
AzureQueueReader app = new AzureQueueReader();
try {
app.runApp(args);
app.run();
} catch (Exception e) {
System.out.printf("%s", e.toString());
}
// System.exit(0);
}
public void runApp(String[] args) {
try {
// parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
options.addOption(new Option("q", true, "Queue Name"));
CommandLineParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
if (cl.getOptionValue("c") != null && cl.getOptionValue("q") != null) {
ConnectionString = cl.getOptionValue("c");
QueueName = cl.getOptionValue("q");
}
else
{
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
}
} catch (Exception e) {
System.out.printf("%s", e.toString());
}
}
private void waitForEnter(int seconds) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
executor.invokeAny(Arrays.asList(() -> {
System.in.read();
return 0;
}));
} catch (Exception e) {
// absorb
}
}
}
With everything in place - Run the Java application from the command line:
This java program requires 2 arguments for command line execution.
c - Connection String
q - Queue Name
In Unix like systems, here is the command to run:
Copy all dependencies required for the Java class in the lib folder
java -cp "lib/*:AzureServiceBusUtil-0.0.1-SNAPSHOT.jar" com.azure.util.queue.AzureQueueReader -c "<your-queue-conn-string>" -q "<your-queue-name>"
- <your-queue-conn-string> Generally it will be in the format like "Endpoint=sb://<your-queue-namespace>.servicebus.windows.net/;SharedAccessKeyName=<your-key-name>;SharedAccessKey=<your-key-value>"
Substitute your own credentials for the following:
<your-queue-namespace>, <your-key-name> and <your-key-value>
- <your-queue-name>Generally it will be in the format like "<your-queue-name>"
Fire up your favorite rest client and Pump a message to queue. In postman it may look like this.
Check the logs of the Java application:
It should print a message with statement "Message received"
1.2 Azure Function Apps (“Uber Cool” Way)
- At the time of writing this article, I could not find native support in "Function Apps" using Java, which can integrate with Azure Service Bus.
- So instead of using Java, I used node.js code to demonstrate how can we "read a message from a queue" and put into Cloud SQL Server Database.
- What this function app does is the following:
- Reads a message from the Azure Service Bus Queue (it is configured as trigger)
- Writes a message into Azure SQL Server table.
Azure Function App
Function App "Integrate" Tab Settings
Check the Trigger is configured as Azure Queue Connection.
Source Code for Queue Reader in "Azure Function App" using node.js.
var Connection = require('tedious').Connection;
var Request = require('tedious').Request
var TYPES = require('tedious').TYPES;
module.exports = function (context, myQueueItem) {
context.log('JavaScript queue trigger function processed work item:', myQueueItem);
context.log('Correlation Data:', myQueueItem.correlationData);
var _currentData = {};
var config = {
userName: '<your-uname>',
password: '<your-password>',
server: '<your-azure-db-host>',
options: {encrypt: true, database: 'trialdb'}
};
var connection = new Connection(config);
connection.on('connect', function(err) {
context.log("Connected");
insertStatement(myQueueItem.correlationData);
});
function selectQuery() {
request = new Request("SELECT * FROM dbo.s_table;", function(err) {
if (err) {
context.log(err);}
});
request.on('row', function(columns) {
_currentData.id = columns[0].value;
_currentData.payload = columns[1].value;;
context.log(_currentData);
});
request.on('requestCompleted', function () {
connection.execSql(request);
});
}
function insertStatement(payload) {
context.log('Insert statement started');
context.log('Payload'+payload);
request = new Request("INSERT INTO DBO.S_TABLE(payload) VALUES(@payl);",function(err) {
if (err) {
context.log(err);}
});
request.addParameter('payl', TYPES.NVarChar,payload);
request.on('row', function(columns) {
_currentData.id = columns[0].value;
context.log(_currentData);
});
connection.execSql(request);
context.log('Insert statement completed successfully');
}
context.done();
};
Substitute these values with your own Azure credentials.
<your-uname>, <your-password> , <your-azure-db-host>
With everything in place, let's test this out!
Let's pump a message into the queue using REST client. If it is Postman, it may be like below:
Check the logs of Azure Function App for message arrival.
Check the Azure SQL Server table.
So, this wraps up this article for both reading and writing messages.
Mahalo
Opinions expressed by DZone contributors are their own.
Comments