JMS Clustering by Example
Join the DZone community and get the full member experience.
Join For FreeIt's amazing how the JBoss Team put together an easy way to do JMS Clustering, out of the box!!.
I'll start with an easy example, creating a Queue named "MyClusteredQueue".
In this example I'm using JBoss AS 5.1. and two computers connected on the same network, with these IP's:
- Computer A: 192.168.0.143
- Computer B: 192.168.0.210
So, here are the steps:
1) Install the JBoss on both computers. We are going to use the "all" configuration for both computers.
2) We create our Queue on both servers.
Go to $JBOSS_HOME/server/all/deploy/messaging/ and edit the destinations-service.xml file. Add the MyClusteredQueue before the last server tag. It looks like this:
<!-- Cluster JMS -->
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.messaging.destination:service=Queue,name=MyClusteredQueue"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.messaging:service=PostOffice</depends>
<attribute name="Clustered">true</attribute>
</mbean>
This is how you add a Queue to the JBoss, and the people how are familiar with this, the only new thing is to add the attribute "Clustered". This step must be set on both computers. At the end of the article you can find the files.
3) Write the MDB to consume the messages, and deploy it on the two computers. (I'm using an EJB 3 - MDB style).
import java.net.InetAddress;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.apache.log4j.Logger;
/**
* @author felipeg
*
*/
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="queue/MyClusteredQueue")
})
public class JMSClusterClientHandler implements MessageListener {
Logger log = Logger.getLogger(JMSClusterClientHandler.class);
@Override
public void onMessage(Message message) {
try{
if (message instanceof ObjectMessage)
{
InetAddress addr = InetAddress.getLocalHost();
log.info("########## Processing Host: " + addr.getHostName() + " ##########" );
ObjectMessage objMessage = (ObjectMessage) message;
Object obj = objMessage.getObject();
log.info("Object received:" + obj.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
4) Start the jboss with the following options:
Computer A:
$ cd $JBOSS_HOME/bin
$ ./run.sh -c all -b 192.168.0.143 -Djboss.messaging.ServerPeerID=1
Computer B:
$ cd $JBOSS_HOME/bin
$ ./run.sh -c all -b 192.168.0.210 -Djboss.messaging.ServerPeerID=2
It is necesary to give an ID to each server and this is accomplished with this directive:
-Djboss.messaging.ServerPeerID
When you start the jboss on computer A, you should see the logs (server.log) telling you that there is one node ready and listening, and once you start the jboss on computer B, on the log will appear the two nodes, the two IP's ready to consume messages.
Also on the jndi.properties (if you are using the default InitialContext) file it's necessary to add the two computers ip's separated by comma to the java.naming.provider.url property. (In my case a create a Properties variable and I set all the necessary properties, JMSDispatcher.java - see the code below).
<html>
<body>
<div>
<form method="POST" action='<%= request.getRequestURI() + "JMSClusteredClient" %>'>
<fieldset>
<legend>JMS Clustered - Test Client</legend>
<table>
<tr>
<td>Server:</td><td><input type="text" name="server" value="192.168.0.143:1099,192.168.0.210:1099" /></td>
</tr>
<tr>
<td>
<select name="messageType">
<option value="QUEUE" selected="selected">Queue</option>
<option value="TOPIC" >Topic</option>
</select>
</td>
<td><input type="text" name="topicqueue" value="queue/MyClusteredQueue" /></td>
</tr>
<tr>
<td>Times:</td><td><input type="text" name="times" value="3" /></td>
</tr>
<tr>
<td>Message:</td><td><textarea rows="3" cols="20" name="message"></textarea></td>
</tr>
</table>
<input type="submit" value="Send">
</fieldset>
</form>
</div>
</body>
</html>
public class JMSClusteredClient extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#service(HttpServletRequest request, HttpServletResponse response)
*/
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
PrintWriter out = response.getWriter();
String topicqueue = request.getParameter("topicqueue");
String message = request.getParameter("message");
String server = request.getParameter("server");
String messageType = request.getParameter("messageType");
String times = request.getParameter("times");
int intTimes = Integer.parseInt(times);
JMSDispatcher dispatcher = new JMSDispatcher();
dispatcher.setTopicQueueName(topicqueue);
dispatcher.setServer(server);
dispatcher.setMessageType(messageType);
try {
for(int count =1; count <= intTimes;count++){
dispatcher.sendMessage( count + " of " + times + " " + message);
}
out.println("Message [" + message + "] sent successfully to [" + topic + "] to the [" + server + "] server " + times + " times.");
} catch (JMSException e) {
e.printStackTrace();
out.println("Error:" + e.getMessage());
} catch (NamingException e) {
out.println("Error:" + e.getMessage());
e.printStackTrace();
} finally{
out.close();
}
}
}
public class JMSDispatcher {
/**
*
*/
private static final long serialVersionUID = 7105145023422143880L;
private static Logger log = Logger.getLogger(JMSDispatcher.class);
private final String CONNECTION_FACTORY_CLUSTERED = "ClusteredConnectionFactory";
private final String CONNECTION_FACTORY = "ConnectionFactory";
private final String TOPIC = "TOPIC";
private final String QUEUE = "QUEUE";
private String topicQueueName;
private String server;
private String messageType;
public void setTopicQueueName(String value){
this.topicQueueName = value;
}
public void setServer(String value){
this.server = value;
}
public void setMessageType(String value){
this.messageType = value;
}
public void sendMessage(Object objectMessage) throws JMSException, NamingException{
log.debug("##### Setting up a Queue/Topic Message: #####");
if (TOPIC.equals(messageType)){
sendTopicMessage(objectMessage);
} else if (QUEUE.equals(messageType)){
sendQueueMessage(objectMessage);
}
log.debug("##### Publishing Message: Done #####");
}
private void sendQueueMessage(Object objectMessage) throws JMSException, NamingException{
try{
InitialContext initialContext = getInitialContext();
QueueConnectionFactory qcf = (QueueConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
QueueConnection queueConn = qcf.createQueueConnection();
Queue queue = (Queue) initialContext.lookup(topicQueueName);
QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queueConn.start();
QueueSender send = queueSession.createSender(queue);
ObjectMessage om = queueSession.createObjectMessage((Serializable)objectMessage);
setMessageProperties(om);
log.debug("##### Publishing Message to a Queue: " + queueName + "#####");
send.send(om);
send.close();
queueConn.stop();
queueSession.close();
queueConn.close();
}catch(MessageFormatException ex){
log.error("##### The MESSAGE is not Serializable ####");
throw ex;
}catch(MessageNotWriteableException ex){
log.error("##### The MESSAGE is not Readable ####");
throw ex;
}catch(JMSException ex){
log.error("##### JMS provider fails to set the object due to some internal error. ####");
throw ex;
}
}
private void sendTopicMessage(Object objectMessage) throws JMSException, NamingException{
try{
InitialContext initialContext = getInitialContext();
TopicConnectionFactory tcf = (TopicConnectionFactory)initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
TopicConnection topicConn = tcf.createTopicConnection();
Topic topic = (Topic) initialContext.lookup(topicQueueName);
TopicSession topicSession = topicConn.createTopicSession(false,TopicSession.AUTO_ACKNOWLEDGE);
topicConn.start();
TopicPublisher send = topicSession.createPublisher(topic);
ObjectMessage om = topicSession.createObjectMessage();
om.setObject((Serializable)objectMessage);
setMessageProperties(om);
log.debug("##### Publishing Message to a Topic: " + topicName + "#####");
send.publish(om);
send.close();
topicConn.stop();
topicSession.close();
topicConn.close();
}catch(MessageFormatException ex){
log.error("##### The MESSAGE is not Serializable ####");
throw ex;
}catch(MessageNotWriteableException ex){
log.error("##### The MESSAGE is not Readable ####");
throw ex;
}catch(JMSException ex){
log.error("##### JMS provider fails to set the object due to some internal error. ####");
throw ex;
}
}
private InitialContext getInitialContext() throws NamingException{
Properties jboss = new Properties();
jboss.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
jboss.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
jboss.put("java.naming.provider.url", server);
return new InitialContext(jboss);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5">
<display-name>JMSWeb</display-name>
<welcome-file-list>
<welcome-file>index.jsp</welcome-file>
</welcome-file-list>
<servlet>
<description></description>
<display-name>JMSClusteredClient</display-name>
<servlet-name>JMSClusteredClient</servlet-name>
<servlet-class>com.blogspot.felipeg48.jms.web.JMSClusteredClient</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>JMSClusteredClient</servlet-name>
<url-pattern>/JMSClusteredClient</url-pattern>
</servlet-mapping>
</web-app>
Opinions expressed by DZone contributors are their own.
Comments