Getting Started With JMS-ActiveMQ: Explained in a Simple Way
Want to learn Java Message Service using ActiveMQ? Here is the article to explain the basics with simple examples and setup.
Join the DZone community and get the full member experience.
Join For FreeJava Message Service (JMS) is a very useful open source that works as a message-oriented middleware.
About JMS
- The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Java EE) to create, send, receive, and read messages.
- It enables distributed communication that is loosely coupled, reliable, and asynchronous.
- Messaging is a technique to communicate with different software components and/or applications.
- JMS, a messaging service, is mainly used to send and receive messages from one application to another.
- There are different JMS brokers/providers like ActiveMQ, RabbitMQ, OpenMQ, etc.
- I will be using ActiveMQ for this tutorial.
About ActiveMQ
- ActiveMQ is a messaging service that facilitates disparate data at scale in enterprise systems.
- ActiveMQ is a popular open-source messaging service that is built in Java.
- It works as a message-oriented middleware or MoM for short.
Setting Up ActiveMQ
- Java must be installed and set up
(
JAVA_HOME and PATH variables)
. I am using Java 8 for this tutorial. - Download ActiveMQ (we will use "Classic" for this tutorial)
- Extract the compressed file in the folder of your choice and go to the folder of your OS (win64 for Windows 10/11, Linux-x86-64 for Linux, and MacOSX for Mac) under the bin of the main folder extracted apache-ActiveMQ-5.16.3 (5.16.3 is the classic version of ActiveMQ which I used for writing this tutorial). So I am under apache-ActiveMQ-5.16.3/bin/MacOSX for my mac.
- Open the Command Prompt/Shell and execute the command ActiveMQ start to start ActiveMQ JMS Server.
- You could see the output similar to below and let it run.
- In the browser, open the URL http://localhost:8161/admin
- Provide the admin/admin as Username and Password if prompt.
- After providing the Username/password, the screen below will appear
And upon clicking on the "Queues" tab in the menu, we could see a screen like below showing the queue created (black as I haven't created any yet; I will create a few as we go further on the tutorial). Here we could also give a queue name and press on create button to create a JMS queue.
Setup
I am using Eclipse to demonstrate examples for this article. Let's create a maven java project with the below dependency:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
Messaging Domains
Point-To-Point (P2P Messaging)
- A message can be delivered to one receiver only. Here, Queue is used as a message-oriented middleware (MOM).
The messages stay in the queue until received and processed.
There is no timing-related dependency between the sender and receiver. Sent messages could be retrieved without any expiry.
Publisher/Subscriber (Pub/Sub Messaging)
- A message can be delivered to all of its subscribers.
- We could also use a Topic as a message-oriented middleware that is responsible for holding and delivering messages.
- There is a timing dependency between the publisher and the subscriber.
JMS Publisher/Subscriber Queue-based messaging
Queue vs. Topic
Topics
- In JMS, a Topic implements publish and subscribe semantics.
- When you publish a message, it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message.
- Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
Queues
- A JMS Queue implements load balancer semantics.
- A single message will be received by exactly one consumer.
- If there are no consumers available at the time the message is sent, it will be kept until a consumer is available that can process the message.
- If a consumer receives a message and does not acknowledge it before closing, then the message will be redelivered to another consumer.
- A queue can have many consumers with messages loaded and balanced across the available consumers.
So Queues implement a reliable load balancer in JMS.
JMS Publisher/Subscriber Topic base Messaging
Without making anything complicated here, to create a queue programmatically (via ActiveMQ), we need to:
For P2P Messaging
- Create a connection factory with the URL of the JMS Server.
- Create a connection via connection-factory.
- Start the connection.
- Create a session on the connection.
- Create a queue with queue-name.
- Hold the queue's destination object.
- Create a producer object to send messages to the queue.
- Create a consumer object to receive messages from the queue.
For Pub/Sub Messaging (Without Topic)
1st 7 steps are the same.
8. Can create multiple consumers for the queue and set dedicated listeners for each consumer. These listeners used to receive and process messages.
For Pub/Sub Messaging (With a Topic)
1st 5 steps are the same.
6. Create a topic with the topic name.
7. Create a producer object to send a message for the topic.
8. Create multiple consumers for the topic and set dedicated listeners for each consumer. These listeners used to receive and process messages.
Now, let's start creating a simple queue for sending and receiving text messages. I wrote a class for creating a queue. The class also offers a few methods; send()
to send a text message to the queue, receive()
to receive a text message from the queue, and close()
to close the queue and its associated objects line connection, session, etc.
Simple Example of ActiveMQ To Start
As a first example, I am using P2P messaging.
Code for SimpleQueue class:
package org.trishinfotech.activemq.example1;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.apache.activemq.ActiveMQConnection.DEFAULT_BROKER_URL;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SimpleQueue {
private static final String CLIENTID = "TrishInfotechActiveMQ";
private String queueName;
private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Destination destination;
private MessageProducer producer;
private MessageConsumer consumer;
public SimpleQueue(String queueName) throws Exception {
super();
// The name of the queue.
this.queueName = queueName;
// URL of the JMS server is required to create connection factory.
// DEFAULT_BROKER_URL is : tcp://localhost:61616 and is indicates that JMS
// server is running on localhost
connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// Getting JMS connection from the server and starting it
connection = connectionFactory.createConnection();
connection.setClientID(CLIENTID);
connection.start();
// Creating a non-transactional session to send/receive JMS message.
session = connection.createSession(false, AUTO_ACKNOWLEDGE);
// Destination represents here our queue ’MyFirstActiveMQ’ on the JMS
// server.
// The queue will be created automatically on the JSM server if its not already
// created.
destination = session.createQueue(this.queueName);
// MessageProducer is used for sending (producing) messages to the queue.
producer = session.createProducer(destination);
// MessageConsumer is used for receiving (consuming) messages from the queue.
consumer = session.createConsumer(destination);
}
public void send(String textMessage)
throws Exception {
// We will send a text message
TextMessage message = session.createTextMessage(textMessage);
// push the message into queue
producer.send(message);
System.out.printf("'%s' text message sent to the queue '%s' running on local JMS Server.\n", message, queueName);
}
public void receive() throws Exception {
// receive the message from the queue.
Message message = consumer.receive();
// Since We are using TestMessage in our example. MessageProducer sent us a TextMessage
// So we need cast to it to get access to its getText() method which will give us the text of the message
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.printf("Received message '%s' from the queue '%s' running on local JMS Server.\n", textMessage.getText(), queueName);
}
}
public void close() throws JMSException {
producer.close();
producer = null;
consumer.close();
consumer = null;
session.close();
session = null;
connection.close();
connection = null;
}
}
Code for ProducerMain class:
package org.trishinfotech.activemq.example1;
public class ProducerMain {
private static final String QUEUE_NAME = "MyFirstActiveMQ";
public static void main(String[] args) throws Exception {
SimpleQueue queue = new SimpleQueue(QUEUE_NAME);
queue.send("Welcome to the World of ActiveMQ. Hope you will enjoy this tutorial.");
queue.close();
}
}
Upon executing the above class, we could see that the queue with the queue name "MyFirstActiveMQ" is created. you could see that the "counts" of the queue show there is "1 enqueue message" which is pending for "processing." I mean, sent but not received.
Now, let's write a class to receive this message from the queue.
Code for ConsumerMain class:
package org.trishinfotech.activemq.example1;
public class ConsumerMain {
private static final String QUEUE_NAME = "MyFirstActiveMQ";
public static void main(String[] args) throws Exception {
SimpleQueue queue = new SimpleQueue(QUEUE_NAME);
queue.receive();
queue.close();
}
}
Below is the output of the above class.
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Received message 'Welcome to the World of ActiveMQ. Hope you will enjoy this tutorial.' from the queue 'MyFirstActiveMQ' running on local JMS Server.
Have you noticed a change in the "counts?" The dequeue count becomes "1," and there is no more pending message now since it is "processed."
Message Acknowledgement
Before I moved to another example, I like to talk about "acknowledgment." We can see that the session was created with AUTO_ACKNOWLEDGE
, which means the message will be auto-acknowledged upon send or receive. If we like to control this behavior, we could use INDIVIDUAL_ACKNOWLEDGE
. In that case, after sending ()
or receiving ()
, we need to call acknowledge()
to acknowledge the delivery and processing of the message, which could be based on the processing status of the message. Without that, the message will be considered a fail and will stay in the queue. I will use AUTO_ACKNOWLEDGE
for the demonstrating MQs in this article and will encourage my viewers to explore more about this.
Now let's have another example of using the custom object of "Bank-Account" as a message in the queue. Also, I will use Publisher/Subscriber Messaging for this example using a queue.
Another Example of Bank Account Create Using JMS-ActiveMQ Queues
Code for BankAccount class:
package org.trishinfotech.activemq.example2;
import java.io.Serializable;
public class BankAccount implements Serializable {
private static final long serialVersionUID = 8903317706228710187L;
private long applicationNo;
private String userName;
private double depositAmount;
private long customerId;
private String atmCardNumber;
public BankAccount(long applicationNo, String userName, double depositAmount) {
super();
this.applicationNo = applicationNo;
this.userName = userName;
this.depositAmount = depositAmount;
}
public long getApplicationNo() {
return applicationNo;
}
public void setApplicationNo(long applicationNo) {
this.applicationNo = applicationNo;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public double getDepositAmount() {
return depositAmount;
}
public void setDepositAmount(double depositAmount) {
this.depositAmount = depositAmount;
}
public long getCustomerId() {
return customerId;
}
public void setCustomerId(long customerId) {
this.customerId = customerId;
}
public String getAtmCardNumber() {
return atmCardNumber;
}
public void setAtmCardNumber(String atmCardNumber) {
this.atmCardNumber = atmCardNumber;
}
@Override
public String toString() {
return String.format("%10s, %10s, %10s, %10s, %20s",
applicationNo, userName, depositAmount, customerId != 0 ? customerId : "NA",
atmCardNumber != null ? atmCardNumber : "NA");
}
}
Now, let's create another class using the Publisher/Subscriber Messaging approach. Here, I will code to create one or many consumers for the queue to demonstrate the difference.
// MessageConsumer is used for receiving (consuming) messages from the queue.
IntStream.range(0, noOfConsumers).forEach(consumerNo -> {
try {
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MyQueueListener(this, "Consumer" + (consumerNo + 1)));
consumers.add(consumer);
} catch (Exception exp) {
// we can ignore as of now
}
});
In this class, We will maintain an internal collection of the account created
private Map<Long, BankAccount> accountMap = new HashMap<Long, BankAccount>();
I am doing this to show the broadcasted versus distributed messaging for publishers/subscribers. So, I am maintaining the "counts" inside the listeners to show the processing difference between broadcast versus distributed. I wrote a method printSummery()
to project these counts as well.
public void printSummary() {
System.out.printf("\n\n%30s | %14s | %14s | %14s\n", "Source", "No Of Created", "No Of Existing",
"No Of Processed");
System.out.println("====================================================================================");
AtomicInteger totalNoOfExisting = new AtomicInteger();
consumers.stream().forEach(consumer -> {
try {
MyQueueListener listener = (MyQueueListener) consumer.getMessageListener();
listener.printSummary();
totalNoOfExisting.getAndAdd(listener.getNoOfExisting());
} catch (JMSException exp) {
// we can ignore as of now
exp.printStackTrace();
}
});
System.out.println("------------------------------------------------------------------------------------");
System.out.printf("%30s | %14s | %14s | %14s\n", queueName, accountMap.size(), totalNoOfExisting.get(),
noOfSend);
System.out.println("====================================================================================");
}
Now, Code for MyQueue class:
package org.trishinfotech.activemq.example2;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.apache.activemq.ActiveMQConnection.DEFAULT_BROKER_URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyQueue {
private static final String CLIENTID = "TrishInfotechActiveMQ";
private String queueName;
private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Destination destination;
private MessageProducer producer;
private List<MessageConsumer> consumers = new ArrayList<MessageConsumer>();
private Map<Long, BankAccount> accountMap = new HashMap<Long, BankAccount>();
private int noOfSend = 0;
public MyQueue(String queueName, int noOfConsumers) throws Exception {
super();
// The name of the queue.
this.queueName = queueName;
// URL of the JMS server is required to create connection factory.
// DEFAULT_BROKER_URL is : tcp://localhost:61616 and is indicates that JMS
// server is running on localhost
connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// Getting JMS connection from the server and starting it
connection = connectionFactory.createConnection("admin", "admin");
connection.setClientID(CLIENTID);
connection.start();
// Creating a non-transactional session to send/receive JMS message.
session = connection.createSession(false, AUTO_ACKNOWLEDGE);
// Destination represents here our queue ’BankAccountProcessingQueue’ on the JMS
// server.
// The queue will be created automatically on the JSM server if its not already
// created.
destination = session.createQueue(this.queueName);
// MessageProducer is used for sending (producing) messages to the queue.
producer = session.createProducer(destination);
// MessageConsumer is used for receiving (consuming) messages from the queue.
IntStream.range(0, noOfConsumers).forEach(consumerNo -> {
try {
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MyQueueListener(this, "Consumer" + (consumerNo + 1)));
consumers.add(consumer);
} catch (Exception exp) {
// we can ignore as of now
}
});
}
public void sendAccountToQueue(MessageProducer producer, BankAccount newAccount) throws Exception {
System.out.printf("%10s | %10s | %10s | %50s\n", "Producer", "Sending", "-", newAccount);
ObjectMessage message = session.createObjectMessage(newAccount);
// push the message into queue
producer.send(message);
noOfSend++;
}
public ProcessStatus processAccountFromQueue(Message message, String consumerTaskName) throws Exception {
ProcessStatus status = ProcessStatus.QUEUE_CLOSED;
BankAccount account = null;
if (message != null) {
ObjectMessage objectMessage = (ObjectMessage) message;
account = (BankAccount) objectMessage.getObject();
if (account != null) {
BankAccount existingAccount = pullAccountByApplicationNo(account.getApplicationNo());
if (existingAccount != null) {
System.out.printf("%10s | %10s | %10s | %50s\n", "Consumer", consumerTaskName, "Existing",
existingAccount);
account = existingAccount;
status = ProcessStatus.EXISTING;
} else {
createBankAccount(account);
System.out.printf("%10s | %10s | %10s | %50s\n", "Consumer", consumerTaskName, "Created", account);
status = ProcessStatus.CREATED;
}
}
message.acknowledge();
}
return status;
}
private void createBankAccount(BankAccount account) throws Exception {
Helper.setupBankAccount(account);
accountMap.put(account.getApplicationNo(), account);
}
public BankAccount pullAccountByApplicationNo(long applicationNo) {
return accountMap.get(applicationNo);
}
public void close() throws JMSException {
producer.close();
producer = null;
consumers.stream().forEach(consumer -> {
try {
consumer.close();
} catch (JMSException e) {
// we can ignore as of now
}
});
consumers.clear();
session.close();
session = null;
connection.close();
connection = null;
}
public String getQueueName() {
return queueName;
}
public Connection getConnection() {
return connection;
}
public Session getSession() {
return session;
}
public Destination getDestination() {
return destination;
}
public MessageProducer getProducer() {
return producer;
}
public void printSummary() {
System.out.printf("\n\n%30s | %14s | %14s | %14s\n", "Source", "No Of Created", "No Of Existing",
"No Of Processed");
System.out.println("====================================================================================");
AtomicInteger totalNoOfExisting = new AtomicInteger();
consumers.stream().forEach(consumer -> {
try {
MyQueueListener listener = (MyQueueListener) consumer.getMessageListener();
listener.printSummary();
totalNoOfExisting.getAndAdd(listener.getNoOfExisting());
} catch (JMSException exp) {
// we can ignore as of now
exp.printStackTrace();
}
});
System.out.println("------------------------------------------------------------------------------------");
System.out.printf("%30s | %14s | %14s | %14s\n", queueName, accountMap.size(), totalNoOfExisting.get(),
noOfSend);
System.out.println("====================================================================================");
}
private static enum ProcessStatus {
CREATED, EXISTING, QUEUE_CLOSED
}
private static class Helper {
private static long initialCustomerId = 1234567l;
private static long accountCreated = 0l;
public static void setupBankAccount(BankAccount account) {
long nextAvailableCustomerId = initialCustomerId + accountCreated++;
Random rand = new Random();
String atmCardNumber = String.format("%04d %04d %04d %04d", rand.nextInt(9999), rand.nextInt(9999),
rand.nextInt(9999), rand.nextInt(9999));
account.setCustomerId(nextAvailableCustomerId);
account.setAtmCardNumber(atmCardNumber);
}
}
private static class MyQueueListener implements MessageListener {
private MyQueue queue;
private String consumerTaskName;
private int noOfProcessed = 0, noOfCreated = 0, noOfExisting = 0;
public MyQueueListener(final MyQueue queue, String consumerTaskName) {
super();
this.queue = queue;
this.consumerTaskName = consumerTaskName;
}
@Override
public void onMessage(Message message) {
// on poll the message from the queue
try {
ProcessStatus status = queue.processAccountFromQueue(message, consumerTaskName);
switch (status) {
case CREATED:
noOfCreated++;
break;
case EXISTING:
noOfExisting++;
break;
case QUEUE_CLOSED:
default:
}
if (!ProcessStatus.QUEUE_CLOSED.equals(status)) {
noOfProcessed++;
}
} catch (Exception exp) {
// we can ignore as of now
}
}
public int getNoOfProcessed() {
return noOfProcessed;
}
public int getNoOfCreated() {
return noOfCreated;
}
public int getNoOfExisting() {
return noOfExisting;
}
public void printSummary() {
System.out.printf("%30s | %14s | %14s | %14s\n", consumerTaskName, getNoOfCreated(), getNoOfExisting(),
getNoOfProcessed());
}
}
}-
Now, let's keep it simple and create a single consumer to process 100 accounts. I am using java.util.Random class to generate the application. I am using the applicationNo
as the key to the internal collection of the accounts map and hence will identify and eliminate duplicate applicationNo
while creating bank accounts.
Code for Main class:
package org.trishinfotech.activemq.example2;
import java.util.Random;
public class Main {
private static final String QUEUE_NAME = "BankAccountProcessingQueue";
private static final int NO_OF_CONSUMERS = 1;
private static final long NO_OF_ACCOUNTS = 100l;
public static void main(String[] args) throws Exception {
MyQueue queue = new MyQueue(QUEUE_NAME, NO_OF_CONSUMERS);
Random rand = new Random();
System.out.printf("%10s | %10s | %10s | %50s\n", "Source", "Action", "Result",
"Bank Details (ApplicationNo, UserName, DepositAmount, CustomerId, ATM)");
System.out.println(
"=================================================================================================================");
for (long i = 1; i <= NO_OF_ACCOUNTS; i++) {
long applicationNo = rand.nextLong(NO_OF_ACCOUNTS);
BankAccount newAccount = new BankAccount(applicationNo, "Customer" + applicationNo, 1000.0d);
queue.sendAccountToQueue(queue.getProducer(), newAccount);
}
System.out.println(
"=================================================================================================================");
// just to give graceful time to finish the processing
Thread.sleep(2000);
queue.printSummary();
queue.close();
}
}
Below is the output of the program:
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Source | Action | Result | Bank Details (ApplicationNo, UserName, DepositAmount, CustomerId, ATM)
=================================================================================================================
Producer | Sending | - | 32, Customer32, 1000.0, NA, NA
Producer | Sending | - | 56, Customer56, 1000.0, NA, NA
Producer | Sending | - | 33, Customer33, 1000.0, NA, NA
Producer | Sending | - | 40, Customer40, 1000.0, NA, NA
Producer | Sending | - | 44, Customer44, 1000.0, NA, NA
Producer | Sending | - | 79, Customer79, 1000.0, NA, NA
Consumer | Consumer1 | Created | 32, Customer32, 1000.0, 1234567, 3200 3397 8635 1120
Consumer | Consumer1 | Created | 56, Customer56, 1000.0, 1234568, 3744 9184 2573 8082
Producer | Sending | - | 4, Customer4, 1000.0, NA, NA
Consumer | Consumer1 | Created | 33, Customer33, 1000.0, 1234569, 9250 4024 0231 8905
Consumer | Consumer1 | Created | 40, Customer40, 1000.0, 1234570, 8777 9837 6345 3826
Consumer | Consumer1 | Created | 44, Customer44, 1000.0, 1234571, 6149 9058 1789 1815
Consumer | Consumer1 | Created | 79, Customer79, 1000.0, 1234572, 1616 8892 4361 5600
Producer | Sending | - | 85, Customer85, 1000.0, NA, NA
Consumer | Consumer1 | Created | 4, Customer4, 1000.0, 1234573, 7567 7170 1834 7935
Producer | Sending | - | 2, Customer2, 1000.0, NA, NA
Consumer | Consumer1 | Created | 85, Customer85, 1000.0, 1234574, 8926 1586 2528 0920
Consumer | Consumer1 | Created | 2, Customer2, 1000.0, 1234575, 9077 7999 0811 0552
Producer | Sending | - | 80, Customer80, 1000.0, NA, NA
Producer | Sending | - | 8, Customer8, 1000.0, NA, NA
Consumer | Consumer1 | Created | 80, Customer80, 1000.0, 1234576, 0490 7847 0302 0828
Producer | Sending | - | 32, Customer32, 1000.0, NA, NA
Consumer | Consumer1 | Created | 8, Customer8, 1000.0, 1234577, 1478 0424 3662 5424
Producer | Sending | - | 52, Customer52, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 32, Customer32, 1000.0, 1234567, 3200 3397 8635 1120
Producer | Sending | - | 9, Customer9, 1000.0, NA, NA
Consumer | Consumer1 | Created | 52, Customer52, 1000.0, 1234578, 8227 8871 4694 9852
Consumer | Consumer1 | Created | 9, Customer9, 1000.0, 1234579, 8350 9886 7936 8989
Producer | Sending | - | 79, Customer79, 1000.0, NA, NA
Producer | Sending | - | 59, Customer59, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 79, Customer79, 1000.0, 1234572, 1616 8892 4361 5600
Producer | Sending | - | 10, Customer10, 1000.0, NA, NA
Consumer | Consumer1 | Created | 59, Customer59, 1000.0, 1234580, 2150 0868 9503 9384
Producer | Sending | - | 58, Customer58, 1000.0, NA, NA
Consumer | Consumer1 | Created | 10, Customer10, 1000.0, 1234581, 2257 5527 2659 4646
Producer | Sending | - | 91, Customer91, 1000.0, NA, NA
Consumer | Consumer1 | Created | 58, Customer58, 1000.0, 1234582, 6886 6123 2116 6497
Producer | Sending | - | 86, Customer86, 1000.0, NA, NA
Consumer | Consumer1 | Created | 91, Customer91, 1000.0, 1234583, 2169 6147 5544 6505
Producer | Sending | - | 1, Customer1, 1000.0, NA, NA
Consumer | Consumer1 | Created | 86, Customer86, 1000.0, 1234584, 3896 6653 8786 0336
Consumer | Consumer1 | Created | 1, Customer1, 1000.0, 1234585, 7933 9713 7564 8774
Producer | Sending | - | 34, Customer34, 1000.0, NA, NA
Producer | Sending | - | 81, Customer81, 1000.0, NA, NA
Consumer | Consumer1 | Created | 34, Customer34, 1000.0, 1234586, 8733 0806 6770 8185
Consumer | Consumer1 | Created | 81, Customer81, 1000.0, 1234587, 6436 7346 0880 5036
Producer | Sending | - | 44, Customer44, 1000.0, NA, NA
Producer | Sending | - | 29, Customer29, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 44, Customer44, 1000.0, 1234571, 6149 9058 1789 1815
Consumer | Consumer1 | Created | 29, Customer29, 1000.0, 1234588, 9380 3485 9668 7455
Producer | Sending | - | 76, Customer76, 1000.0, NA, NA
Consumer | Consumer1 | Created | 76, Customer76, 1000.0, 1234589, 0160 0030 2607 7561
Producer | Sending | - | 27, Customer27, 1000.0, NA, NA
Consumer | Consumer1 | Created | 27, Customer27, 1000.0, 1234590, 4016 7931 6904 8872
Producer | Sending | - | 78, Customer78, 1000.0, NA, NA
Producer | Sending | - | 11, Customer11, 1000.0, NA, NA
Consumer | Consumer1 | Created | 78, Customer78, 1000.0, 1234591, 0600 7706 8539 6593
Producer | Sending | - | 31, Customer31, 1000.0, NA, NA
Consumer | Consumer1 | Created | 11, Customer11, 1000.0, 1234592, 2949 9629 2841 7472
Producer | Sending | - | 22, Customer22, 1000.0, NA, NA
Consumer | Consumer1 | Created | 31, Customer31, 1000.0, 1234593, 4715 1756 4772 1642
Producer | Sending | - | 51, Customer51, 1000.0, NA, NA
Consumer | Consumer1 | Created | 22, Customer22, 1000.0, 1234594, 1904 9468 6889 5045
Consumer | Consumer1 | Created | 51, Customer51, 1000.0, 1234595, 7178 8470 2039 9984
Producer | Sending | - | 79, Customer79, 1000.0, NA, NA
Producer | Sending | - | 1, Customer1, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 79, Customer79, 1000.0, 1234572, 1616 8892 4361 5600
Producer | Sending | - | 84, Customer84, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 1, Customer1, 1000.0, 1234585, 7933 9713 7564 8774
Producer | Sending | - | 65, Customer65, 1000.0, NA, NA
Consumer | Consumer1 | Created | 84, Customer84, 1000.0, 1234596, 8717 7632 1082 2221
Producer | Sending | - | 22, Customer22, 1000.0, NA, NA
Consumer | Consumer1 | Created | 65, Customer65, 1000.0, 1234597, 3495 8328 5937 7256
Consumer | Consumer1 | Existing | 22, Customer22, 1000.0, 1234594, 1904 9468 6889 5045
Producer | Sending | - | 95, Customer95, 1000.0, NA, NA
Producer | Sending | - | 26, Customer26, 1000.0, NA, NA
Consumer | Consumer1 | Created | 95, Customer95, 1000.0, 1234598, 6789 4023 3287 8264
Consumer | Consumer1 | Created | 26, Customer26, 1000.0, 1234599, 4128 7842 2899 6973
Producer | Sending | - | 58, Customer58, 1000.0, NA, NA
Producer | Sending | - | 16, Customer16, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 58, Customer58, 1000.0, 1234582, 6886 6123 2116 6497
Producer | Sending | - | 59, Customer59, 1000.0, NA, NA
Consumer | Consumer1 | Created | 16, Customer16, 1000.0, 1234600, 2519 5705 9499 0866
Producer | Sending | - | 98, Customer98, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 59, Customer59, 1000.0, 1234580, 2150 0868 9503 9384
Consumer | Consumer1 | Created | 98, Customer98, 1000.0, 1234601, 5907 2731 7606 6885
Producer | Sending | - | 67, Customer67, 1000.0, NA, NA
Consumer | Consumer1 | Created | 67, Customer67, 1000.0, 1234602, 7736 6233 4454 8209
Producer | Sending | - | 19, Customer19, 1000.0, NA, NA
Consumer | Consumer1 | Created | 19, Customer19, 1000.0, 1234603, 1896 1387 4260 1421
Producer | Sending | - | 16, Customer16, 1000.0, NA, NA
Producer | Sending | - | 14, Customer14, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 16, Customer16, 1000.0, 1234600, 2519 5705 9499 0866
Consumer | Consumer1 | Created | 14, Customer14, 1000.0, 1234604, 6946 6975 9901 3053
Producer | Sending | - | 73, Customer73, 1000.0, NA, NA
Producer | Sending | - | 1, Customer1, 1000.0, NA, NA
Consumer | Consumer1 | Created | 73, Customer73, 1000.0, 1234605, 6437 1943 1387 0499
Producer | Sending | - | 65, Customer65, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 1, Customer1, 1000.0, 1234585, 7933 9713 7564 8774
Consumer | Consumer1 | Existing | 65, Customer65, 1000.0, 1234597, 3495 8328 5937 7256
Producer | Sending | - | 51, Customer51, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 51, Customer51, 1000.0, 1234595, 7178 8470 2039 9984
Producer | Sending | - | 53, Customer53, 1000.0, NA, NA
Consumer | Consumer1 | Created | 53, Customer53, 1000.0, 1234606, 7951 0988 3003 7979
Producer | Sending | - | 66, Customer66, 1000.0, NA, NA
Producer | Sending | - | 70, Customer70, 1000.0, NA, NA
Consumer | Consumer1 | Created | 66, Customer66, 1000.0, 1234607, 5268 0147 8064 5374
Consumer | Consumer1 | Created | 70, Customer70, 1000.0, 1234608, 9900 0391 7799 3790
Producer | Sending | - | 23, Customer23, 1000.0, NA, NA
Producer | Sending | - | 35, Customer35, 1000.0, NA, NA
Consumer | Consumer1 | Created | 23, Customer23, 1000.0, 1234609, 7886 3577 1543 7236
Consumer | Consumer1 | Created | 35, Customer35, 1000.0, 1234610, 7853 8024 7143 6517
Producer | Sending | - | 7, Customer7, 1000.0, NA, NA
Producer | Sending | - | 9, Customer9, 1000.0, NA, NA
Consumer | Consumer1 | Created | 7, Customer7, 1000.0, 1234611, 1450 5800 6152 7317
Consumer | Consumer1 | Existing | 9, Customer9, 1000.0, 1234579, 8350 9886 7936 8989
Producer | Sending | - | 18, Customer18, 1000.0, NA, NA
Consumer | Consumer1 | Created | 18, Customer18, 1000.0, 1234612, 2604 5579 6066 3342
Producer | Sending | - | 52, Customer52, 1000.0, NA, NA
Producer | Sending | - | 51, Customer51, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 52, Customer52, 1000.0, 1234578, 8227 8871 4694 9852
Consumer | Consumer1 | Existing | 51, Customer51, 1000.0, 1234595, 7178 8470 2039 9984
Producer | Sending | - | 30, Customer30, 1000.0, NA, NA
Consumer | Consumer1 | Created | 30, Customer30, 1000.0, 1234613, 4182 0291 2143 0859
Producer | Sending | - | 52, Customer52, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 52, Customer52, 1000.0, 1234578, 8227 8871 4694 9852
Producer | Sending | - | 14, Customer14, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 14, Customer14, 1000.0, 1234604, 6946 6975 9901 3053
Producer | Sending | - | 34, Customer34, 1000.0, NA, NA
Producer | Sending | - | 99, Customer99, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 34, Customer34, 1000.0, 1234586, 8733 0806 6770 8185
Consumer | Consumer1 | Created | 99, Customer99, 1000.0, 1234614, 6577 0566 4401 8507
Producer | Sending | - | 61, Customer61, 1000.0, NA, NA
Consumer | Consumer1 | Created | 61, Customer61, 1000.0, 1234615, 2228 4549 7602 2825
Producer | Sending | - | 40, Customer40, 1000.0, NA, NA
Producer | Sending | - | 92, Customer92, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 40, Customer40, 1000.0, 1234570, 8777 9837 6345 3826
Consumer | Consumer1 | Created | 92, Customer92, 1000.0, 1234616, 0567 7860 3332 6226
Producer | Sending | - | 4, Customer4, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 4, Customer4, 1000.0, 1234573, 7567 7170 1834 7935
Producer | Sending | - | 15, Customer15, 1000.0, NA, NA
Consumer | Consumer1 | Created | 15, Customer15, 1000.0, 1234617, 7507 3766 2759 6399
Producer | Sending | - | 7, Customer7, 1000.0, NA, NA
Producer | Sending | - | 26, Customer26, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 7, Customer7, 1000.0, 1234611, 1450 5800 6152 7317
Producer | Sending | - | 79, Customer79, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 26, Customer26, 1000.0, 1234599, 4128 7842 2899 6973
Producer | Sending | - | 79, Customer79, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 79, Customer79, 1000.0, 1234572, 1616 8892 4361 5600
Producer | Sending | - | 13, Customer13, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 79, Customer79, 1000.0, 1234572, 1616 8892 4361 5600
Producer | Sending | - | 53, Customer53, 1000.0, NA, NA
Consumer | Consumer1 | Created | 13, Customer13, 1000.0, 1234618, 6371 2327 4479 5462
Consumer | Consumer1 | Existing | 53, Customer53, 1000.0, 1234606, 7951 0988 3003 7979
Producer | Sending | - | 36, Customer36, 1000.0, NA, NA
Producer | Sending | - | 92, Customer92, 1000.0, NA, NA
Consumer | Consumer1 | Created | 36, Customer36, 1000.0, 1234619, 6798 4721 4623 6745
Consumer | Consumer1 | Existing | 92, Customer92, 1000.0, 1234616, 0567 7860 3332 6226
Producer | Sending | - | 97, Customer97, 1000.0, NA, NA
Producer | Sending | - | 28, Customer28, 1000.0, NA, NA
Consumer | Consumer1 | Created | 97, Customer97, 1000.0, 1234620, 2609 9051 2042 8349
Producer | Sending | - | 22, Customer22, 1000.0, NA, NA
Consumer | Consumer1 | Created | 28, Customer28, 1000.0, 1234621, 7182 0130 5151 5727
Consumer | Consumer1 | Existing | 22, Customer22, 1000.0, 1234594, 1904 9468 6889 5045
Producer | Sending | - | 97, Customer97, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 97, Customer97, 1000.0, 1234620, 2609 9051 2042 8349
Producer | Sending | - | 69, Customer69, 1000.0, NA, NA
Producer | Sending | - | 6, Customer6, 1000.0, NA, NA
Consumer | Consumer1 | Created | 69, Customer69, 1000.0, 1234622, 0436 2671 0950 5419
Producer | Sending | - | 64, Customer64, 1000.0, NA, NA
Consumer | Consumer1 | Created | 6, Customer6, 1000.0, 1234623, 0645 4559 1553 3087
Consumer | Consumer1 | Created | 64, Customer64, 1000.0, 1234624, 2515 7313 1610 2004
Producer | Sending | - | 31, Customer31, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 31, Customer31, 1000.0, 1234593, 4715 1756 4772 1642
Producer | Sending | - | 97, Customer97, 1000.0, NA, NA
Producer | Sending | - | 5, Customer5, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 97, Customer97, 1000.0, 1234620, 2609 9051 2042 8349
Consumer | Consumer1 | Created | 5, Customer5, 1000.0, 1234625, 6951 2975 0942 9195
Producer | Sending | - | 88, Customer88, 1000.0, NA, NA
Consumer | Consumer1 | Created | 88, Customer88, 1000.0, 1234626, 4710 6913 0947 0452
Producer | Sending | - | 40, Customer40, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 40, Customer40, 1000.0, 1234570, 8777 9837 6345 3826
Producer | Sending | - | 86, Customer86, 1000.0, NA, NA
Producer | Sending | - | 24, Customer24, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 86, Customer86, 1000.0, 1234584, 3896 6653 8786 0336
Producer | Sending | - | 99, Customer99, 1000.0, NA, NA
Consumer | Consumer1 | Created | 24, Customer24, 1000.0, 1234627, 9940 2649 7205 3092
Producer | Sending | - | 56, Customer56, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 99, Customer99, 1000.0, 1234614, 6577 0566 4401 8507
Consumer | Consumer1 | Existing | 56, Customer56, 1000.0, 1234568, 3744 9184 2573 8082
Producer | Sending | - | 76, Customer76, 1000.0, NA, NA
Producer | Sending | - | 21, Customer21, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 76, Customer76, 1000.0, 1234589, 0160 0030 2607 7561
Consumer | Consumer1 | Created | 21, Customer21, 1000.0, 1234628, 5242 7192 7713 1513
Producer | Sending | - | 26, Customer26, 1000.0, NA, NA
Producer | Sending | - | 64, Customer64, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 26, Customer26, 1000.0, 1234599, 4128 7842 2899 6973
Producer | Sending | - | 31, Customer31, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 64, Customer64, 1000.0, 1234624, 2515 7313 1610 2004
=================================================================================================================
Consumer | Consumer1 | Existing | 31, Customer31, 1000.0, 1234593, 4715 1756 4772 1642
Source | No Of Created | No Of Existing | No Of Processed
====================================================================================
Consumer1 | 62 | 38 | 100
------------------------------------------------------------------------------------
BankAccountProcessingQueue | 62 | 38 | 100
====================================================================================
We can see that after processing 100 account message objects, there are 38 duplicates/existing accounts we found. Your output will be different due to the use of java.util.Random
.
ActiveMQ screen will look like the below:
I hope the example is easy to understand. Now, let's try the same with two consumers to understand the load balancing of the queue.
Code for Main class:
package org.trishinfotech.activemq.example3;
import java.util.Random;
import org.trishinfotech.activemq.example2.BankAccount;
import org.trishinfotech.activemq.example2.MyQueue;
public class Main {
private static final String QUEUE_NAME = "BankAccountProcessingQueue";
private static final int NO_OF_CONSUMERS = 2;
private static final long NO_OF_ACCOUNTS = 100l;
public static void main(String[] args) throws Exception {
MyQueue queue = new MyQueue(QUEUE_NAME, NO_OF_CONSUMERS);
Random rand = new Random();
System.out.printf("%10s | %10s | %10s | %50s\n", "Source", "Action", "Result",
"Bank Details (ApplicationNo, UserName, DepositAmount, CustomerId, ATM)");
System.out.println(
"=================================================================================================================");
for (long i = 1; i <= NO_OF_ACCOUNTS; i++) {
long applicationNo = rand.nextLong(NO_OF_ACCOUNTS);
BankAccount newAccount = new BankAccount(applicationNo, "Customer" + applicationNo, 1000.0d);
queue.sendAccountToQueue(queue.getProducer(), newAccount);
}
System.out.println(
"=================================================================================================================");
// just to give graceful time to finish the processing
Thread.sleep(2000);
queue.printSummary();
queue.close();
}
}
Below is the output of the program:
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Source | Action | Result | Bank Details (ApplicationNo, UserName, DepositAmount, CustomerId, ATM)
=================================================================================================================
Producer | Sending | - | 6, Customer6, 1000.0, NA, NA
Producer | Sending | - | 5, Customer5, 1000.0, NA, NA
Producer | Sending | - | 79, Customer79, 1000.0, NA, NA
Producer | Sending | - | 65, Customer65, 1000.0, NA, NA
Producer | Sending | - | 67, Customer67, 1000.0, NA, NA
Producer | Sending | - | 84, Customer84, 1000.0, NA, NA
Consumer | Consumer1 | Created | 6, Customer6, 1000.0, 1234567, 9697 8082 9538 2478
Consumer | Consumer2 | Created | 5, Customer5, 1000.0, 1234568, 8932 9631 7423 7836
Producer | Sending | - | 60, Customer60, 1000.0, NA, NA
Consumer | Consumer1 | Created | 79, Customer79, 1000.0, 1234569, 2355 1916 5982 5465
Consumer | Consumer2 | Created | 65, Customer65, 1000.0, 1234570, 4673 9439 0219 9615
Consumer | Consumer1 | Created | 67, Customer67, 1000.0, 1234571, 0534 9224 6182 4596
Consumer | Consumer2 | Created | 84, Customer84, 1000.0, 1234572, 4585 5633 7244 6746
Producer | Sending | - | 43, Customer43, 1000.0, NA, NA
Consumer | Consumer1 | Created | 60, Customer60, 1000.0, 1234573, 6105 4375 7040 6378
Producer | Sending | - | 88, Customer88, 1000.0, NA, NA
Consumer | Consumer2 | Created | 43, Customer43, 1000.0, 1234574, 2008 9331 0584 6068
Producer | Sending | - | 69, Customer69, 1000.0, NA, NA
Consumer | Consumer1 | Created | 88, Customer88, 1000.0, 1234575, 7168 0720 5033 7590
Producer | Sending | - | 91, Customer91, 1000.0, NA, NA
Consumer | Consumer2 | Created | 69, Customer69, 1000.0, 1234576, 7773 0871 6479 7101
Producer | Sending | - | 35, Customer35, 1000.0, NA, NA
Consumer | Consumer1 | Created | 91, Customer91, 1000.0, 1234577, 9947 9541 7097 9114
Producer | Sending | - | 7, Customer7, 1000.0, NA, NA
Consumer | Consumer2 | Created | 35, Customer35, 1000.0, 1234578, 5891 2813 3102 6968
Producer | Sending | - | 81, Customer81, 1000.0, NA, NA
Consumer | Consumer1 | Created | 7, Customer7, 1000.0, 1234579, 8134 8895 6595 6592
Consumer | Consumer2 | Created | 81, Customer81, 1000.0, 1234580, 9401 8500 3087 5240
Producer | Sending | - | 44, Customer44, 1000.0, NA, NA
Producer | Sending | - | 33, Customer33, 1000.0, NA, NA
Consumer | Consumer1 | Created | 44, Customer44, 1000.0, 1234581, 3113 7699 0855 5879
Producer | Sending | - | 8, Customer8, 1000.0, NA, NA
Consumer | Consumer2 | Created | 33, Customer33, 1000.0, 1234582, 2180 2427 5346 0595
Producer | Sending | - | 22, Customer22, 1000.0, NA, NA
Consumer | Consumer1 | Created | 8, Customer8, 1000.0, 1234583, 5726 1953 0341 3974
Consumer | Consumer2 | Created | 22, Customer22, 1000.0, 1234584, 8585 3740 2476 4038
Producer | Sending | - | 45, Customer45, 1000.0, NA, NA
Producer | Sending | - | 26, Customer26, 1000.0, NA, NA
Consumer | Consumer1 | Created | 45, Customer45, 1000.0, 1234585, 8406 7992 4979 7825
Consumer | Consumer2 | Created | 26, Customer26, 1000.0, 1234586, 0667 8817 2441 0981
Producer | Sending | - | 89, Customer89, 1000.0, NA, NA
Producer | Sending | - | 0, Customer0, 1000.0, NA, NA
Consumer | Consumer1 | Created | 89, Customer89, 1000.0, 1234587, 0524 4072 7723 7401
Producer | Sending | - | 39, Customer39, 1000.0, NA, NA
Consumer | Consumer2 | Created | 0, Customer0, 1000.0, 1234588, 1362 0794 9203 5617
Producer | Sending | - | 64, Customer64, 1000.0, NA, NA
Consumer | Consumer1 | Created | 39, Customer39, 1000.0, 1234589, 6620 6795 7112 1253
Producer | Sending | - | 19, Customer19, 1000.0, NA, NA
Consumer | Consumer2 | Created | 64, Customer64, 1000.0, 1234590, 2323 4946 2381 3735
Producer | Sending | - | 49, Customer49, 1000.0, NA, NA
Consumer | Consumer1 | Created | 19, Customer19, 1000.0, 1234591, 6725 9654 8938 5328
Producer | Sending | - | 40, Customer40, 1000.0, NA, NA
Consumer | Consumer2 | Created | 49, Customer49, 1000.0, 1234592, 2583 2592 9217 8369
Producer | Sending | - | 94, Customer94, 1000.0, NA, NA
Consumer | Consumer1 | Created | 40, Customer40, 1000.0, 1234593, 0915 5360 7351 3040
Producer | Sending | - | 63, Customer63, 1000.0, NA, NA
Consumer | Consumer2 | Created | 94, Customer94, 1000.0, 1234594, 0089 8377 4312 8056
Producer | Sending | - | 46, Customer46, 1000.0, NA, NA
Consumer | Consumer1 | Created | 63, Customer63, 1000.0, 1234595, 5836 8026 5689 6772
Producer | Sending | - | 14, Customer14, 1000.0, NA, NA
Consumer | Consumer2 | Created | 46, Customer46, 1000.0, 1234596, 8078 1268 8691 5128
Producer | Sending | - | 93, Customer93, 1000.0, NA, NA
Consumer | Consumer1 | Created | 14, Customer14, 1000.0, 1234597, 2542 2660 1958 0525
Consumer | Consumer2 | Created | 93, Customer93, 1000.0, 1234598, 0798 9242 3666 2920
Producer | Sending | - | 70, Customer70, 1000.0, NA, NA
Producer | Sending | - | 59, Customer59, 1000.0, NA, NA
Consumer | Consumer1 | Created | 70, Customer70, 1000.0, 1234599, 9808 2849 2863 5346
Producer | Sending | - | 25, Customer25, 1000.0, NA, NA
Consumer | Consumer2 | Created | 59, Customer59, 1000.0, 1234600, 9144 5434 1089 5674
Producer | Sending | - | 95, Customer95, 1000.0, NA, NA
Consumer | Consumer1 | Created | 25, Customer25, 1000.0, 1234601, 7053 1332 3416 2839
Producer | Sending | - | 40, Customer40, 1000.0, NA, NA
Consumer | Consumer2 | Created | 95, Customer95, 1000.0, 1234602, 5022 5482 7995 3921
Producer | Sending | - | 50, Customer50, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 40, Customer40, 1000.0, 1234593, 0915 5360 7351 3040
Producer | Sending | - | 0, Customer0, 1000.0, NA, NA
Consumer | Consumer2 | Created | 50, Customer50, 1000.0, 1234603, 7104 6483 8440 0546
Producer | Sending | - | 80, Customer80, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 0, Customer0, 1000.0, 1234588, 1362 0794 9203 5617
Producer | Sending | - | 62, Customer62, 1000.0, NA, NA
Consumer | Consumer2 | Created | 80, Customer80, 1000.0, 1234604, 7973 5170 5495 4371
Producer | Sending | - | 20, Customer20, 1000.0, NA, NA
Consumer | Consumer1 | Created | 62, Customer62, 1000.0, 1234605, 3273 1800 3425 7099
Consumer | Consumer2 | Created | 20, Customer20, 1000.0, 1234606, 5177 8039 7832 9328
Producer | Sending | - | 29, Customer29, 1000.0, NA, NA
Producer | Sending | - | 95, Customer95, 1000.0, NA, NA
Consumer | Consumer1 | Created | 29, Customer29, 1000.0, 1234607, 7406 8281 4109 7048
Producer | Sending | - | 71, Customer71, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 95, Customer95, 1000.0, 1234602, 5022 5482 7995 3921
Producer | Sending | - | 50, Customer50, 1000.0, NA, NA
Consumer | Consumer1 | Created | 71, Customer71, 1000.0, 1234608, 6691 4058 4131 1629
Producer | Sending | - | 79, Customer79, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 50, Customer50, 1000.0, 1234603, 7104 6483 8440 0546
Producer | Sending | - | 41, Customer41, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 79, Customer79, 1000.0, 1234569, 2355 1916 5982 5465
Consumer | Consumer2 | Created | 41, Customer41, 1000.0, 1234609, 8759 1100 2823 3202
Producer | Sending | - | 58, Customer58, 1000.0, NA, NA
Producer | Sending | - | 96, Customer96, 1000.0, NA, NA
Consumer | Consumer1 | Created | 58, Customer58, 1000.0, 1234610, 9508 1124 9865 6222
Producer | Sending | - | 91, Customer91, 1000.0, NA, NA
Consumer | Consumer2 | Created | 96, Customer96, 1000.0, 1234611, 3347 4815 5352 0776
Consumer | Consumer1 | Existing | 91, Customer91, 1000.0, 1234577, 9947 9541 7097 9114
Producer | Sending | - | 41, Customer41, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 41, Customer41, 1000.0, 1234609, 8759 1100 2823 3202
Producer | Sending | - | 97, Customer97, 1000.0, NA, NA
Consumer | Consumer1 | Created | 97, Customer97, 1000.0, 1234612, 4032 0267 9252 8179
Producer | Sending | - | 57, Customer57, 1000.0, NA, NA
Consumer | Consumer2 | Created | 57, Customer57, 1000.0, 1234613, 4431 1703 6099 7565
Producer | Sending | - | 92, Customer92, 1000.0, NA, NA
Producer | Sending | - | 93, Customer93, 1000.0, NA, NA
Consumer | Consumer1 | Created | 92, Customer92, 1000.0, 1234614, 2241 3719 8251 8996
Producer | Sending | - | 15, Customer15, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 93, Customer93, 1000.0, 1234598, 0798 9242 3666 2920
Producer | Sending | - | 33, Customer33, 1000.0, NA, NA
Consumer | Consumer1 | Created | 15, Customer15, 1000.0, 1234615, 4885 4288 0777 4786
Producer | Sending | - | 33, Customer33, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 33, Customer33, 1000.0, 1234582, 2180 2427 5346 0595
Consumer | Consumer1 | Existing | 33, Customer33, 1000.0, 1234582, 2180 2427 5346 0595
Producer | Sending | - | 51, Customer51, 1000.0, NA, NA
Producer | Sending | - | 51, Customer51, 1000.0, NA, NA
Consumer | Consumer2 | Created | 51, Customer51, 1000.0, 1234616, 0545 5983 8318 5667
Producer | Sending | - | 89, Customer89, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 51, Customer51, 1000.0, 1234616, 0545 5983 8318 5667
Producer | Sending | - | 94, Customer94, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 89, Customer89, 1000.0, 1234587, 0524 4072 7723 7401
Consumer | Consumer1 | Existing | 94, Customer94, 1000.0, 1234594, 0089 8377 4312 8056
Producer | Sending | - | 88, Customer88, 1000.0, NA, NA
Producer | Sending | - | 14, Customer14, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 88, Customer88, 1000.0, 1234575, 7168 0720 5033 7590
Producer | Sending | - | 10, Customer10, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 14, Customer14, 1000.0, 1234597, 2542 2660 1958 0525
Producer | Sending | - | 26, Customer26, 1000.0, NA, NA
Consumer | Consumer2 | Created | 10, Customer10, 1000.0, 1234617, 6849 9418 8789 7925
Consumer | Consumer1 | Existing | 26, Customer26, 1000.0, 1234586, 0667 8817 2441 0981
Producer | Sending | - | 4, Customer4, 1000.0, NA, NA
Producer | Sending | - | 73, Customer73, 1000.0, NA, NA
Consumer | Consumer2 | Created | 4, Customer4, 1000.0, 1234618, 9503 3331 0222 3426
Producer | Sending | - | 77, Customer77, 1000.0, NA, NA
Consumer | Consumer1 | Created | 73, Customer73, 1000.0, 1234619, 6329 9638 2576 5660
Consumer | Consumer2 | Created | 77, Customer77, 1000.0, 1234620, 3014 4183 5141 4044
Producer | Sending | - | 2, Customer2, 1000.0, NA, NA
Consumer | Consumer1 | Created | 2, Customer2, 1000.0, 1234621, 6230 3411 3670 5172
Producer | Sending | - | 59, Customer59, 1000.0, NA, NA
Producer | Sending | - | 84, Customer84, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 59, Customer59, 1000.0, 1234600, 9144 5434 1089 5674
Producer | Sending | - | 96, Customer96, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 84, Customer84, 1000.0, 1234572, 4585 5633 7244 6746
Consumer | Consumer2 | Existing | 96, Customer96, 1000.0, 1234611, 3347 4815 5352 0776
Producer | Sending | - | 52, Customer52, 1000.0, NA, NA
Producer | Sending | - | 46, Customer46, 1000.0, NA, NA
Consumer | Consumer1 | Created | 52, Customer52, 1000.0, 1234622, 4100 6357 2215 5273
Consumer | Consumer2 | Existing | 46, Customer46, 1000.0, 1234596, 8078 1268 8691 5128
Producer | Sending | - | 19, Customer19, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 19, Customer19, 1000.0, 1234591, 6725 9654 8938 5328
Producer | Sending | - | 87, Customer87, 1000.0, NA, NA
Consumer | Consumer2 | Created | 87, Customer87, 1000.0, 1234623, 6801 6198 3558 2092
Producer | Sending | - | 73, Customer73, 1000.0, NA, NA
Producer | Sending | - | 11, Customer11, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 73, Customer73, 1000.0, 1234619, 6329 9638 2576 5660
Consumer | Consumer2 | Created | 11, Customer11, 1000.0, 1234624, 4025 6177 5765 6802
Producer | Sending | - | 92, Customer92, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 92, Customer92, 1000.0, 1234614, 2241 3719 8251 8996
Producer | Sending | - | 40, Customer40, 1000.0, NA, NA
Producer | Sending | - | 27, Customer27, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 40, Customer40, 1000.0, 1234593, 0915 5360 7351 3040
Producer | Sending | - | 64, Customer64, 1000.0, NA, NA
Consumer | Consumer1 | Created | 27, Customer27, 1000.0, 1234625, 9855 9364 3717 0175
Consumer | Consumer2 | Existing | 64, Customer64, 1000.0, 1234590, 2323 4946 2381 3735
Producer | Sending | - | 74, Customer74, 1000.0, NA, NA
Producer | Sending | - | 51, Customer51, 1000.0, NA, NA
Consumer | Consumer1 | Created | 74, Customer74, 1000.0, 1234626, 4412 1433 5092 9344
Producer | Sending | - | 63, Customer63, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 51, Customer51, 1000.0, 1234616, 0545 5983 8318 5667
Producer | Sending | - | 92, Customer92, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 63, Customer63, 1000.0, 1234595, 5836 8026 5689 6772
Producer | Sending | - | 7, Customer7, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 92, Customer92, 1000.0, 1234614, 2241 3719 8251 8996
Producer | Sending | - | 41, Customer41, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 7, Customer7, 1000.0, 1234579, 8134 8895 6595 6592
Consumer | Consumer2 | Existing | 41, Customer41, 1000.0, 1234609, 8759 1100 2823 3202
Producer | Sending | - | 55, Customer55, 1000.0, NA, NA
Producer | Sending | - | 11, Customer11, 1000.0, NA, NA
Consumer | Consumer1 | Created | 55, Customer55, 1000.0, 1234627, 4436 6579 6570 7516
Consumer | Consumer2 | Existing | 11, Customer11, 1000.0, 1234624, 4025 6177 5765 6802
Producer | Sending | - | 19, Customer19, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 19, Customer19, 1000.0, 1234591, 6725 9654 8938 5328
Producer | Sending | - | 81, Customer81, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 81, Customer81, 1000.0, 1234580, 9401 8500 3087 5240
Producer | Sending | - | 32, Customer32, 1000.0, NA, NA
Consumer | Consumer1 | Created | 32, Customer32, 1000.0, 1234628, 3411 6886 3541 4106
Producer | Sending | - | 14, Customer14, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 14, Customer14, 1000.0, 1234597, 2542 2660 1958 0525
Producer | Sending | - | 26, Customer26, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 26, Customer26, 1000.0, 1234586, 0667 8817 2441 0981
Producer | Sending | - | 90, Customer90, 1000.0, NA, NA
Consumer | Consumer2 | Created | 90, Customer90, 1000.0, 1234629, 4301 6639 9520 5339
Producer | Sending | - | 5, Customer5, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 5, Customer5, 1000.0, 1234568, 8932 9631 7423 7836
Producer | Sending | - | 69, Customer69, 1000.0, NA, NA
=================================================================================================================
Consumer | Consumer2 | Existing | 69, Customer69, 1000.0, 1234576, 7773 0871 6479 7101
Source | No Of Created | No Of Existing | No Of Processed
====================================================================================
Consumer1 | 32 | 18 | 50
Consumer2 | 31 | 19 | 50
------------------------------------------------------------------------------------
BankAccountProcessingQueue | 63 | 37 | 100
====================================================================================
By looking at the summary, it is clear that the message objects are distributed and load balanced between both of the consumers (consumer 1 and consumer 2). We can see that both of the consumers process half (50) bank accounts. I mean, messages are given to the consumer available in a round-robin way. Consumer 1 found 18 existing and consumer 2 found 19 existing accounts.
Example of Bank Account Create Using JMS-ActiveMQ Topics
Now, let's take the same example to understand Topics.
Code for MyTopic class:
package org.trishinfotech.activemq.example4;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.apache.activemq.ActiveMQConnection.DEFAULT_BROKER_URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.trishinfotech.activemq.example2.BankAccount;
public class MyTopic {
private static final String CLIENTID = "TrishInfotechActiveMQ";
private String topicName;
private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Topic topic;
private MessageProducer producer;
private List<MessageConsumer> consumers = new ArrayList<MessageConsumer>();
private Map<Long, BankAccount> accountMap = new HashMap<Long, BankAccount>();
private int noOfSend = 0;
public MyTopic(String topicName, int noOfConsumers) throws Exception {
super();
// The name of the topic.
this.topicName = topicName;
// URL of the JMS server is required to create connection factory.
// DEFAULT_BROKER_URL is : tcp://localhost:61616 and is indicates that JMS
// server is running on localhost
connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// Getting JMS connection from the server and starting it
connection = connectionFactory.createConnection("admin", "admin");
connection.setClientID(CLIENTID);
connection.start();
// Creating a non-transactional session to send/receive JMS message.
session = connection.createSession(false, AUTO_ACKNOWLEDGE);
// Topic represents here our Topic ’BankAccountProcessingTopic’ on the JMS
// server.
// The Topic will be created automatically on the JSM server if its not already
// created.
topic = session.createTopic(this.topicName);
// MessageProducer is used for sending (producing) messages to the Topic.
producer = session.createProducer(topic);
// MessageConsumer is used for receiving (consuming) messages from the Topic.
IntStream.range(0, noOfConsumers).forEach(consumerNo -> {
try {
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MyTopicListener(this, "Consumer" + (consumerNo + 1)));
consumers.add(consumer);
} catch (Exception exp) {
// we can ignore as of now
}
});
}
public void sendAccountToTopic(MessageProducer producer, BankAccount newAccount) throws Exception {
System.out.printf("%10s | %10s | %10s | %50s\n", "Producer", "Sending", "-", newAccount);
ObjectMessage message = session.createObjectMessage(newAccount);
// push the message into Topic
producer.send(message);
noOfSend++;
}
public ProcessStatus processAccountFromTopic(Message message, String consumerTaskName) throws Exception {
ProcessStatus status = ProcessStatus.TOPIC_CLOSED;
BankAccount account = null;
if (message != null) {
ObjectMessage objectMessage = (ObjectMessage) message;
account = (BankAccount) objectMessage.getObject();
if (account != null) {
BankAccount existingAccount = pullAccountByApplicationNo(account.getApplicationNo());
if (existingAccount != null) {
System.out.printf("%10s | %10s | %10s | %50s\n", "Consumer", consumerTaskName, "Existing",
existingAccount);
account = existingAccount;
status = ProcessStatus.EXISTING;
} else {
createBankAccount(account);
System.out.printf("%10s | %10s | %10s | %50s\n", "Consumer", consumerTaskName, "Created", account);
status = ProcessStatus.CREATED;
}
}
//message.acknowledge();
}
return status;
}
private void createBankAccount(BankAccount account) throws Exception {
Helper.setupBankAccount(account);
accountMap.put(account.getApplicationNo(), account);
}
public BankAccount pullAccountByApplicationNo(long applicationNo) {
return accountMap.get(applicationNo);
}
public void close() throws JMSException {
producer.close();
producer = null;
consumers.stream().forEach(consumer -> {
try {
consumer.close();
} catch (JMSException e) {
// we can ignore as of now
}
});
consumers.clear();
session.close();
session = null;
connection.close();
connection = null;
}
public String getTopicName() {
return topicName;
}
public Connection getConnection() {
return connection;
}
public Session getSession() {
return session;
}
public Topic getTopic() {
return topic;
}
public MessageProducer getProducer() {
return producer;
}
public void printSummary() {
System.out.printf("\n\n%30s | %14s | %14s | %14s\n", "Source", "No Of Created", "No Of Existing",
"No Of Processed");
System.out.println("====================================================================================");
AtomicInteger totalNoOfExisting = new AtomicInteger();
consumers.stream().forEach(consumer -> {
try {
MyTopicListener listener = (MyTopicListener) consumer.getMessageListener();
listener.printSummary();
totalNoOfExisting.getAndAdd(listener.getNoOfExisting());
} catch (JMSException exp) {
// we can ignore as of now
exp.printStackTrace();
}
});
System.out.println("------------------------------------------------------------------------------------");
System.out.printf("%30s | %14s | %14s | %14s\n", topicName, accountMap.size(), totalNoOfExisting.get(),
noOfSend);
System.out.println("====================================================================================");
}
private static enum ProcessStatus {
CREATED, EXISTING, TOPIC_CLOSED
}
private static class Helper {
private static long initialCustomerId = 1234567l;
private static long accountCreated = 0l;
public static void setupBankAccount(BankAccount account) {
long nextAvailableCustomerId = initialCustomerId + accountCreated++;
Random rand = new Random();
String atmCardNumber = String.format("%04d %04d %04d %04d", rand.nextInt(9999), rand.nextInt(9999),
rand.nextInt(9999), rand.nextInt(9999));
account.setCustomerId(nextAvailableCustomerId);
account.setAtmCardNumber(atmCardNumber);
}
}
private static class MyTopicListener implements MessageListener {
private MyTopic Topic;
private String consumerTaskName;
private int noOfProcessed = 0, noOfCreated = 0, noOfExisting = 0;
public MyTopicListener(final MyTopic Topic, String consumerTaskName) {
super();
this.Topic = Topic;
this.consumerTaskName = consumerTaskName;
}
@Override
public void onMessage(Message message) {
// on poll the message from the Topic
try {
ProcessStatus status = Topic.processAccountFromTopic(message, consumerTaskName);
switch (status) {
case CREATED:
noOfCreated++;
break;
case EXISTING:
noOfExisting++;
break;
case TOPIC_CLOSED:
default:
}
if (!ProcessStatus.TOPIC_CLOSED.equals(status)) {
noOfProcessed++;
}
} catch (Exception exp) {
// we can ignore as of now
}
}
public int getNoOfProcessed() {
return noOfProcessed;
}
public int getNoOfCreated() {
return noOfCreated;
}
public int getNoOfExisting() {
return noOfExisting;
}
public void printSummary() {
System.out.printf("%30s | %14s | %14s | %14s\n", consumerTaskName, getNoOfCreated(), getNoOfExisting(),
getNoOfProcessed());
}
}
}
Now let's write the Main class for testing the Topic using two consumers.
package org.trishinfotech.activemq.example4;
import java.util.Random;
import org.trishinfotech.activemq.example2.BankAccount;
public class Main {
private static final String TOPIC_NAME = "BankAccountProcessingTopic";
private static final int NO_OF_CONSUMERS = 2;
private static final long NO_OF_ACCOUNTS = 100l;
public static void main(String[] args) throws Exception {
MyTopic topic = new MyTopic(TOPIC_NAME, NO_OF_CONSUMERS);
Random rand = new Random();
System.out.printf("%10s | %10s | %10s | %50s\n", "Source", "Action", "Result",
"Bank Details (ApplicationNo, UserName, DepositAmount, CustomerId, ATM)");
System.out.println(
"=================================================================================================================");
for (long i = 1; i <= NO_OF_ACCOUNTS; i++) {
long applicationNo = rand.nextLong(NO_OF_ACCOUNTS);
BankAccount newAccount = new BankAccount(applicationNo, "Customer" + applicationNo, 1000.0d);
topic.sendAccountToTopic(topic.getProducer(), newAccount);
}
System.out.println(
"=================================================================================================================");
// just to give graceful time to finish the processing
Thread.sleep(2000);
topic.printSummary();
topic.close();
}
}
Below is the output of the program:
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Source | Action | Result | Bank Details (ApplicationNo, UserName, DepositAmount, CustomerId, ATM)
=================================================================================================================
Producer | Sending | - | 54, Customer54, 1000.0, NA, NA
Producer | Sending | - | 12, Customer12, 1000.0, NA, NA
Producer | Sending | - | 29, Customer29, 1000.0, NA, NA
Consumer | Consumer1 | Created | 54, Customer54, 1000.0, 1234567, 9985 9574 4311 2198
Consumer | Consumer2 | Existing | 54, Customer54, 1000.0, 1234567, 9985 9574 4311 2198
Consumer | Consumer1 | Created | 12, Customer12, 1000.0, 1234568, 4620 8215 3432 7416
Consumer | Consumer2 | Existing | 12, Customer12, 1000.0, 1234568, 4620 8215 3432 7416
Producer | Sending | - | 39, Customer39, 1000.0, NA, NA
Consumer | Consumer1 | Created | 29, Customer29, 1000.0, 1234569, 5922 0932 0661 7776
Consumer | Consumer2 | Existing | 29, Customer29, 1000.0, 1234569, 5922 0932 0661 7776
Producer | Sending | - | 11, Customer11, 1000.0, NA, NA
Consumer | Consumer1 | Created | 39, Customer39, 1000.0, 1234570, 8979 1952 9741 8156
Consumer | Consumer2 | Existing | 39, Customer39, 1000.0, 1234570, 8979 1952 9741 8156
Producer | Sending | - | 58, Customer58, 1000.0, NA, NA
Consumer | Consumer1 | Created | 11, Customer11, 1000.0, 1234571, 3935 5490 1231 2701
Consumer | Consumer2 | Existing | 11, Customer11, 1000.0, 1234571, 3935 5490 1231 2701
Producer | Sending | - | 89, Customer89, 1000.0, NA, NA
Consumer | Consumer1 | Created | 58, Customer58, 1000.0, 1234572, 6129 1065 3050 1229
Consumer | Consumer2 | Existing | 58, Customer58, 1000.0, 1234572, 6129 1065 3050 1229
Producer | Sending | - | 75, Customer75, 1000.0, NA, NA
Consumer | Consumer1 | Created | 89, Customer89, 1000.0, 1234573, 8368 1109 3010 5504
Consumer | Consumer2 | Existing | 89, Customer89, 1000.0, 1234573, 8368 1109 3010 5504
Producer | Sending | - | 14, Customer14, 1000.0, NA, NA
Consumer | Consumer1 | Created | 75, Customer75, 1000.0, 1234574, 7648 2459 6681 7525
Consumer | Consumer2 | Existing | 75, Customer75, 1000.0, 1234574, 7648 2459 6681 7525
Producer | Sending | - | 85, Customer85, 1000.0, NA, NA
Consumer | Consumer1 | Created | 14, Customer14, 1000.0, 1234575, 5687 8278 6956 4624
Consumer | Consumer2 | Existing | 14, Customer14, 1000.0, 1234575, 5687 8278 6956 4624
Producer | Sending | - | 32, Customer32, 1000.0, NA, NA
Consumer | Consumer1 | Created | 85, Customer85, 1000.0, 1234576, 0665 2762 6508 4120
Consumer | Consumer2 | Existing | 85, Customer85, 1000.0, 1234576, 0665 2762 6508 4120
Producer | Sending | - | 91, Customer91, 1000.0, NA, NA
Consumer | Consumer1 | Created | 32, Customer32, 1000.0, 1234577, 2570 2052 9064 7359
Consumer | Consumer2 | Existing | 32, Customer32, 1000.0, 1234577, 2570 2052 9064 7359
Producer | Sending | - | 96, Customer96, 1000.0, NA, NA
Consumer | Consumer1 | Created | 91, Customer91, 1000.0, 1234578, 0442 1979 9308 0532
Consumer | Consumer2 | Existing | 91, Customer91, 1000.0, 1234578, 0442 1979 9308 0532
Producer | Sending | - | 16, Customer16, 1000.0, NA, NA
Consumer | Consumer1 | Created | 96, Customer96, 1000.0, 1234579, 7625 9051 7947 8691
Consumer | Consumer2 | Existing | 96, Customer96, 1000.0, 1234579, 7625 9051 7947 8691
Producer | Sending | - | 50, Customer50, 1000.0, NA, NA
Consumer | Consumer1 | Created | 16, Customer16, 1000.0, 1234580, 9969 2653 6614 1189
Consumer | Consumer2 | Existing | 16, Customer16, 1000.0, 1234580, 9969 2653 6614 1189
Producer | Sending | - | 17, Customer17, 1000.0, NA, NA
Consumer | Consumer1 | Created | 50, Customer50, 1000.0, 1234581, 4197 2447 7607 0203
Consumer | Consumer2 | Existing | 50, Customer50, 1000.0, 1234581, 4197 2447 7607 0203
Producer | Sending | - | 63, Customer63, 1000.0, NA, NA
Consumer | Consumer1 | Created | 17, Customer17, 1000.0, 1234582, 7208 5260 8319 9921
Consumer | Consumer2 | Existing | 17, Customer17, 1000.0, 1234582, 7208 5260 8319 9921
Producer | Sending | - | 48, Customer48, 1000.0, NA, NA
Consumer | Consumer1 | Created | 63, Customer63, 1000.0, 1234583, 6981 8342 8151 7148
Consumer | Consumer2 | Existing | 63, Customer63, 1000.0, 1234583, 6981 8342 8151 7148
Producer | Sending | - | 16, Customer16, 1000.0, NA, NA
Consumer | Consumer1 | Created | 48, Customer48, 1000.0, 1234584, 8918 9226 2331 3452
Consumer | Consumer2 | Existing | 48, Customer48, 1000.0, 1234584, 8918 9226 2331 3452
Producer | Sending | - | 46, Customer46, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 16, Customer16, 1000.0, 1234580, 9969 2653 6614 1189
Consumer | Consumer2 | Existing | 16, Customer16, 1000.0, 1234580, 9969 2653 6614 1189
Producer | Sending | - | 48, Customer48, 1000.0, NA, NA
Consumer | Consumer1 | Created | 46, Customer46, 1000.0, 1234585, 5640 3576 9688 8346
Consumer | Consumer2 | Existing | 46, Customer46, 1000.0, 1234585, 5640 3576 9688 8346
Producer | Sending | - | 59, Customer59, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 48, Customer48, 1000.0, 1234584, 8918 9226 2331 3452
Consumer | Consumer2 | Existing | 48, Customer48, 1000.0, 1234584, 8918 9226 2331 3452
Producer | Sending | - | 97, Customer97, 1000.0, NA, NA
Consumer | Consumer1 | Created | 59, Customer59, 1000.0, 1234586, 9374 2232 2981 9636
Consumer | Consumer2 | Existing | 59, Customer59, 1000.0, 1234586, 9374 2232 2981 9636
Producer | Sending | - | 1, Customer1, 1000.0, NA, NA
Consumer | Consumer1 | Created | 97, Customer97, 1000.0, 1234587, 7328 2325 7060 3879
Consumer | Consumer2 | Existing | 97, Customer97, 1000.0, 1234587, 7328 2325 7060 3879
Producer | Sending | - | 96, Customer96, 1000.0, NA, NA
Consumer | Consumer1 | Created | 1, Customer1, 1000.0, 1234588, 1255 8517 9131 9310
Consumer | Consumer2 | Existing | 1, Customer1, 1000.0, 1234588, 1255 8517 9131 9310
Producer | Sending | - | 59, Customer59, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 96, Customer96, 1000.0, 1234579, 7625 9051 7947 8691
Consumer | Consumer2 | Existing | 96, Customer96, 1000.0, 1234579, 7625 9051 7947 8691
Producer | Sending | - | 27, Customer27, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 59, Customer59, 1000.0, 1234586, 9374 2232 2981 9636
Consumer | Consumer2 | Existing | 59, Customer59, 1000.0, 1234586, 9374 2232 2981 9636
Producer | Sending | - | 81, Customer81, 1000.0, NA, NA
Consumer | Consumer1 | Created | 27, Customer27, 1000.0, 1234589, 4708 8739 1158 0626
Consumer | Consumer2 | Existing | 27, Customer27, 1000.0, 1234589, 4708 8739 1158 0626
Producer | Sending | - | 33, Customer33, 1000.0, NA, NA
Consumer | Consumer1 | Created | 81, Customer81, 1000.0, 1234590, 3022 7767 4833 1489
Consumer | Consumer2 | Existing | 81, Customer81, 1000.0, 1234590, 3022 7767 4833 1489
Producer | Sending | - | 10, Customer10, 1000.0, NA, NA
Consumer | Consumer1 | Created | 33, Customer33, 1000.0, 1234591, 4502 7287 6119 4023
Consumer | Consumer2 | Existing | 33, Customer33, 1000.0, 1234591, 4502 7287 6119 4023
Producer | Sending | - | 41, Customer41, 1000.0, NA, NA
Consumer | Consumer1 | Created | 10, Customer10, 1000.0, 1234592, 5521 8069 0357 4544
Consumer | Consumer2 | Existing | 10, Customer10, 1000.0, 1234592, 5521 8069 0357 4544
Producer | Sending | - | 6, Customer6, 1000.0, NA, NA
Consumer | Consumer1 | Created | 41, Customer41, 1000.0, 1234593, 7326 6119 9606 1340
Consumer | Consumer2 | Existing | 41, Customer41, 1000.0, 1234593, 7326 6119 9606 1340
Producer | Sending | - | 15, Customer15, 1000.0, NA, NA
Consumer | Consumer1 | Created | 6, Customer6, 1000.0, 1234594, 8211 9638 4182 4603
Consumer | Consumer2 | Existing | 6, Customer6, 1000.0, 1234594, 8211 9638 4182 4603
Producer | Sending | - | 50, Customer50, 1000.0, NA, NA
Consumer | Consumer1 | Created | 15, Customer15, 1000.0, 1234595, 0027 6195 8152 0514
Consumer | Consumer2 | Existing | 15, Customer15, 1000.0, 1234595, 0027 6195 8152 0514
Producer | Sending | - | 77, Customer77, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 50, Customer50, 1000.0, 1234581, 4197 2447 7607 0203
Consumer | Consumer2 | Existing | 50, Customer50, 1000.0, 1234581, 4197 2447 7607 0203
Producer | Sending | - | 20, Customer20, 1000.0, NA, NA
Consumer | Consumer1 | Created | 77, Customer77, 1000.0, 1234596, 7076 0901 7064 5405
Consumer | Consumer2 | Existing | 77, Customer77, 1000.0, 1234596, 7076 0901 7064 5405
Producer | Sending | - | 81, Customer81, 1000.0, NA, NA
Consumer | Consumer1 | Created | 20, Customer20, 1000.0, 1234597, 0949 6137 1796 5105
Consumer | Consumer2 | Existing | 20, Customer20, 1000.0, 1234597, 0949 6137 1796 5105
Consumer | Consumer1 | Existing | 81, Customer81, 1000.0, 1234590, 3022 7767 4833 1489
Consumer | Consumer2 | Existing | 81, Customer81, 1000.0, 1234590, 3022 7767 4833 1489
Producer | Sending | - | 80, Customer80, 1000.0, NA, NA
Producer | Sending | - | 21, Customer21, 1000.0, NA, NA
Consumer | Consumer1 | Created | 80, Customer80, 1000.0, 1234598, 3268 6602 3402 0714
Consumer | Consumer2 | Existing | 80, Customer80, 1000.0, 1234598, 3268 6602 3402 0714
Producer | Sending | - | 33, Customer33, 1000.0, NA, NA
Consumer | Consumer1 | Created | 21, Customer21, 1000.0, 1234599, 5988 5609 7385 3573
Consumer | Consumer2 | Existing | 21, Customer21, 1000.0, 1234599, 5988 5609 7385 3573
Producer | Sending | - | 99, Customer99, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 33, Customer33, 1000.0, 1234591, 4502 7287 6119 4023
Consumer | Consumer2 | Existing | 33, Customer33, 1000.0, 1234591, 4502 7287 6119 4023
Producer | Sending | - | 93, Customer93, 1000.0, NA, NA
Consumer | Consumer1 | Created | 99, Customer99, 1000.0, 1234600, 3956 3918 0942 1178
Consumer | Consumer2 | Existing | 99, Customer99, 1000.0, 1234600, 3956 3918 0942 1178
Producer | Sending | - | 87, Customer87, 1000.0, NA, NA
Consumer | Consumer1 | Created | 93, Customer93, 1000.0, 1234601, 2171 1058 8920 9367
Consumer | Consumer2 | Existing | 93, Customer93, 1000.0, 1234601, 2171 1058 8920 9367
Producer | Sending | - | 81, Customer81, 1000.0, NA, NA
Consumer | Consumer1 | Created | 87, Customer87, 1000.0, 1234602, 2680 8030 9886 9649
Consumer | Consumer2 | Existing | 87, Customer87, 1000.0, 1234602, 2680 8030 9886 9649
Producer | Sending | - | 24, Customer24, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 81, Customer81, 1000.0, 1234590, 3022 7767 4833 1489
Consumer | Consumer2 | Existing | 81, Customer81, 1000.0, 1234590, 3022 7767 4833 1489
Consumer | Consumer1 | Created | 24, Customer24, 1000.0, 1234603, 0102 7931 8314 1717
Consumer | Consumer2 | Existing | 24, Customer24, 1000.0, 1234603, 0102 7931 8314 1717
Producer | Sending | - | 61, Customer61, 1000.0, NA, NA
Producer | Sending | - | 10, Customer10, 1000.0, NA, NA
Consumer | Consumer1 | Created | 61, Customer61, 1000.0, 1234604, 3517 7947 6966 8926
Consumer | Consumer2 | Existing | 61, Customer61, 1000.0, 1234604, 3517 7947 6966 8926
Producer | Sending | - | 77, Customer77, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 10, Customer10, 1000.0, 1234592, 5521 8069 0357 4544
Consumer | Consumer2 | Existing | 10, Customer10, 1000.0, 1234592, 5521 8069 0357 4544
Producer | Sending | - | 11, Customer11, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 77, Customer77, 1000.0, 1234596, 7076 0901 7064 5405
Consumer | Consumer2 | Existing | 77, Customer77, 1000.0, 1234596, 7076 0901 7064 5405
Producer | Sending | - | 29, Customer29, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 11, Customer11, 1000.0, 1234571, 3935 5490 1231 2701
Consumer | Consumer2 | Existing | 11, Customer11, 1000.0, 1234571, 3935 5490 1231 2701
Consumer | Consumer1 | Existing | 29, Customer29, 1000.0, 1234569, 5922 0932 0661 7776
Consumer | Consumer2 | Existing | 29, Customer29, 1000.0, 1234569, 5922 0932 0661 7776
Producer | Sending | - | 23, Customer23, 1000.0, NA, NA
Producer | Sending | - | 51, Customer51, 1000.0, NA, NA
Consumer | Consumer1 | Created | 23, Customer23, 1000.0, 1234605, 2447 8878 0536 9002
Consumer | Consumer2 | Existing | 23, Customer23, 1000.0, 1234605, 2447 8878 0536 9002
Producer | Sending | - | 67, Customer67, 1000.0, NA, NA
Consumer | Consumer1 | Created | 51, Customer51, 1000.0, 1234606, 7570 2055 0623 9161
Consumer | Consumer2 | Existing | 51, Customer51, 1000.0, 1234606, 7570 2055 0623 9161
Producer | Sending | - | 18, Customer18, 1000.0, NA, NA
Consumer | Consumer1 | Created | 67, Customer67, 1000.0, 1234607, 9639 3829 4177 1339
Consumer | Consumer2 | Existing | 67, Customer67, 1000.0, 1234607, 9639 3829 4177 1339
Producer | Sending | - | 75, Customer75, 1000.0, NA, NA
Consumer | Consumer1 | Created | 18, Customer18, 1000.0, 1234608, 3847 5912 9846 2385
Consumer | Consumer2 | Existing | 18, Customer18, 1000.0, 1234608, 3847 5912 9846 2385
Producer | Sending | - | 14, Customer14, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 75, Customer75, 1000.0, 1234574, 7648 2459 6681 7525
Consumer | Consumer2 | Existing | 75, Customer75, 1000.0, 1234574, 7648 2459 6681 7525
Consumer | Consumer1 | Existing | 14, Customer14, 1000.0, 1234575, 5687 8278 6956 4624
Producer | Sending | - | 44, Customer44, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 14, Customer14, 1000.0, 1234575, 5687 8278 6956 4624
Producer | Sending | - | 99, Customer99, 1000.0, NA, NA
Consumer | Consumer1 | Created | 44, Customer44, 1000.0, 1234609, 4502 9385 5323 2370
Consumer | Consumer2 | Existing | 44, Customer44, 1000.0, 1234609, 4502 9385 5323 2370
Consumer | Consumer1 | Existing | 99, Customer99, 1000.0, 1234600, 3956 3918 0942 1178
Consumer | Consumer2 | Existing | 99, Customer99, 1000.0, 1234600, 3956 3918 0942 1178
Producer | Sending | - | 54, Customer54, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 54, Customer54, 1000.0, 1234567, 9985 9574 4311 2198
Producer | Sending | - | 91, Customer91, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 54, Customer54, 1000.0, 1234567, 9985 9574 4311 2198
Consumer | Consumer1 | Existing | 91, Customer91, 1000.0, 1234578, 0442 1979 9308 0532
Consumer | Consumer2 | Existing | 91, Customer91, 1000.0, 1234578, 0442 1979 9308 0532
Producer | Sending | - | 69, Customer69, 1000.0, NA, NA
Consumer | Consumer1 | Created | 69, Customer69, 1000.0, 1234610, 5506 1245 2157 8924
Consumer | Consumer2 | Existing | 69, Customer69, 1000.0, 1234610, 5506 1245 2157 8924
Producer | Sending | - | 69, Customer69, 1000.0, NA, NA
Producer | Sending | - | 23, Customer23, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 69, Customer69, 1000.0, 1234610, 5506 1245 2157 8924
Consumer | Consumer2 | Existing | 69, Customer69, 1000.0, 1234610, 5506 1245 2157 8924
Consumer | Consumer1 | Existing | 23, Customer23, 1000.0, 1234605, 2447 8878 0536 9002
Consumer | Consumer2 | Existing | 23, Customer23, 1000.0, 1234605, 2447 8878 0536 9002
Producer | Sending | - | 28, Customer28, 1000.0, NA, NA
Consumer | Consumer1 | Created | 28, Customer28, 1000.0, 1234611, 3608 4851 2014 0529
Consumer | Consumer2 | Existing | 28, Customer28, 1000.0, 1234611, 3608 4851 2014 0529
Producer | Sending | - | 26, Customer26, 1000.0, NA, NA
Consumer | Consumer1 | Created | 26, Customer26, 1000.0, 1234612, 7383 9776 4859 7699
Producer | Sending | - | 82, Customer82, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 26, Customer26, 1000.0, 1234612, 7383 9776 4859 7699
Producer | Sending | - | 0, Customer0, 1000.0, NA, NA
Consumer | Consumer1 | Created | 82, Customer82, 1000.0, 1234613, 1866 4968 9823 8567
Consumer | Consumer2 | Existing | 82, Customer82, 1000.0, 1234613, 1866 4968 9823 8567
Consumer | Consumer1 | Created | 0, Customer0, 1000.0, 1234614, 5154 0538 8836 1405
Producer | Sending | - | 98, Customer98, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 0, Customer0, 1000.0, 1234614, 5154 0538 8836 1405
Producer | Sending | - | 45, Customer45, 1000.0, NA, NA
Consumer | Consumer1 | Created | 98, Customer98, 1000.0, 1234615, 3103 8736 3717 1799
Consumer | Consumer2 | Existing | 98, Customer98, 1000.0, 1234615, 3103 8736 3717 1799
Consumer | Consumer1 | Created | 45, Customer45, 1000.0, 1234616, 5263 4317 3594 3483
Consumer | Consumer2 | Existing | 45, Customer45, 1000.0, 1234616, 5263 4317 3594 3483
Producer | Sending | - | 32, Customer32, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 32, Customer32, 1000.0, 1234577, 2570 2052 9064 7359
Consumer | Consumer2 | Existing | 32, Customer32, 1000.0, 1234577, 2570 2052 9064 7359
Producer | Sending | - | 48, Customer48, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 48, Customer48, 1000.0, 1234584, 8918 9226 2331 3452
Producer | Sending | - | 20, Customer20, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 48, Customer48, 1000.0, 1234584, 8918 9226 2331 3452
Producer | Sending | - | 94, Customer94, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 20, Customer20, 1000.0, 1234597, 0949 6137 1796 5105
Consumer | Consumer2 | Existing | 20, Customer20, 1000.0, 1234597, 0949 6137 1796 5105
Consumer | Consumer1 | Created | 94, Customer94, 1000.0, 1234617, 4437 8141 3477 3400
Producer | Sending | - | 16, Customer16, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 94, Customer94, 1000.0, 1234617, 4437 8141 3477 3400
Consumer | Consumer1 | Existing | 16, Customer16, 1000.0, 1234580, 9969 2653 6614 1189
Producer | Sending | - | 8, Customer8, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 16, Customer16, 1000.0, 1234580, 9969 2653 6614 1189
Producer | Sending | - | 36, Customer36, 1000.0, NA, NA
Consumer | Consumer1 | Created | 8, Customer8, 1000.0, 1234618, 3298 7450 8562 7683
Consumer | Consumer2 | Existing | 8, Customer8, 1000.0, 1234618, 3298 7450 8562 7683
Consumer | Consumer1 | Created | 36, Customer36, 1000.0, 1234619, 2128 7607 6754 2605
Producer | Sending | - | 93, Customer93, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 36, Customer36, 1000.0, 1234619, 2128 7607 6754 2605
Producer | Sending | - | 65, Customer65, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 93, Customer93, 1000.0, 1234601, 2171 1058 8920 9367
Consumer | Consumer2 | Existing | 93, Customer93, 1000.0, 1234601, 2171 1058 8920 9367
Consumer | Consumer1 | Created | 65, Customer65, 1000.0, 1234620, 2332 8853 2627 1210
Consumer | Consumer2 | Existing | 65, Customer65, 1000.0, 1234620, 2332 8853 2627 1210
Producer | Sending | - | 37, Customer37, 1000.0, NA, NA
Consumer | Consumer1 | Created | 37, Customer37, 1000.0, 1234621, 8888 0443 3174 3646
Consumer | Consumer2 | Existing | 37, Customer37, 1000.0, 1234621, 8888 0443 3174 3646
Producer | Sending | - | 14, Customer14, 1000.0, NA, NA
Producer | Sending | - | 16, Customer16, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 14, Customer14, 1000.0, 1234575, 5687 8278 6956 4624
Consumer | Consumer2 | Existing | 14, Customer14, 1000.0, 1234575, 5687 8278 6956 4624
Producer | Sending | - | 4, Customer4, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 16, Customer16, 1000.0, 1234580, 9969 2653 6614 1189
Consumer | Consumer2 | Existing | 16, Customer16, 1000.0, 1234580, 9969 2653 6614 1189
Producer | Sending | - | 46, Customer46, 1000.0, NA, NA
Consumer | Consumer1 | Created | 4, Customer4, 1000.0, 1234622, 0112 0942 7814 5289
Consumer | Consumer2 | Existing | 4, Customer4, 1000.0, 1234622, 0112 0942 7814 5289
Producer | Sending | - | 76, Customer76, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 46, Customer46, 1000.0, 1234585, 5640 3576 9688 8346
Consumer | Consumer2 | Existing | 46, Customer46, 1000.0, 1234585, 5640 3576 9688 8346
Consumer | Consumer1 | Created | 76, Customer76, 1000.0, 1234623, 8566 2644 8548 5091
Producer | Sending | - | 51, Customer51, 1000.0, NA, NA
Consumer | Consumer2 | Existing | 76, Customer76, 1000.0, 1234623, 8566 2644 8548 5091
Producer | Sending | - | 28, Customer28, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 51, Customer51, 1000.0, 1234606, 7570 2055 0623 9161
Consumer | Consumer2 | Existing | 51, Customer51, 1000.0, 1234606, 7570 2055 0623 9161
Producer | Sending | - | 40, Customer40, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 28, Customer28, 1000.0, 1234611, 3608 4851 2014 0529
Consumer | Consumer2 | Existing | 28, Customer28, 1000.0, 1234611, 3608 4851 2014 0529
Producer | Sending | - | 91, Customer91, 1000.0, NA, NA
Consumer | Consumer1 | Created | 40, Customer40, 1000.0, 1234624, 9799 2739 5352 2244
Consumer | Consumer2 | Existing | 40, Customer40, 1000.0, 1234624, 9799 2739 5352 2244
Producer | Sending | - | 42, Customer42, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 91, Customer91, 1000.0, 1234578, 0442 1979 9308 0532
Consumer | Consumer2 | Existing | 91, Customer91, 1000.0, 1234578, 0442 1979 9308 0532
Consumer | Consumer1 | Created | 42, Customer42, 1000.0, 1234625, 4248 5736 9790 4421
Consumer | Consumer2 | Existing | 42, Customer42, 1000.0, 1234625, 4248 5736 9790 4421
Producer | Sending | - | 17, Customer17, 1000.0, NA, NA
Producer | Sending | - | 11, Customer11, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 17, Customer17, 1000.0, 1234582, 7208 5260 8319 9921
Consumer | Consumer2 | Existing | 17, Customer17, 1000.0, 1234582, 7208 5260 8319 9921
Producer | Sending | - | 2, Customer2, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 11, Customer11, 1000.0, 1234571, 3935 5490 1231 2701
Consumer | Consumer2 | Existing | 11, Customer11, 1000.0, 1234571, 3935 5490 1231 2701
Producer | Sending | - | 35, Customer35, 1000.0, NA, NA
Consumer | Consumer1 | Created | 2, Customer2, 1000.0, 1234626, 5015 6813 2053 3076
Consumer | Consumer2 | Existing | 2, Customer2, 1000.0, 1234626, 5015 6813 2053 3076
Producer | Sending | - | 98, Customer98, 1000.0, NA, NA
Consumer | Consumer1 | Created | 35, Customer35, 1000.0, 1234627, 3489 7467 9062 2022
Consumer | Consumer2 | Existing | 35, Customer35, 1000.0, 1234627, 3489 7467 9062 2022
Producer | Sending | - | 96, Customer96, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 98, Customer98, 1000.0, 1234615, 3103 8736 3717 1799
Consumer | Consumer2 | Existing | 98, Customer98, 1000.0, 1234615, 3103 8736 3717 1799
Producer | Sending | - | 55, Customer55, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 96, Customer96, 1000.0, 1234579, 7625 9051 7947 8691
Consumer | Consumer2 | Existing | 96, Customer96, 1000.0, 1234579, 7625 9051 7947 8691
Producer | Sending | - | 86, Customer86, 1000.0, NA, NA
Consumer | Consumer1 | Created | 55, Customer55, 1000.0, 1234628, 0035 0177 7661 3333
Consumer | Consumer2 | Existing | 55, Customer55, 1000.0, 1234628, 0035 0177 7661 3333
Producer | Sending | - | 5, Customer5, 1000.0, NA, NA
Consumer | Consumer1 | Created | 86, Customer86, 1000.0, 1234629, 5593 1226 6727 7457
Consumer | Consumer2 | Existing | 86, Customer86, 1000.0, 1234629, 5593 1226 6727 7457
Producer | Sending | - | 42, Customer42, 1000.0, NA, NA
Consumer | Consumer1 | Created | 5, Customer5, 1000.0, 1234630, 1917 3681 1005 8888
Consumer | Consumer2 | Existing | 5, Customer5, 1000.0, 1234630, 1917 3681 1005 8888
Producer | Sending | - | 92, Customer92, 1000.0, NA, NA
Consumer | Consumer1 | Existing | 42, Customer42, 1000.0, 1234625, 4248 5736 9790 4421
Consumer | Consumer2 | Existing | 42, Customer42, 1000.0, 1234625, 4248 5736 9790 4421
=================================================================================================================
Consumer | Consumer1 | Created | 92, Customer92, 1000.0, 1234631, 1056 5469 4506 1201
Consumer | Consumer2 | Existing | 92, Customer92, 1000.0, 1234631, 1056 5469 4506 1201
Source | No Of Created | No Of Existing | No Of Processed
====================================================================================
Consumer1 | 65 | 35 | 100
Consumer2 | 0 | 100 | 100
------------------------------------------------------------------------------------
BankAccountProcessingTopic | 65 | 135 | 100
====================================================================================
Notice the change in the output? Messages are broadcasted to all of the subscribed consumers of the topic. So, both consumers (consumer 1 and consumer 2) received all the messages. Consumer 1 found 35 duplicate application numbers and the rest of the 65 accounts it has created. But, Consumer 2 found all duplicates? Hmm, why? because it received the message after consumer 1 (as per the subscription order), and by then, it was either duplicated or created by Consumer 1.
Working With Multiple Queues
Well, this is a bit of a complicated topic, and there are different ways to deal with it. To help new developers, I am trying a simple approach to present here.
Let's write a class for some CalculationWork as below:
package org.trishinfotech.activemq.example5;
import java.io.Serializable;
public class CalculationWork implements Serializable {
private static final String COMMA_STR = ",";
private static final long serialVersionUID = 710578808598511209L;
protected String calculateName;
protected String value;
public CalculationWork(String calculateName, String value) {
super();
this.calculateName = calculateName;
this.value = value;
}
public String getCalculateName() {
return calculateName;
}
public void setCalculateName(String calculateName) {
this.calculateName = calculateName;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("[calculateName=").append(calculateName).append(", value=").append(value)
.append("]");
return builder.toString();
}
public static CalculationWork valueOf(String line) {
CalculationWork calculationWork = null;
if (line != null) {
String[] words = line.split(COMMA_STR);
if (words.length >= 2) {
calculationWork = new CalculationWork(words[0].trim(), words[1].trim());
}
}
return calculationWork;
}
}
Now, create an abstract class MyQueue to have a common code of a queue, like setting up (via constructor in this case) and sending calculation-work message objects to the queue. Here I created an abstract method processCalculationWorkFromQueue()
to be implemented by its subclasses.
package org.trishinfotech.activemq.example5;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.apache.activemq.ActiveMQConnection.DEFAULT_BROKER_URL;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public abstract class MyQueue {
protected String calculateName;
protected String queueName;
protected ConnectionFactory connectionFactory;
protected Connection connection;
protected Session session;
protected Destination destination;
protected MessageProducer producer;
protected MessageConsumer consumer;
protected int noOfSend = 0;
public MyQueue(String calculateName, String queueName) throws Exception {
super();
this.calculateName = calculateName;
// The name of the queue.
this.queueName = queueName;
// URL of the JMS server is required to create connection factory.
// DEFAULT_BROKER_URL is : tcp://localhost:61616 and is indicates that JMS
// server is running on localhost
connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// Getting JMS connection from the server and starting it
connection = connectionFactory.createConnection("admin", "admin");
connection.start();
// Creating a non-transactional session to send/receive JMS message.
session = connection.createSession(false, AUTO_ACKNOWLEDGE);
// Destination represents here our queue ’BankAccountProcessingQueue’ on the JMS
// server.
// The queue will be created automatically on the JSM server if its not already
// created.
destination = session.createQueue(this.queueName);
// MessageProducer is used for sending (producing) messages to the queue.
producer = session.createProducer(destination);
// MessageConsumer is used for receiving (consuming) messages from the queue.
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MyQueueListener(this, queueName + "Consumer"));
}
public void close() throws JMSException {
producer.close();
producer = null;
consumer.close();
session.close();
session = null;
connection.close();
connection = null;
}
public void sendCalculationWorkToQueue(CalculationWork calculationWork) throws Exception {
System.out.printf("%40s | %10s | %-50s\n", queueName + "Producer", "Sending", calculationWork);
ObjectMessage message = session.createObjectMessage(calculationWork);
// push the message into queue
producer.send(message);
noOfSend++;
}
public String getCalculateName() {
return calculateName;
}
public String getQueueName() {
return queueName;
}
public Connection getConnection() {
return connection;
}
public Session getSession() {
return session;
}
public Destination getDestination() {
return destination;
}
public MessageProducer getProducer() {
return producer;
}
public abstract String processCalculationWorkFromQueue(Message message, String consumerTaskName) throws Exception;
private static class MyQueueListener implements MessageListener {
private MyQueue queue;
private String consumerTaskName;
public MyQueueListener(final MyQueue queue, String consumerTaskName) {
super();
this.queue = queue;
this.consumerTaskName = consumerTaskName;
}
@Override
public void onMessage(Message message) {
// on poll the message from the queue
try {
ObjectMessage objectMessage = (ObjectMessage) message;
CalculationWork calculationWork = (CalculationWork) objectMessage.getObject();
String answer = queue.processCalculationWorkFromQueue(message, consumerTaskName);
System.out.printf("%40s | %10s | %-50s \n", consumerTaskName, queue.getCalculateName(), calculationWork + " : " +answer);
} catch (Exception exp) {
// we can ignore as of now
}
}
}
}
Now, let's write some subclasses of MyQueue to perform various types of calculations.
Code for ArmstrongQueue class:
package org.trishinfotech.activemq.example5;
import javax.jms.Message;
import javax.jms.ObjectMessage;
public class ArmstrongQueue extends MyQueue {
public ArmstrongQueue(String calculateName, String queueName) throws Exception {
super(calculateName, queueName);
}
@Override
public String processCalculationWorkFromQueue(Message message, String consumerTaskName) throws Exception {
String answer = "false";
CalculationWork calculationWork = null;
if (message != null) {
ObjectMessage objectMessage = (ObjectMessage) message;
calculationWork = (CalculationWork) objectMessage.getObject();
if (calculationWork != null) {
String value = calculationWork.getValue();
try {
long longValue = Long.parseLong(value);
long number = longValue;
long armstrongValue = 0;
while (number != 0) {
long temp = number % 10;
armstrongValue = armstrongValue + temp * temp * temp;
number /= 10;
}
answer = Boolean.toString(String.valueOf(armstrongValue).equals(value));
} catch (NumberFormatException exp) {
System.out.println("Can't calculate armstrong of " + value);
}
}
message.acknowledge();
}
return answer;
}
}
Code for FactorialQueue class:
package org.trishinfotech.activemq.example5;
import java.math.BigInteger;
import javax.jms.Message;
import javax.jms.ObjectMessage;
public class FactorialQueue extends MyQueue {
public FactorialQueue(String calculateName, String queueName) throws Exception {
super(calculateName, queueName);
}
@Override
public String processCalculationWorkFromQueue(Message message, String consumerTaskName) throws Exception {
String answer = "NA";
if (message != null) {
ObjectMessage objectMessage = (ObjectMessage) message;
CalculationWork calculationWork = (CalculationWork) objectMessage.getObject();
if (calculationWork != null) {
String value = calculationWork.getValue();
try {
long longValue = Long.parseLong(value);
BigInteger factorialValue = BigInteger.valueOf(1);
for (long i = 1; i <= longValue; i++) {
factorialValue = factorialValue.multiply(BigInteger.valueOf(i));
}
answer = factorialValue.toString();
} catch (NumberFormatException exp) {
System.out.println("Can't calculate factorial of " + value);
}
}
message.acknowledge();
}
return answer;
}
}
Code for PalindromeQueue class:
package org.trishinfotech.activemq.example5;
import javax.jms.Message;
import javax.jms.ObjectMessage;
public class PalindromeQueue extends MyQueue {
public PalindromeQueue(String calculateName, String queueName) throws Exception {
super(calculateName, queueName);
}
@Override
public String processCalculationWorkFromQueue(Message message, String consumerTaskName) throws Exception {
String answer = "false";
CalculationWork calculationWork = null;
if (message != null) {
ObjectMessage objectMessage = (ObjectMessage) message;
calculationWork = (CalculationWork) objectMessage.getObject();
if (calculationWork != null) {
String value = calculationWork.getValue();
if (value != null && !value.trim().isEmpty()) {
String reverse = (new StringBuilder(value).reverse().toString());
answer = Boolean.toString(reverse.equals(value));
}
}
message.acknowledge();
}
return answer;
}
}-
Now, Let's write a queue-manager MyQueueManager to maintain the queue objects and select the queue which is made for performing the calculation as per the calculation work we have. The queue manager has a map of queues. It uses the calculation name as the key of the map. Any calculation —work comes with a calculation name and is hence used to find the right queue from the map to send the work.
package org.trishinfotech.activemq.example5;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
public class MyQueueManager {
protected Map<String, MyQueue> queueMap = new HashMap<String, MyQueue>();
public void manageQueue(MyQueue myQueue) {
queueMap.put(myQueue.getCalculateName(), myQueue);
}
public void close() {
queueMap.values().forEach(myQueue -> {
try {
myQueue.close();
} catch (JMSException exp) {
// we can ignore as of now
}
});
}
public void sendToQueue(CalculationWork calculationWork) throws Exception {
MyQueue myQueue = findQueue(calculationWork);
if (myQueue != null) {
myQueue.sendCalculationWorkToQueue(calculationWork);
}
}
public MyQueue findQueue(CalculationWork calculationWork) {
return queueMap.get(calculationWork.getCalculateName());
}
}
Below is the comma-separated input file (CalculationWork.txt)
, which I will use to feed the queue for performing calculations.
Armstrong, 153
Factorial, 20
Palindrome, 1234321
Factorial, 43
Palindrome, 12341234
Palindrome, ABCDEDCBA
Armstrong, 8208
Palindrome, 4567887654
Armstrong, 2104
Factorial, 15
Armstrong, 4210818
Factorial, 120
Armstrong, 345321
Factorial, 543
Palindrome, XYZZYX
Factorial, 7
Now, here is the code for the Main class to read the file and process the calculations via a dedicated queue of the calculation type.
package org.trishinfotech.activemq.example5;
import java.io.BufferedReader;
import java.io.FileReader;
public class Main {
private static final String QUEUE_ARMSTRONG = "ArmstrongCalculationQueue";
private static final String QUEUE_FACTORIAL = "FactorialCalculationQueue";
private static final String QUEUE_PALINDROME = "PalindromeCalculationQueue";
public static void main(String[] args) throws Exception {
MyQueueManager queueManager = new MyQueueManager();
queueManager.manageQueue(new ArmstrongQueue("Armstrong", QUEUE_ARMSTRONG));
queueManager.manageQueue(new FactorialQueue("Factorial", QUEUE_FACTORIAL));
queueManager.manageQueue(new PalindromeQueue("Palindrome", QUEUE_PALINDROME));
System.out.printf("%40s | %10s | %-50s\n", "Source", "Action", "Result/Details");
System.out.println(
"=================================================================================================================");
BufferedReader reader = new BufferedReader(new FileReader("./CalculationWork.txt"));
String line;
while ((line = reader.readLine()) != null) {
CalculationWork calculationWork = CalculationWork.valueOf(line);
if (calculationWork != null) {
queueManager.sendToQueue(calculationWork);
}
}
reader.close();
// just to give graceful time to finish the processing
Thread.sleep(2000);
queueManager.close();
System.out.println(
"=================================================================================================================");
}
}
Below is the output of the program:
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Source | Action | Result/Details
=================================================================================================================
ArmstrongCalculationQueueProducer | Sending | [calculateName=Armstrong, value=153]
FactorialCalculationQueueProducer | Sending | [calculateName=Factorial, value=20]
ArmstrongCalculationQueueConsumer | Armstrong | [calculateName=Armstrong, value=153] : true
PalindromeCalculationQueueProducer | Sending | [calculateName=Palindrome, value=1234321]
FactorialCalculationQueueConsumer | Factorial | [calculateName=Factorial, value=20] : 2432902008176640000
PalindromeCalculationQueueConsumer | Palindrome | [calculateName=Palindrome, value=1234321] : true
FactorialCalculationQueueProducer | Sending | [calculateName=Factorial, value=43]
PalindromeCalculationQueueProducer | Sending | [calculateName=Palindrome, value=12341234]
FactorialCalculationQueueConsumer | Factorial | [calculateName=Factorial, value=43] : 60415263063373835637355132068513997507264512000000000
PalindromeCalculationQueueProducer | Sending | [calculateName=Palindrome, value=ABCDEDCBA]
PalindromeCalculationQueueConsumer | Palindrome | [calculateName=Palindrome, value=12341234] : false
PalindromeCalculationQueueConsumer | Palindrome | [calculateName=Palindrome, value=ABCDEDCBA] : true
ArmstrongCalculationQueueProducer | Sending | [calculateName=Armstrong, value=8208]
PalindromeCalculationQueueProducer | Sending | [calculateName=Palindrome, value=4567887654]
ArmstrongCalculationQueueConsumer | Armstrong | [calculateName=Armstrong, value=8208] : false
PalindromeCalculationQueueConsumer | Palindrome | [calculateName=Palindrome, value=4567887654] : true
ArmstrongCalculationQueueProducer | Sending | [calculateName=Armstrong, value=2104]
ArmstrongCalculationQueueConsumer | Armstrong | [calculateName=Armstrong, value=2104] : false
FactorialCalculationQueueProducer | Sending | [calculateName=Factorial, value=15]
ArmstrongCalculationQueueProducer | Sending | [calculateName=Armstrong, value=4210818]
FactorialCalculationQueueConsumer | Factorial | [calculateName=Factorial, value=15] : 1307674368000
FactorialCalculationQueueProducer | Sending | [calculateName=Factorial, value=120]
ArmstrongCalculationQueueConsumer | Armstrong | [calculateName=Armstrong, value=4210818] : false
FactorialCalculationQueueConsumer | Factorial | [calculateName=Factorial, value=120] : 6689502913449127057588118054090372586752746333138029810295671352301633557244962989366874165271984981308157637893214090552534408589408121859898481114389650005964960521256960000000000000000000000000000
ArmstrongCalculationQueueProducer | Sending | [calculateName=Armstrong, value=345321]
FactorialCalculationQueueProducer | Sending | [calculateName=Factorial, value=543]
ArmstrongCalculationQueueConsumer | Armstrong | [calculateName=Armstrong, value=345321] : false
PalindromeCalculationQueueProducer | Sending | [calculateName=Palindrome, value=XYZZYX]
FactorialCalculationQueueConsumer | Factorial | [calculateName=Factorial, value=543] : 872891556790260456843586366934462570217404467584986862157951660218033476207283552939973350537069217537549478114229971312262991181125700413163606116971183695586311579361173915852193241844683585564114537080099157013082576149573523335369738442355599816710804068865355684599942047453968407440725166640261015140054933542126214585902983116193907129111747665196041855032660956095883694974186998924247749529335931094092429725640658310042199668214202637924631140057800119930627800378541111012271243379033822559451085315416955672912791395237932595257653475479021208979154389591618595449855779925751308303314870067464075830210893226767964127916083986194604372529850059441599156750579670067110415997949767860963439005485252514342425180564330056392659552281473502353159284918610493411467800558076001750687023376509841526414457026571388047691226042613278047076078395826335028468047405871941546558134196554638340479908186429178241794558954878506078646531853505428187507441610683354213870320835243780540993795316813622383436151335642255741817696689682127415809984693167490611336830354021351607247261703154384913620088006020923947745280000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
PalindromeCalculationQueueConsumer | Palindrome | [calculateName=Palindrome, value=XYZZYX] : true
FactorialCalculationQueueProducer | Sending | [calculateName=Factorial, value=7]
FactorialCalculationQueueConsumer | Factorial | [calculateName=Factorial, value=7] : 5040
=================================================================================================================
That's all!
I hope this article helps you to get a fair idea about ActiveMQ.
Source Code can be found here: JMS-ActiveMQ-Sample-Code
Liked the article? Please don't forget to press that like button. Happy coding!
Opinions expressed by DZone contributors are their own.
Comments