Deploying Artemis Broker With SSL Enabled and Using AMQP
Learn how to deploy the Red Hat AMQ Broker on OpenShift 4.x so that external clients can connect to produce and consume messages using AMQP.
Red Hat AMQ Broker
AMQ Broker is based on Apache ActiveMQ Artemis. It provides a message broker that is JMS-compliant. Apache ActiveMQ Artemis is an open-source project for an asynchronous messaging system. It is high-performance, embeddable, and clustered, and it supports multiple protocols.
The core ActiveMQ Artemis is JMS-agnostic and provides a non-JMS API, which is referred to as the core API. ActiveMQ Artemis also provides a JMS client API that uses a facade layer to implement the JMS semantics on top of the core API. Essentially, JMS interactions are translated into core API operations on the client-side using the JMS client API. From there, all operations are sent using the core client API and Apache ActiveMQ Artemis wire format. The server itself only uses the core API. For more details on the core API and its concepts, refer to the ActiveMQ Artemis documentation.
Red Hat OpenShift
Red Hat OpenShift offers a consistent hybrid-cloud foundation for building and scaling containerized applications. OpenShift provides an enterprise-grade, container-based platform with no vendor lock-in. Red Hat was one of the first companies to work with Google on Kubernetes, even before launch, and has become the second leading contributor to the Kubernetes upstream project. OpenShift also provides a common development platform no matter what infrastructure we use to host the application.
AMQP
The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for message-oriented middleware. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability and security.
In this article, we will deploy Red Hat AMQ Broker 7.7 on OpenShift 4.3 with SSL enabled, and then produce and consume messages from external clients using the AMQP protocol.
Prerequisites
For this demonstration, you will need the following technologies set up in your development environment:
- An OpenShift 4.3+ environment with cluster admin access
- OpenShift CLI (oc)
- Apache Maven 3.6.3+
- JDK 8+ installed
There are two ways to deploy AMQ Broker on OpenShift Container Platform: using the AMQ Broker Operator or using application templates.
The AMQ Broker Operator is the recommended way to create broker deployments on OpenShift Container Platform, so we will use it to deploy the AMQ Broker.
Deploy AMQ Broker on OpenShift
- In your web browser, navigate to the AMQ Broker Software Downloads page.
- In the Version drop-down box, ensure that the value is set to the latest AMQ Broker version, 7.7.0.
- Next to AMQ Broker 7.7 Operator Installation Files, click Download. Download of the amq-broker-operator-7.7.0-ocp-install-examples.zip compressed archive begins automatically.
- When the download has completed, move the archive to your chosen installation directory.
- Log in to the OpenShift Container Platform as a cluster administrator.
Install the AMQ Broker Operator
1. Create a new project
oc new-project new-message-project
2. Specify a service account to use with the Operator
3. Create a service account in your project.
$ oc create -f deploy/service_account.yaml
4. Create a role in your project. This file specifies the resources that the Operator can use and modify.
oc create -f deploy/role.yaml
5. Create the role binding in your project. The role binding binds the previously created service account to the Operator role, based on the names you specified.
oc create -f deploy/role_binding.yaml
6. Install the latest CRDs in your OpenShift cluster before deploying and starting the Operator.
Deploy the main broker CRD, the address CRD, and the scaledown controller CRD.
oc create -f deploy/crds/broker_activemqartemis_crd.yaml
oc create -f deploy/crds/broker_activemqartemisaddress_crd.yaml
oc create -f deploy/crds/broker_activemqartemisscaledown_crd.yaml
7. Create an image pull secret for the account that you use to authenticate with the Red Hat Container Registry, and link it to the default, deployer, and builder service accounts of your OpenShift project.
oc create secret docker-registry imagestreamsecret --docker-server=registry.redhat.io --docker-username= --docker-password= --docker-email=
oc secrets link --for=pull default imagestreamsecret
oc secrets link --for=pull deployer imagestreamsecret
oc secrets link --for=pull builder imagestreamsecret
[kkakarla /]$ oc get secrets | grep imagestreamsecret
imagestreamsecret kubernetes.io/dockerconfigjson 1 22h
8. Deploy the Operator
oc create -f deploy/operator.yaml
[kkakarla /]$ oc get pods | grep operator
amq-broker-operator-85598678bf-l9c2g 1/1 Running 0 22h
9. Now deploy the SSL-enabled AMQ Artemis broker. The acceptor is configured with the following parameters.
Add the protocols and port parameters. Set values to specify the messaging protocols to be used by the acceptor and the port on each broker Pod to expose for those protocols.
The acceptor configured here exposes port 5672 to AMQP clients. In the protocols parameter, AMQP is written as amqp, the Core protocol as core, OpenWire as openwire, MQTT as mqtt, and STOMP as stomp.
For each broker Pod in your deployment, the Operator also creates a default acceptor that uses port 61616. This default acceptor is required for broker clustering and has the Core protocol enabled.
By default, the AMQ Broker management console uses port 8161 on the broker Pod. Each broker Pod in your deployment has a dedicated service that provides access to the console.
To specify the number of concurrent client connections that the acceptor allows, add the connectionsAllowed parameter and set a value.
By default, an acceptor is exposed only to clients in the same OpenShift cluster as the broker deployment. To also expose the acceptor to clients outside OpenShift, add the expose parameter and set the value to true.
Also, to enable secure connections to the acceptor from clients outside OpenShift, add the sslEnabled parameter and set the value to true.
Configuring One-Way TLS
The procedure in this section shows how to configure one-way Transport Layer Security (TLS) to secure a broker-client connection.
In one-way TLS, only the broker presents a certificate. This certificate is used by the client to authenticate the broker.
1. Generate a self-signed certificate for the broker key store.
keytool -genkey -alias broker -keyalg RSA -keystore /home/kkakarla/development/amq7/amq7-on-openshift/broker.ks
2. Export the certificate from the broker key store, so that it can be shared with clients. Export the certificate in the Base64-encoded .pem format (the -rfc option makes keytool write Base64 text instead of binary DER).
keytool -export -rfc -alias broker -keystore /home/kkakarla/development/amq7/amq7-on-openshift/broker.ks -file /home/kkakarla/development/amq7/amq7-on-openshift/broker_cert.pem
3. On the client, create a client trust store that imports the broker certificate.
keytool -import -alias broker -keystore /home/kkakarla/development/amq7/amq7-on-openshift/client.ts -file /home/kkakarla/development/amq7/amq7-on-openshift/broker_cert.pem
4. Create a secret to store the TLS credentials.
oc create secret generic amq7brokersecret \
--from-file=broker.ks=/home/kkakarla/development/amq7/amq7-on-openshift/broker.ks \
--from-file=client.ts=/home/kkakarla/development/amq7/amq7-on-openshift/broker.ks \
--from-literal=keyStorePassword=artemis7 \
--from-literal=trustStorePassword=artemis7
[kkakarla /]$ oc get secrets | grep amq7brokersecret
amq7brokersecret Opaque 4 23h
5. Add the secret to the service account that you created when installing the Operator.
oc secrets add sa/amq-broker-operator secret/amq7brokersecret
6. Specify the secret name in the sslSecret parameter of your secured acceptor or connector.
7. Now deploy the broker with the following custom resource.
apiVersion: broker.amq.io/v2alpha2
kind: ActiveMQArtemis
metadata:
  name: ex-aao
spec:
  deploymentPlan:
    size: 1
    image: registry.redhat.io/amq7/amq-broker:7.7
  acceptors:
  - name: amqp
    protocols: amqp,openwire
    port: 5672
    sslEnabled: true
    sslSecret: amq7brokersecret
    expose: true
    connectionsAllowed: 25
8. Now check all the project resources.
[kkakarla@kkakarla /]$ oc get all
NAME READY STATUS RESTARTS AGE
pod/amq-broker-operator-85598678bf-l9c2g 1/1 Running 0 23h
pod/ex-aao-ss-0 1/1 Running 0 23h
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/amq-broker-operator ClusterIP 0.0.0.0 <none> 8383/TCP 23h
service/ex-aao-amqp-0-svc ClusterIP 0.0.0.0 <none> 5672/TCP 23h
service/ex-aao-hdls-svc ClusterIP None <none> 8161/TCP,61616/TCP 23h
service/ex-aao-ping-svc ClusterIP None <none> 8888/TCP 23h
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/amq-broker-operator 1/1 1 1 23h
NAME DESIRED CURRENT READY AGE
replicaset.apps/amq-broker-operator-85598678bf 1 1 1 23h
NAME READY AGE
statefulset.apps/ex-aao-ss 1/1 23h
NAME HOST/PORT PATH SERVICES PORT TERMINATION WILDCARD
route.route.openshift.io/ex-aao-amqp-0-svc-rte ex-aao-amqp-0-svc-rte-new-message-project.apps.xxx.redhat.com ex-aao-amqp-0-svc amqp-0 passthrough/None None
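Before connecting from outside the cluster, it can help to confirm that the client trust store really contains the broker certificate imported with keytool in the one-way TLS steps. The following is only a minimal sketch of such a check and is not part of the original setup: the class name TrustStoreCheckSketch and the method hasTrustedCertificate are hypothetical, and it assumes the trust store was created with the JDK's default key store type.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;

// Hypothetical helper (not part of AMQ Broker or Qpid JMS): inspects a trust store file.
class TrustStoreCheckSketch {

    // Returns true if the store at 'path' holds a trusted certificate entry under 'alias'.
    // Assumption: the store uses the JDK default key store type.
    static boolean hasTrustedCertificate(String path, char[] password, String alias) throws Exception {
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        try (InputStream in = new FileInputStream(path)) {
            trustStore.load(in, password);
        }
        return trustStore.isCertificateEntry(alias);
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: TrustStoreCheckSketch <trust store path> <password>");
            return;
        }
        // "broker" is the alias used with keytool -import earlier in this article.
        System.out.println(hasTrustedCertificate(args[0], args[1].toCharArray(), "broker"));
    }
}
```

Run against client.ts with the trust store password, the check should report true if the import in step 3 succeeded.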
Connecting to the Broker From External Clients
When you expose an acceptor to external clients (that is, by setting the value of the expose parameter to true), a dedicated Service and Route are automatically created for each broker Pod in the deployment.
An external client can connect to the broker by specifying the full host name of the Route created for the broker Pod.
By default, the OpenShift router listens on port 80 for non-secured (that is, non-SSL) traffic and on port 443 for secured (that is, SSL-encrypted) traffic. For an HTTP connection, the router automatically directs traffic to port 443 if you specify a secure connection URL, or to port 80 if you specify a non-secure connection URL.
For non-HTTP connections, such as the AMQP connections used in this article, clients must explicitly specify the port number (for example, port 443) as part of the connection URL.
For one-way TLS, the client must specify the path to its trust store and the corresponding password as part of the connection URL.
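As a rough illustration of how such a connection URL is put together, the sketch below assembles an amqps URL from the Route host name, the explicit port, and the trust store options. The class name AmqpsUriSketch, the method buildUri, and the host name and paths in main are hypothetical placeholders; only the transport.trustStoreLocation and transport.trustStorePassword option names come from the client code in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Qpid JMS): assembles the connection URL for an exposed acceptor.
class AmqpsUriSketch {

    // Builds amqps://<routeHost>:<port>?<key>=<value>&... from the given options, in insertion order.
    static String buildUri(String routeHost, int port, Map<String, String> transportOptions) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : transportOptions.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        String base = "amqps://" + routeHost + ":" + port;
        return transportOptions.isEmpty() ? base : base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        // Placeholder values: substitute your own trust store path and password.
        options.put("transport.trustStoreLocation", "/path/to/client.ts");
        options.put("transport.trustStorePassword", "changeit");
        // The port is always given explicitly because AMQP is not an HTTP connection.
        System.out.println(buildUri("broker-route.apps.example.com", 443, options));
    }
}
```

The resulting string can be passed as the remote URI when constructing the JMS connection factory.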
To Produce Messages Using amqps
// Imports for the enclosing class com.mycompany.camel.AmqpsslExample (Qpid JMS client).
import javax.jms.*;
import org.apache.qpid.jms.JmsConnectionFactory;

public void amqpTest() throws Exception {
    // One-way TLS only needs the trustStore options; the keyStore options are used only if the broker requests a client certificate.
    // transport.verifyHost=false disables host name verification; acceptable for this self-signed demo, not for production.
    JmsConnectionFactory connectionFactory = new JmsConnectionFactory("user-name", "password",
            "amqps://ex-aao-amqp-0-svc-rte-new-message-project.apps.xxx.redhat.com:443?"
            + "transport.trustStoreLocation=/home/kkakarla/development/amq7/amq7-on-openshift/client.ts&transport.keyStoreLocation=/home/kkakarla/development/amq7/amq7-on-openshift/broker.ks"
            + "&transport.trustStorePassword=artemis7&transport.keyStorePassword=artemis7&transport.verifyHost=false");
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer messageProducer = session.createProducer(session.createQueue("test_q"));
        Message message = session.createTextMessage("this is amqp two way ssl testing");
        messageProducer.send(message);
    } finally {
        // Always release the connection, even if sending fails.
        connection.close();
    }
}
For simplicity, I defined the above class as a bean and called it from the Camel route.
<bean id="amqpTest" class="com.mycompany.camel.AmqpsslExample"/>
<camelContext id="_camelContext1"
    xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="timer:foo?period=5000&amp;repeatCount=1"/>
        <setBody>
            <method ref="amqpTest" method="amqpTest"/>
        </setBody>
        <log message="The message sent"/>
        <to uri="mock:result"/>
    </route>
    <!-- The consumer route shown later in this article is added here. -->
</camelContext>
To Consume Messages Using amqps
public void amqpTestConsumer() throws Exception {
    JmsConnectionFactory connectionFactory = new JmsConnectionFactory("user-name", "password",
            "amqps://ex-aao-amqp-0-svc-rte-new-message-project.apps.xxx.redhat.com:443?"
            + "transport.trustStoreLocation=/home/kkakarla/development/amq7/amq7-on-openshift/client.ts&transport.keyStoreLocation=//home/kkakarla/development/amq7/amq7-on-openshift/broker.ks"
            + "&transport.trustStorePassword=artemis7&transport.keyStorePassword=artemis7&transport.verifyHost=false");
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create a consumer on the queue; a received message is removed from the queue.
        MessageConsumer consumer = session.createConsumer(session.createQueue("test_q"));
        // Wait up to 5 seconds; receive() returns null if no message arrives in time.
        TextMessage m = (TextMessage) consumer.receive(5000);
        System.out.println("message = " + (m == null ? "<none received>" : m.getText()));
    } finally {
        // Always release the connection, even if receiving fails.
        connection.close();
    }
}
Camel consumer route
<route>
    <from uri="timer:foo?period=5000&amp;repeatCount=1"/>
    <setBody>
        <method ref="amqpTest" method="amqpTestConsumer()"/>
    </setBody>
    <to uri="mock:result"/>
</route>
Now run the test case to connect to the AMQ Broker deployed on OpenShift and produce and consume messages.
[kkakarla camel-artemis-openshiftexample]$ mvn camel:run
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.mycompany:camel-artemis-example1 >----------------
[INFO] Building Fuse CBR Quickstart - Java 1.0.0-SNAPSHOT
[INFO] -------------------------------[ bundle ]-------------------------------
[INFO]
---------------------------------------------------------------------------
----------------------------------------------------------------------------
[pache.camel.spring.Main.main()] SpringCamelContext INFO Route: route1 started and consuming from: timer://foo?period=5000&repeatCount=1
[pache.camel.spring.Main.main()] SpringCamelContext INFO Route: route2 started and consuming from: timer://foo?period=5000&repeatCount=1
[pache.camel.spring.Main.main()] SpringCamelContext INFO Total 2 routes, of which 2 are started
[pache.camel.spring.Main.main()] SpringCamelContext INFO Apache Camel 2.21.0.fuse-000077-redhat-1 (CamelContext: _camelContext1) started in 0.226 seconds
[pache.camel.spring.Main.main()] DefaultLifecycleProcessor INFO Starting beans in phase 2147483646
[b.upshift.rdu2.redhat.com:443]] SaslMechanismFinder INFO Best match for SASL auth was: SASL-PLAIN
[b.upshift.rdu2.redhat.com:443]] JmsConnection INFO Connection ID:ae130afa-ffa9-4ba6-884f-2d40840d8421:1 connected to remote Broker: amqps://ex-aao-amqp-0-svc-rte-new-message-project.apps.xxx.redhat.com:443?transport.trustStoreLocation=%2Fhome%2Fkkakarla%2Fdevelopment%2Famq7%2Famq7-on-openshift%2Fclient.ts&transport.keyStoreLocation=%2Fhome%2Fkkakarla%2Fdevelopment%2Famq7%2Famq7-on-openshift%2Fbroker.ks&transport.trustStorePassword=artemis7&transport.keyStorePassword=artemis7&transport.verifyHost=false
[text1) thread #2 - timer://foo] route1 INFO The message sent
[b.upshift.rdu2.redhat.com:443]] SaslMechanismFinder INFO Best match for SASL auth was: SASL-PLAIN
[b.upshift.rdu2.redhat.com:443]] JmsConnection INFO Connection ID:c9b34adb-4355-49fb-a1e8-ef95a11e698e:1 connected to remote Broker: amqps://ex-aao-amqp-0-svc-rte-new-message-project.apps.xxx.redhat.com:443?transport.trustStoreLocation=%2Fhome%2Fkkakarla%2Fdevelopment%2Famq7%2Famq7-on-openshift%2Fclient.ts&transport.keyStoreLocation=%2F%2Fhome%2Fkkakarla%2Fdevelopment%2Famq7%2Famq7-on-openshift%2Fbroker.ks&transport.trustStorePassword=artemis7&transport.keyStorePassword=artemis7&transport.verifyHost=false
message = this is amqp two way ssl testing
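The broker URI printed in the JmsConnection log lines is percent-encoded (for example, %2F stands for /), which makes it hard to see which trust store the client actually used. As a small sketch only, the following decodes the query part of such a URI into readable option names and values. The class name BrokerUriOptionsSketch and the method transportOptions are hypothetical, and the URI in main is a shortened placeholder rather than the one from the log.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: decodes the percent-encoded options of a logged broker URI.
class BrokerUriOptionsSketch {

    // Returns the decoded query options of the URI in their original order.
    static Map<String, String> transportOptions(String brokerUri) throws Exception {
        Map<String, String> options = new LinkedHashMap<>();
        String rawQuery = URI.create(brokerUri).getRawQuery();
        if (rawQuery == null || rawQuery.isEmpty()) {
            return options;
        }
        for (String pair : rawQuery.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            options.put(URLDecoder.decode(key, "UTF-8"), URLDecoder.decode(value, "UTF-8"));
        }
        return options;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder URI in the same shape as the logged one.
        String logged = "amqps://broker-route.apps.example.com:443"
                + "?transport.trustStoreLocation=%2Fpath%2Fto%2Fclient.ts&transport.verifyHost=false";
        transportOptions(logged).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```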
We can connect to the same acceptor using the OpenWire protocol in a similar way.
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQSslConnectionFactory">
    <property name="brokerURL" value="ssl://ex-aao-amqp-0-svc-rte-new-message-project.apps.xxx.redhat.com:443" />
    <property name="userName" value="7nRMRFDZ" />
    <property name="password" value="bFxUBMQJ" />
    <property name="trustStore" value="/home/kkakarla/development/amq7/amq7-on-openshift/client.ts" />
    <property name="trustStorePassword" value="artemis7" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
    init-method="start" destroy-method="stop">
    <!-- maxConnections : Sets the maximum number of pooled connections. Default = 1 -->
    <property name="maxConnections" value="10" />
    <!-- maximumActiveSessionPerConnection : The maximum number of active sessions per connection in the pool. -->
    <property name="maximumActiveSessionPerConnection" value="20" />
    <!-- blockIfSessionPoolIsFull : Controls the behavior of the session pool. Blocks the call to Connection.createSession()
         if the session pool is full. Default = true -->
    <property name="blockIfSessionPoolIsFull" value="true" />
    <!-- createConnectionOnStartup : true to create a connection on startup. Used to warm up the pool on startup. -->
    <property name="createConnectionOnStartup" value="true" />
    <!-- idleTimeout : The maximum time in milliseconds a pooled connection can sit unused before it is eligible for removal. Default = 30 sec -->
    <property name="idleTimeout" value="50" />
    <!-- connectionFactory : Sets the ConnectionFactory used to create new pooled connections. -->
    <property name="connectionFactory" ref="activeMQConnectionFactory" />
</bean>
<bean id="jmsConfiguration" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <!-- concurrentConsumers : The standard number of concurrent consumers -->
    <property name="concurrentConsumers" value="5"/>
    <!-- maxConcurrentConsumers : Allows the number of concurrent consumers to scale up dynamically and to shrink back to the standard number once the load decreases. -->
    <property name="maxConcurrentConsumers" value="10"/>
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfiguration"/>
    <property name="deliveryPersistent" value="false" />
</bean>
<camelContext id="_camelContext1"
    xmlns="http://camel.apache.org/schema/spring">
    <route id="test1">
        <from id="testfrom1" uri="file:src/main/data?noop=true"/>
        <log id="test-1og1" message=" Transferring"/>
        <to id="test-to1" uri="direct:ExampleQueue"/>
    </route>
    <route>
        <from uri="direct:ExampleQueue"/>
        <log message=" Transferring to queue"/>
        <to uri="activemq:queue:SCIENCEQUEUE"/>
    </route>
    <route>
        <from uri="activemq:queue:SCIENCEQUEUE"/>
        <log message=" consuming from queue"/>
        <to uri="file:src/out"/>
    </route>
</camelContext>
I hope this helps those who want to deploy the Red Hat AMQ Broker on OpenShift 4.x so that external clients can connect to produce and consume messages using the AMQP and OpenWire protocols.