AWS SQS and Spring JMS Integration
For Java applications, the JMS API is an easy way to use the SQS messaging service in Amazon Web Services, especially with Spring Boot.
Join the DZone community and get the full member experience.
Join For FreeAmazon Web Services provides us with the SQS messaging service. The Java SDK for SQS is compatible with JMS.
Therefore instead of using SQS as a simple spring bean, we can integrate it with the JMS integration framework that Spring provides.
I will use Spring Boot and Gradle.
The gradle file:
group 'com.gkatzioura.sqstesting'
version '1.0-SNAPSHOT'
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.2.7.RELEASE")
}
}
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'spring-boot'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile "org.springframework.boot:spring-boot-starter-thymeleaf"
compile "com.amazonaws:aws-java-sdk:1.10.55"
compile "org.springframework:spring-jms"
compile "com.amazonaws:amazon-sqs-java-messaging-lib:1.0.0"
compile 'org.slf4j:slf4j-api:1.6.6'
compile 'ch.qos.logback:logback-classic:1.0.13'
testCompile "junit:junit:4.11"
}
The application class:
package com.gkatzioura.sqstesting;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Created by gkatziourasemmanouil on 8/26/15.
*/
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
And the application yml file:
queue:
endpoint: http://localhost:9324
name: sample-queue
I specify a localhost endpoint since I use ElasticMq.
The SQSConfig class is a configuration class in order to have an SQS client as a Spring bean available.
package com.gkatzioura.sqstesting.config;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by gkatziourasemmanouil on 25/02/16.
*/
@Configuration
public class SQSConfig {
@Value("${queue.endpoint}")
private String endpoint;
@Value("${queue.name}")
private String queueName;
@Bean
public AmazonSQSClient createSQSClient() {
AmazonSQSClient amazonSQSClient = new AmazonSQSClient(new BasicAWSCredentials("",""));
amazonSQSClient.setEndpoint(endpoint);
amazonSQSClient.createQueue(queueName);
return amazonSQSClient;
}
}
The SQSListener is a listener class implementing the JMS MessageListener interface.
package com.gkatzioura.sqstesting.listeners;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created by gkatziourasemmanouil on 25/02/16.
*/
@Component
public class SQSListener implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(SQSListener.class);
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
LOGGER.info("Received message "+ textMessage.getText());
} catch (JMSException e) {
LOGGER.error("Error processing message ",e);
}
}
}
The JMSSQSConfig class contains configuration for the JmsTemplate and the DefaultMessageListenerContainer. Through the JMSSQSConfig class, we register the JMS MessageListeners.
package com.gkatzioura.sqstesting.config;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.*;
import com.gkatzioura.sqstesting.listeners.SQSListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
/**
* Created by gkatziourasemmanouil on 25/02/16.
*/
@Configuration
public class JMSSQSConfig {
@Value("${queue.endpoint}")
private String endpoint;
@Value("${queue.name}")
private String queueName;
@Autowired
private SQSListener sqsListener;
@Bean
public DefaultMessageListenerContainer jmsListenerContainer() {
SQSConnectionFactory sqsConnectionFactory = SQSConnectionFactory.builder()
.withAWSCredentialsProvider(new DefaultAWSCredentialsProviderChain())
.withEndpoint(endpoint)
.withAWSCredentialsProvider(awsCredentialsProvider)
.withNumberOfMessagesToPrefetch(10).build();
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setConnectionFactory(sqsConnectionFactory);
dmlc.setDestinationName(queueName);
dmlc.setMessageListener(sqsListener);
return dmlc;
}
@Bean
public JmsTemplate createJMSTemplate() {
SQSConnectionFactory sqsConnectionFactory = SQSConnectionFactory.builder()
.withAWSCredentialsProvider(awsCredentialsProvider)
.withEndpoint(endpoint)
.withNumberOfMessagesToPrefetch(10).build();
JmsTemplate jmsTemplate = new JmsTemplate(sqsConnectionFactory);
jmsTemplate.setDefaultDestinationName(queueName);
jmsTemplate.setDeliveryPersistent(false);
return jmsTemplate;
}
private final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials("", "");
}
@Override
public void refresh() {
}
};
}
MessageService is a service that uses JMSTemplate in order to send messages to the queue
package com.gkatzioura.sqstesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
* Created by gkatziourasemmanouil on 28/02/16.
*/
@Service
public class MessageService {
@Autowired
private JmsTemplate jmsTemplate;
@Value("${queue.name}")
private String queueName;
private static final Logger LOGGER = LoggerFactory.getLogger(MessageService.class);
public void sendMessage(final String message) {
jmsTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
Last but not least, a Controller is added. The controller sends the post request body to the queue as a message.
package com.gkatzioura.sqstesting;
import com.amazonaws.util.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
/**
* Created by gkatziourasemmanouil on 24/02/16.
*/
@Controller
@RequestMapping("/main")
public class MainController {
@Autowired
private MessageService messageService;
@RequestMapping(value = "/write",method = RequestMethod.POST)
public void write(HttpServletRequest servletRequest,HttpServletResponse servletResponse) throws IOException {
InputStream inputStream = servletRequest.getInputStream();
String message = IOUtils.toString(inputStream);
messageService.sendMessage(message);
}
}
You can download the source code here.
Published at DZone with permission of Emmanouil Gkatziouras, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments