Mocking RabbitMQ for Java Integration Tests
How to use an embedded Java version of Apache Qpid to mock a RabbitMQ Broker inside integration tests.
Join the DZone community and get the full member experience.
Join For FreeIn this article I will show how I used an embedded Java version of Apache Qpid to mock a RabbitMQ Broker inside integration tests. It was tricky to make everything work and I couldn't find anything related on the web, so it's worthy to share.
The Challenge
My team's product uses Apache Flink to stream events (source) from a RabbitMQ queue and, after processing them, sink the data to an Elastic Search database.
This product was lacking tests. Our team decided that unit testing would be hard and verbose, as a lot of mocking would be needed just to remove all Flink infrastructure out of the way. So we decided to go straight with integration tests. The requirements for the integration tests was that (I) it should be fast enough and (II) easily run from any IDE.
RabbitMQ is built using Erlang, a programming language designed at Ericsson. For that reason it cannot run inside a JVM. This constraint forced myself to spend hours looking for a compatible Java mock replacement as running RabbitMQ inside some Docker image (or having it running manually inside a VM) would break the second test requirement (II).
I found the following candidates:
The first option was excluded straight as Spring AMQP is just a client to manage a AMQP Broker and we needed an embedded broker. So my first try was ActiveMQ Java broker.
After a few code setups later I tried to connect our product to the embedded broker and got an exception:
protocol mismatch: 1.0 / 0.9
The reason for that exception was that ActiveMQ implements version > 1.0 of the AMQP protocol and our product was using the legacy version 0.9, same as RabbitMQ. As ActiveMQ doesn't support any version bellow AMQP 1.0 I had to exclude that option as well.
The Qpid Challenge
At first sign, after reading Qpid documentation, I though I could make it work easily but them I noticed that the documentation explains a lot about the standalone installation and almost nothing about the embedded broker. Moreover, I found some posts on the internet (here and here) mentioning that depending on the version of Qpid the implementation of the Security Providers has changed radically making it hard to connect an AMQP client to the Qpid Broker.
After a lot of research and tweaks I was able to make a specific version of Qpid work. I share the solution bellow.
The Code
I was able to make it work only with Qpid version 0.28. Above this version the Security Providers were refactored and the communication with our legacy product code was impossible as we couldn't enable SSL from our side. Here is the maven dependency:
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker</artifactId>
<version>0.28</version>
<scope>test</scope>
</dependency>
And the code to make the broker start up:
import java.io.*;
import org.apache.qpid.server.Broker;
import org.apache.qpid.server.BrokerOptions;
import com.google.common.io.Files;
public class EmbeddedAMQPBroker {
public static final int BROKER_PORT = findAvailableTcpPort();
private final Broker broker = new Broker();
public EmbeddedAMQPBroker() throws Exception {
final String configFileName = "qpid-config.json";
final String passwordFileName = "passwd.properties";
// prepare options
final BrokerOptions brokerOptions = new BrokerOptions();
brokerOptions.setConfigProperty("qpid.amqp_port", String.valueOf(BROKER_PORT));
brokerOptions.setConfigProperty("qpid.pass_file", findResourcePath(passwordFileName));
brokerOptions.setConfigProperty("qpid.work_dir", Files.createTempDir().getAbsolutePath());
brokerOptions.setInitialConfigurationLocation(findResourcePath(configFileName));
// start broker
broker.startup(brokerOptions);
}
private String findResourcePath(final String file) throws IOException {
// get the configuration file in the classpath
}
The qpid-config.json file:
{
"name" : "EmbeddedBroker",
"defaultVirtualHost" : "default",
"modelVersion" : "1.0",
"storeVersion" : 1,
"authenticationproviders" : [ {
"name" : "passwordFile",
"path" : "${qpid.pass_file}",
"type" : "PlainPasswordFile"
} ],
"ports" : [ {
"name" : "AMQP",
"port" : "${qpid.amqp_port}",
"authenticationProvider" : "passwordFile"
} ],
"virtualhosts" : [ {
"name" : "default",
"storePath" : "${qpid.work_dir}/derbystore/default",
"storeType" : "DERBY"
} ]
}
And passwd.properties:
guest:guest
Now, I used the Spring AMQP to connect to this mock broker and create/delete queues and exchanges. Here are the snippets:
Maven dependency:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.5.2.RELEASE</version>
<scope>test</scope>
</dependency>
Code to create and delete exchange and queue:
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
private void createExchange(final String identifier) {
final CachingConnectionFactory cf = new CachingConnectionFactory(EmbeddedAMQPBroker.BROKER_PORT);
final RabbitAdmin admin = new RabbitAdmin(cf);
final Queue queue = new Queue("myQueue", false);
admin.declareQueue(queue);
final TopicExchange exchange = new TopicExchange("myExchange");
admin.declareExchange(exchange);
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("#"));
cf.destroy();
}
private void deleteExchange() {
final CachingConnectionFactory cf = new CachingConnectionFactory(EmbeddedAMQPBroker.BROKER_PORT);
final RabbitAdmin admin = new RabbitAdmin(cf);
admin.deleteExchange("myExchange");
cf.destroy();
}
Code to send messages:
public void sendMessage(final EiffelEvent event) throws Exception {
final CachingConnectionFactory cf = new CachingConnectionFactory(EmbeddedAMQPBroker.BROKER_PORT);
final RabbitTemplate template = new RabbitTemplate(cf);
final String message = "hello world message!";
template.convertAndSend("myExchange", "#", message);
cf.destroy();
// waitForMessageBeConsumed();
}
Just put all this together in a test class starting the Broker at @BeforeClass, initializing the exchange/queue at @Before so your test can send the message and check the post-condition.
Hope this helps you!
Opinions expressed by DZone contributors are their own.
Comments