JMS: A Quick and Complete Guide
This guide will walk you through all you need to know about JMS and how it works, communicates, and connects with other software.
JMS (Java Message Service) is a Java-based API that allows applications to create, send, and receive messages. It provides a set of interfaces and classes for communicating with messaging implementations. JMS differs from RMI, where the client and server are tightly coupled and each must know about the other and about the remote method; with JMS, the client and server don't need to know about each other.
JMS allows various components to interact with one another efficiently, without wasting the network and other related resources.
A JMS application is composed of the following parts:
JMS provider is a messaging system that implements the JMS interfaces and provides administrative and control features.
JMS clients are the programs or components, written in Java, that produce and consume messages.
Messages are the objects that communicate information between JMS clients.
Administered objects are preconfigured JMS objects created by an administrator for the use of clients. The two kinds of administered objects are destinations and connection factories.
Note: Administrative tools allow you to bind destinations and connection factories into a Java Naming and Directory Interface (JNDI) API namespace. A JMS client can then look up the administered objects in the namespace and then establish a logical connection to the same objects through the JMS provider.
JMS provides support for both point-to-point (P2P) and publish/subscribe (Pub-Sub) domains, and some JMS clients combine the use of both domains in a single application.
P2P
This kind of application is built on message queues, senders, and receivers. Each message is addressed to a specific queue, and receivers extract messages from the queues established to hold their messages. Each message has only one consumer. Queues retain all messages sent to them until the messages are consumed or until they expire.
Pub-Sub
In this model, the sender sends its messages to a topic. Publishers and subscribers are generally anonymous and may dynamically publish or subscribe. The system takes care of distributing the messages arriving from a topic’s multiple publishers to its multiple subscribers. Topics retain messages only as long as it takes to distribute them to current subscribers. The JMS API allows clients to create durable subscriptions. Durable subscriptions can receive messages sent while the subscribers are not active. Durable subscriptions provide the flexibility and reliability of queues, but still allow clients to send messages to many recipients.
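To make the difference between the two domains concrete, here is a minimal sketch that models them with plain JDK collections. It is not JMS code: the names DomainModel, ModelQueue, and ModelTopic are made up for illustration, and a real provider also handles expiry, persistence, and durable subscriptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-JDK model of the two messaging domains; none of these types are JMS classes.
class DomainModel {

    // Point-to-point: a message stays in the queue until exactly one receiver takes it.
    static class ModelQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String message) {
            messages.add(message);
        }

        // Returns the next message, or null when the queue is empty.
        String receive() {
            return messages.poll();
        }
    }

    // Publish/subscribe: a message is copied to every subscriber registered at publish time.
    static class ModelTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        // Registers a subscriber and returns the inbox its messages are delivered into.
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        ModelQueue queue = new ModelQueue();
        queue.send("order-1");
        System.out.println("receiver A got: " + queue.receive());
        System.out.println("receiver B got: " + queue.receive()); // nothing left for B

        ModelTopic topic = new ModelTopic();
        List<String> early = topic.subscribe();
        topic.publish("price-update");
        List<String> late = topic.subscribe(); // subscribed after the publish
        System.out.println("early subscriber: " + early);
        System.out.println("late subscriber: " + late);
    }
}
```

In this model, the second receiver finds the queue empty because the first one already consumed the message, while every subscriber registered before the publish gets its own copy.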
There are two ways of message consumption:
Synchronous: A client explicitly fetches a message by calling the receive method, which blocks until a message arrives or times out if no message arrives within a specified time limit.
Asynchronous: A client can register a message listener with a consumer. Whenever a message arrives at the destination, the JMS provider delivers the message by calling the listener’s onMessage method, which basically acts on the contents of the message.
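The two consumption styles can be sketched with plain JDK classes. This is only a model under stated assumptions: ConsumptionModel and PushDestination are hypothetical names, a BlockingQueue stands in for the destination, and a java.util.function.Consumer stands in for a message listener.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-JDK model of synchronous and asynchronous consumption; not JMS API.
class ConsumptionModel {

    // Synchronous style: the caller blocks until a message arrives or the timeout elapses.
    static String receive(BlockingQueue<String> destination, long timeoutMillis) {
        try {
            return destination.poll(timeoutMillis, TimeUnit.MILLISECONDS); // null on timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    // Asynchronous style: the "provider" pushes each arriving message to the registered listener.
    static class PushDestination {
        private Consumer<String> listener;

        void setListener(Consumer<String> listener) {
            this.listener = listener;
        }

        void deliver(String message) {
            if (listener != null) {
                listener.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> destination = new LinkedBlockingQueue<>();
        System.out.println("nothing yet: " + receive(destination, 100));
        destination.add("hello");
        System.out.println("received: " + receive(destination, 100));

        PushDestination push = new PushDestination();
        push.setListener(m -> System.out.println("onMessage: " + m));
        push.deliver("world");
    }
}
```

The synchronous caller decides when to wait and for how long, whereas the listener is invoked whenever the provider has something to deliver.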
Infrastructure
Administered Objects
Two administered objects are available: Connection Factory (CF) and Destinations.
Connection factory (CF): This is the object a client uses to create a connection with a provider. A CF encapsulates a set of connection configuration parameters that have been defined by an administrator. Each connection factory is an instance of either the QueueConnectionFactory or the TopicConnectionFactory interface.
For example:
Context ctx = new InitialContext();
QueueConnectionFactory queueCF = (QueueConnectionFactory)ctx.lookup("QCF");
TopicConnectionFactory topicCF = (TopicConnectionFactory)ctx.lookup("TCF");
The above example shows how to obtain an InitialContext through JNDI. When the no-argument InitialContext constructor is used, it searches the current classpath for a file named jndi.properties. This vendor-specific file indicates which JNDI API implementation to use and which namespace to use.
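The same two settings can also be supplied programmatically instead of through jndi.properties. The sketch below only builds the environment table; the factory class name and provider URL are hypothetical placeholders, since the real values depend on the JMS provider in use.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Builds a JNDI environment equivalent to a two-line jndi.properties file.
class JndiEnvironmentSketch {

    static Hashtable<String, String> environment(String factoryClass, String providerUrl) {
        Hashtable<String, String> env = new Hashtable<>();
        // Same key as "java.naming.factory.initial" in jndi.properties.
        env.put(Context.INITIAL_CONTEXT_FACTORY, factoryClass);
        // Same key as "java.naming.provider.url" in jndi.properties.
        env.put(Context.PROVIDER_URL, providerUrl);
        return env;
    }

    public static void main(String[] args) {
        // Both values are made up for illustration; use the ones your provider documents.
        Hashtable<String, String> env = environment(
                "com.example.jms.ExampleInitialContextFactory",
                "tcp://localhost:7000");
        System.out.println(env.get(Context.INITIAL_CONTEXT_FACTORY));
        System.out.println(env.get(Context.PROVIDER_URL));
        // With the provider's JNDI classes on the classpath, the next step would be:
        // Context ctx = new InitialContext(env);
    }
}
```

Passing the table to the InitialContext constructor is useful when the settings come from application configuration rather than from a file on the classpath.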
Destination: This is the object a client uses to specify the target of the messages it produces and the source of the messages it consumes. In P2P, the destination is a Queue, and in Pub-Sub, it is a Topic.
Topic topic = (Topic) ctx.lookup("MyTopic");
Queue queue = (Queue) ctx.lookup("MyQueue");
Connections
A connection typically represents an open TCP/IP socket between the client and a provider. A single connection can have a number of sessions. Two flavors of connection are available: the TopicConnection and QueueConnection interfaces. To start consuming messages, we call the start method, and at the end of the application we close the connection using the close method.
QueueConnection queueConn = queueCF.createQueueConnection();
TopicConnection topicConn = topicCF.createTopicConnection();
queueConn.start();
topicConn.start();
queueConn.close();
topicConn.close();
Session
A Session is responsible for creating producers, consumers, and messages, and it is a single-threaded context. Two flavors of Session are available: TopicSession and QueueSession.
TopicSession topicSession = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Here, false indicates that the session is not transacted, and Session.AUTO_ACKNOWLEDGE indicates that the session automatically acknowledges messages when they have been received successfully.
QueueSession queueSession = queueConn.createQueueSession(true, 0);
Here, true indicates that the session is transacted, and 0 indicates that no acknowledgement mode is specified, because the acknowledgement mode is ignored for transacted sessions.
Producer
A Session creates a Producer, and the Producer sends messages to a Destination (Queue/Topic). Two flavors of Producer are available: the TopicPublisher and QueueSender interfaces.
QueueSender queueSender = queueSession.createSender(queue);
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
After creating a producer, we can send messages using the methods below:
queueSender.send(message);
topicPublisher.publish(message);
We can also create an anonymous producer by passing null instead of a queue/topic, as below:
QueueSender queueSender = queueSession.createSender(null);
TopicPublisher topicPublisher = topicSession.createPublisher(null);
In this case, we specify the destination when we send/publish a message, using the overloaded send/publish method that takes the destination as its first argument.
Consumer
A Session creates a Consumer, and the Consumer receives messages from a Destination (Queue/Topic). Two flavors of Consumer are available: the TopicSubscriber and QueueReceiver interfaces.
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
We can create a durable subscriber by also passing a subscription name, as below:
TopicSubscriber durableSubscriber = topicSession.createDurableSubscriber(topic, "mySubscription"); // "mySubscription" is an example name
Once created, a consumer is active and can receive messages synchronously, as below. Calling the close method makes the consumer inactive:
Message m = queueReceiver.receive();
Message m = topicSubscriber.receive(100); // specified time-out here as millisecond
To receive a message only if one is immediately available, call the receiveNoWait method:
Message m = queueReceiver.receiveNoWait();
Note:
If you specify no argument or an argument of 0, as in receive() and receive(0), the method blocks indefinitely until a message arrives, which ties up the calling thread. It is usually better to pass a timeout greater than 0.
We have to call the start method on the connection before receiving messages; sending does not require a started connection.
To receive messages asynchronously, we implement the MessageListener interface, whose onMessage() method defines what to do when a message arrives.
We register the message listener with a QueueReceiver or TopicSubscriber using the setMessageListener method. For example, if we have defined a class MyTopicListener that implements the MessageListener interface, we register it as:
MyTopicListener topicListener = new MyTopicListener();
topicSubscriber.setMessageListener(topicListener);
After registering the MessageListener, we start the connection using the start method. If we call start on the connection before registering the MessageListener, we may miss messages.
Whenever a message is delivered, the provider calls the listener's onMessage method. The same message listener can be registered with a QueueReceiver as well as a TopicSubscriber.
Remember: Only one MessageListener may be running at a time for a session.
If a MessageConsumer is interested only in specific messages, we apply a filter to the messages. This filter is a message selector, which performs content-based routing on header and property values. A message selector assigns the work of filtering to the JMS provider rather than to the application. A message selector is a String containing an expression that is applied to message headers and message properties. The createReceiver, createSubscriber, and createDurableSubscriber methods each have a form that takes a message selector as an argument when we create a message consumer. After that, the message consumer receives only those messages whose headers and properties match the selector.
Note: A message selector cannot filter on the message body; it always applies to message headers and message properties.
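In real JMS code, the selector is a String in an SQL-92-like syntax, for example myMessageSelector = 'contains102'. The sketch below does not parse such strings; it models the idea with a plain Java Predicate so that the filtering rule is visible. SelectorModel and ModelMessage are hypothetical names, not JMS types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-JDK model of provider-side filtering on message properties; not JMS API.
class SelectorModel {

    static class ModelMessage {
        final Map<String, String> properties;
        final String body;

        ModelMessage(Map<String, String> properties, String body) {
            this.properties = properties;
            this.body = body;
        }
    }

    // The selector sees only the properties, never the body, mirroring the rule above.
    static List<String> deliver(List<ModelMessage> sent, Predicate<Map<String, String>> selector) {
        List<String> bodies = new ArrayList<>();
        for (ModelMessage message : sent) {
            if (selector.test(message.properties)) {
                bodies.add(message.body);
            }
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<ModelMessage> sent = List.of(
                new ModelMessage(Map.of("myMessageSelector", "contains102"), "first"),
                new ModelMessage(Map.of("myMessageSelector", "other"), "second"),
                new ModelMessage(Map.of(), "third"));

        // Stands in for the selector string: myMessageSelector = 'contains102'
        Predicate<Map<String, String>> selector =
                properties -> "contains102".equals(properties.get("myMessageSelector"));

        System.out.println(deliver(sent, selector)); // prints [first]
    }
}
```

Messages that do not match are simply never handed to this consumer, so the application does no filtering work of its own.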
Message
This is the actual content that is passed from sender to receiver. A message has three parts, as below.
Message header: The header fields are all predefined, and a few of them can be set by clients to identify and route messages. Header fields can also be used in a message selector. Below are the message header fields and where each one is set:
Header field: Set by
JMSDestination: send/publish method
JMSDeliveryMode: send/publish method
JMSExpiration: send/publish method
JMSPriority: send/publish method
JMSMessageID: send/publish method
JMSTimestamp: send/publish method
JMSCorrelationID: Client
JMSReplyTo: Client
JMSType: Client
JMSRedelivered: JMS provider
Message properties (optional): These are additional name-value pairs that can be used in a message selector. For example:
message.setStringProperty("myMessageSelector", "contains102");
Message body (optional): There are six types as below:
- TextMessage: A java.lang.String object.
- MapMessage: A set of key-value pairs.
- BytesMessage: A stream of uninterpreted bytes.
- StreamMessage: A stream of primitive values in the Java programming language filled and read sequentially.
- ObjectMessage: A Serializable object in the Java programming language.
- Message: This message type is useful when a message body is not required.
Let us see the example as below:
Sender Side:
TextMessage msg = queueSession.createTextMessage();
msg.setText("My first text message");
queueSender.send(msg);
Receiver Side:
Message msg = queueReceiver.receive();
if (msg instanceof TextMessage) {
TextMessage message = (TextMessage) msg;
System.out.println("Got message ---- " + message.getText());
}
Exceptions
The JMS exceptions that may occur are listed below:
IllegalStateException: This exception is thrown when a method is invoked at an illegal or inappropriate time or if the provider is not in an appropriate state for the requested operation. For example, this exception must be thrown when a domain-inappropriate method is called, such as calling createBrowser on a TopicSession.
InvalidClientIDException: This exception must be thrown when a client attempts to set a connection's client ID to a value that is rejected by a provider.
InvalidDestinationException: This exception must be thrown when a destination either is not understood by a provider or is no longer valid.
InvalidSelectorException: This exception must be thrown when a JMS client attempts to give a provider a message selector with invalid syntax.
JMSSecurityException: This exception must be thrown when a provider rejects a user name/password submitted by a client. It may also be thrown for any case where a security restriction prevents a method from completing.
MessageEOFException: This exception must be thrown when an unexpected end of stream has been reached when a StreamMessage or BytesMessage is being read.
MessageFormatException: This exception must be thrown when a JMS client attempts to use a data type not supported by a message or attempts to read data in a message as the wrong type. It must also be thrown when equivalent type errors are made with message property values.
MessageNotReadableException: This exception must be thrown when a JMS client attempts to read a write-only message.
MessageNotWriteableException: This exception must be thrown when a JMS client attempts to write to a read-only message.
ResourceAllocationException: This exception is thrown when a provider is unable to allocate the resources required by a method. For example, this exception should be thrown when a call to TopicConnectionFactory.createTopicConnection fails due to a lack of JMS provider resources.
TransactionInProgressException: This exception is thrown when an operation is invalid because a transaction is in progress. For instance, an attempt to call Session.commit when a session is part of a distributed transaction should throw a TransactionInProgressException.
TransactionRolledBackException: This exception must be thrown when a call to Session.commit results in a rollback of the current transaction.
Let's understand the key factors for better messaging application development.
Message Acknowledgement: In a transacted session, acknowledgment happens automatically when the transaction is committed, and if the transaction is rolled back, all consumed messages are redelivered. In a non-transacted session, acknowledgement depends on the value specified as the second argument of the createQueueSession or createTopicSession method. There are three possible values:
Session.AUTO_ACKNOWLEDGE. The session automatically acknowledges a client's receipt of a message either when the client has successfully returned from a call to receive or when the MessageListener it has called to process the message returns successfully.
Session.CLIENT_ACKNOWLEDGE. The client acknowledges a message by calling the message's acknowledge method. Acknowledging a consumed message acknowledges all messages that the session has consumed so far.
Session.DUPS_OK_ACKNOWLEDGE. The session acknowledges messages lazily. This is likely to result in the delivery of some duplicate messages if the JMS provider fails, so it should be used only by consumers that can tolerate duplicate messages.
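The bookkeeping behind client acknowledgement can be sketched with plain JDK collections. This is a simplified model, not JMS code: ClientAckModel is a hypothetical class, acknowledge() stands in for Message.acknowledge, and recover() stands in for Session.recover, which restarts delivery from the first unacknowledged message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-JDK model of a client-acknowledged session; not JMS API.
class ClientAckModel {
    private final Deque<String> pending = new ArrayDeque<>(); // not yet delivered
    private final List<String> unacked = new ArrayList<>();   // delivered, awaiting acknowledgement

    void enqueue(String message) {
        pending.addLast(message);
    }

    // Delivers the next message, or returns null when nothing is pending.
    String receive() {
        String message = pending.pollFirst();
        if (message != null) {
            unacked.add(message);
        }
        return message;
    }

    // Acknowledges every message delivered so far, not only the latest one.
    void acknowledge() {
        unacked.clear();
    }

    // Puts unacknowledged messages back at the head of the queue in their original order.
    void recover() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            pending.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    public static void main(String[] args) {
        ClientAckModel session = new ClientAckModel();
        session.enqueue("a");
        session.enqueue("b");
        session.enqueue("c");

        session.receive();     // a
        session.receive();     // b
        session.acknowledge(); // a and b are now settled
        session.receive();     // c, never acknowledged
        session.recover();     // c becomes eligible for redelivery
        System.out.println("redelivered: " + session.receive());
    }
}
```

The model shows why a consumer that stops before acknowledging can see the same message again, which is the situation JMSRedelivered is meant to flag.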
Message persistence: There are two ways to handle the persistence level of messages:
PERSISTENT delivery mode directs the JMS provider to ensure that a message is not lost in case of a JMS provider failure. A message sent with this delivery mode is logged to stable storage when it is sent.
NON_PERSISTENT delivery mode does not require the JMS provider to store the message, so there is no assurance that the message is not lost if the provider fails.
Durable subscribers: The TopicSession.createSubscriber method creates a non-durable subscriber, which can receive only messages that are published while it is active. The TopicSession.createDurableSubscriber method, by contrast, creates a durable subscriber at a higher overhead, and a durable subscription can have only one active subscriber at a time. A durable subscriber registers a durable subscription with a unique identity that is retained by the JMS provider, and subsequent subscriber objects with the same identity resume the subscription in the state in which the previous subscriber left it. If a durable subscription has no active subscriber, the JMS provider retains the subscription's messages until they are received by the subscription or until they expire.
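The difference between the two kinds of subscription can be modelled in a few lines of plain Java. This is an illustrative sketch only: DurableSubscriptionModel and Subscription are hypothetical names, and message expiry is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-JDK model of durable and non-durable subscriptions; not JMS API.
class DurableSubscriptionModel {

    static class Subscription {
        private final boolean durable;
        private boolean active = true;
        private final Deque<String> retained = new ArrayDeque<>(); // held while inactive
        private final List<String> received = new ArrayList<>();

        Subscription(boolean durable) {
            this.durable = durable;
        }

        void deactivate() {
            active = false;
        }

        // On reactivation, a durable subscription first drains what was retained for it.
        void activate() {
            active = true;
            while (!retained.isEmpty()) {
                received.add(retained.pollFirst());
            }
        }

        void onPublish(String message) {
            if (active) {
                received.add(message);
            } else if (durable) {
                retained.addLast(message);
            }
            // Inactive and non-durable: the message is missed.
        }

        List<String> received() {
            return received;
        }
    }

    public static void main(String[] args) {
        Subscription durable = new Subscription(true);
        Subscription nonDurable = new Subscription(false);
        for (Subscription s : List.of(durable, nonDurable)) {
            s.onPublish("m1");
            s.deactivate();
            s.onPublish("m2"); // published while the subscriber is away
            s.activate();
            s.onPublish("m3");
        }
        System.out.println("durable: " + durable.received());
        System.out.println("non-durable: " + nonDurable.received());
    }
}
```

Only the durable subscription ends up with the message published during the inactive period, at the cost of the provider having to store it in the meantime.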
Now let's dive into the various examples in both P2P and Pub-Sub implementations.
P2P Implementation
The sender (producer) side follows the steps below:
JNDI lookup of the QueueConnectionFactory and queue.
Creating a connection and a session.
Creating QueueSender.
Creating TextMessage.
Sending messages to the queue.
Closing the connection.
The receiver side would then follow the following steps:
JNDI lookup of the QueueConnectionFactory and queue.
Creating a connection and a session.
Creating QueueReceiver.
Starting connection.
Receiving the messages from queue.
Closing the connection.
MyQueueSender.java
import javax.jms.*;
import javax.naming.*;

public class MyQueueSender {
    private static final String Q_CONNECTION_FACTORY = "QueueConnectionFactory";
    private static final String Q_NAME = "myQyeue";
    private static Context jndiContext = null;
    private static QueueConnectionFactory queueConnectionFactory = null;

    public static void main(String... args) {
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue queue = null;
        QueueSender queueSender = null;
        TextMessage message = null;

        // Look up the administered objects (connection factory and queue) through JNDI.
        try {
            jndiContext = new InitialContext();
            queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup(Q_CONNECTION_FACTORY);
            queue = (Queue) jndiContext.lookup(Q_NAME);
        } catch (NamingException e) {
            System.out.println("Could not create JNDI context: " + e.getMessage());
            System.exit(1);
        }

        try {
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queueSender = queueSession.createSender(queue);
            message = queueSession.createTextMessage();
            // Send ten messages, numbered 1 to 10.
            for (int i = 0; i < 10; i++) {
                message.setText("Test Message " + (i + 1));
                System.out.println("message to send: " + message.getText());
                queueSender.send(message);
            }
        } catch (JMSException e) {
            System.out.println(e.getMessage());
        } finally {
            // Closing the connection also closes its session and sender.
            if (null != queueConnection) {
                try {
                    queueConnection.close();
                } catch (JMSException e) {}
            }
        }
    }
}
MyQueueReceiver.java
import javax.jms.*;
import javax.naming.*;

public class MyQueueReceiver {
    private static final String Q_CONNECTION_FACTORY = "QueueConnectionFactory";
    private static final String Q_NAME = "myQyeue";
    private static Context jndiContext = null;
    private static QueueConnectionFactory queueConnectionFactory = null;

    public static void main(String... args) {
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue queue = null;
        QueueReceiver queueReceiver = null;
        TextMessage message = null;

        // Look up the administered objects (connection factory and queue) through JNDI.
        try {
            jndiContext = new InitialContext();
            queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup(Q_CONNECTION_FACTORY);
            queue = (Queue) jndiContext.lookup(Q_NAME);
        } catch (NamingException e) {
            System.out.println("Not able to create JNDI context: " + e.getMessage());
            System.exit(1);
        }

        try {
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queueReceiver = queueSession.createReceiver(queue);
            // Message delivery does not begin until the connection is started.
            queueConnection.start();
            // Runs until the process is stopped or a JMSException occurs.
            while (true) {
                // Wait up to one second per call; receive returns null on timeout.
                Message m = queueReceiver.receive(1000);
                if (m instanceof TextMessage) {
                    message = (TextMessage) m;
                    System.out.println("Message read: " + message.getText());
                }
            }
        } catch (JMSException jmse) {
            System.out.println(jmse.getMessage());
        } finally {
            if (null != queueConnection) {
                try {
                    queueConnection.close();
                } catch (JMSException jmse) {}
            }
        }
    }
}
Pub-Sub Implementation
The sender (producer) side uses the following steps:
JNDI lookup of the TopicConnectionFactory and topic.
Creating a connection and a session.
Creating a TopicPublisher.
Creating TextMessage.
Publishing messages to the topic.
Closing the connection, which automatically closes the session and TopicPublisher.
The receiver side follows the steps below:
JNDI lookup of the TopicConnectionFactory and topic.
Creating a connection and a session.
Creating a TopicSubscriber.
Creating an instance of the customized listener class and registering it as the message listener for the TopicSubscriber.
Starting the connection.
Listening for the messages published to the topic.
Closing the connection, which automatically closes the session and TopicSubscriber.
Message listener's responsibility: Whenever a message arrives, the onMessage method is called automatically. The onMessage method casts the incoming message to a specific type, such as TextMessage or MapMessage, and displays its content.
MyTopicPublisher.java
import javax.jms.*;
import javax.naming.*;

public class MyTopicPublisher {
    private static final String TOPIC_CONN_FACTORY = "TopicConnectionFactory";
    private static final String TOPIC_NAME = "myTopic";
    private static Context jndiContext = null;
    private static TopicConnectionFactory topicConnectionFactory = null;

    public static void main(String[] args) {
        TopicConnection topicConnection = null;
        TopicSession topicSession = null;
        Topic topic = null;
        TopicPublisher topicPublisher = null;
        TextMessage message = null;

        // Look up the administered objects (connection factory and topic) through JNDI.
        try {
            jndiContext = new InitialContext();
            topicConnectionFactory = (TopicConnectionFactory) jndiContext.lookup(TOPIC_CONN_FACTORY);
            topic = (Topic) jndiContext.lookup(TOPIC_NAME);
        } catch (NamingException e) {
            System.out.println("Not able to create JNDI context: " + e.toString());
            System.exit(1);
        }

        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
            topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topicPublisher = topicSession.createPublisher(topic);
            message = topicSession.createTextMessage();
            // Publish ten messages, numbered 1 to 10.
            for (int i = 0; i < 10; i++) {
                message.setText("Test Message " + (i + 1));
                System.out.println("Message getting published: " + message.getText());
                topicPublisher.publish(message);
            }
        } catch (JMSException e) {
            System.out.println(e.getMessage());
        } finally {
            // Closing the connection also closes its session and publisher.
            if (null != topicConnection) {
                try {
                    topicConnection.close();
                } catch (JMSException e) {}
            }
        }
    }
}
MyTopicSubscriber.java
import javax.jms.*;
import javax.naming.*;
import java.io.*;

public class MyTopicSubscriber {
    private static final String TOPIC_CONN_FACTORY = "TopicConnectionFactory";
    private static final String TOPIC_NAME = "myTopic";
    private static Context jndiContext = null;
    private static TopicConnectionFactory topicConnectionFactory = null;

    public static void main(String[] args) {
        TopicConnection topicConnection = null;
        TopicSession topicSession = null;
        Topic topic = null;
        TopicSubscriber topicSubscriber = null;
        MyMessageListener topicListener = null;
        InputStreamReader inputStreamReader = null;

        // Look up the administered objects (connection factory and topic) through JNDI.
        try {
            jndiContext = new InitialContext();
            topicConnectionFactory = (TopicConnectionFactory) jndiContext.lookup(TOPIC_CONN_FACTORY);
            topic = (Topic) jndiContext.lookup(TOPIC_NAME);
        } catch (NamingException e) {
            System.out.println("Not able to create JNDI context: " + e.toString());
            System.exit(1);
        }

        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
            topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topicSubscriber = topicSession.createSubscriber(topic);
            // Register the listener before starting the connection so no message is missed.
            topicListener = new MyMessageListener();
            topicSubscriber.setMessageListener(topicListener);
            topicConnection.start();

            // Keep the connection open so the listener can receive messages;
            // without this wait, the finally block would close it immediately.
            System.out.println("To end program, enter Q or q, then <return>");
            inputStreamReader = new InputStreamReader(System.in);
            int answer;
            while ((answer = inputStreamReader.read()) != -1) {
                if (answer == 'q' || answer == 'Q') {
                    break;
                }
            }
        } catch (JMSException jmse) {
            System.out.println(jmse.getMessage());
        } catch (IOException ioe) {
            System.out.println("I/O exception: " + ioe.getMessage());
        } finally {
            if (null != topicConnection) {
                try {
                    topicConnection.close();
                } catch (JMSException e) {}
            }
        }
    }
}
MyMessageListener.java
import javax.jms.*;

public class MyMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage msg = null;
        try {
            if (message instanceof TextMessage) {
                msg = (TextMessage) message;
                System.out.println("Received message: " + msg.getText());
            } else {
                System.out.println("Unexpected type of message: " + message.getClass().getName());
            }
        } catch (JMSException jmse) {
            System.out.println("Exception occurs in onMessage(): " + jmse.getMessage());
        }
    }
}
What Is Queue Browser?
A queue browser can be used to look at the messages in a queue without consuming them. QueueBrowser provides an Enumeration that can be used to step through the messages in a queue. Depending on the provider, the enumeration may be a snapshot, so messages added to the queue while browsing may not be visible to the queue browser.
Example: MyQueueBrowser.java
import java.util.Enumeration;
import javax.jms.*;
import javax.naming.*;

public class MyQueueBrowser {
    private static final String Q_CONNECTION_FACTORY = "QueueConnectionFactory";
    private static final String Q_NAME = "myQyeue";

    public static void main(String[] args) throws Exception {
        browseQ();
    }

    public static void browseQ() throws Exception {
        InitialContext jndiContext = new InitialContext();
        QueueConnectionFactory queueConnFactory = (QueueConnectionFactory) jndiContext.lookup(Q_CONNECTION_FACTORY);
        Queue queue = (Queue) jndiContext.lookup(Q_NAME);
        QueueConnection queueConn = queueConnFactory.createQueueConnection();
        try {
            // The acknowledge mode is irrelevant here because browsing does not consume messages.
            QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueBrowser queueBrowser = queueSession.createBrowser(queue);
            queueConn.start();
            // Count the messages by stepping through the enumeration.
            Enumeration<?> e = queueBrowser.getEnumeration();
            int msgCount = 0;
            while (e.hasMoreElements()) {
                e.nextElement();
                msgCount++;
            }
            System.out.println(queue + " has " + msgCount + " messages");
        } finally {
            // Close the connection even if browsing fails.
            queueConn.close();
        }
    }
}
I hope this gives you a good understanding of JMS.