JMS-style selectors on Amazon SQS with Apache Camel
Join the DZone community and get the full member experience.
Join For FreeThis blog post demonstrates how easy it is to use Apache Camel and its new json-path component along with the camel-sqs component to produce and consume messages on Amazon SQS.
Amazon Web Services SQS is a message queuing “software as a service” (SaaS) in the cloud. To be able to use it, you need to sign up for AWS. It’s primary access mechanism is XML over HTTP through various AWS SDK clients provided by Amazon. Please check out the SQS documentation for more.
And as “luck” would have it, one of the users in the Apache Camel community created a component to be able to integrate with SQS. This makes it trivial to add a producer or consumer to an SQS queue and plugs in nicely with the Camel DSL.
SQS, however, is not a “one-size fits all” queueing service; you must be aware of your use case and make sure it fits (current requirements as well as somewhat into the future…). There are limitations that, if not studied and accounted for ahead of time, could come back to sink your project. An example of a viable alternative, and one that more closely fits the profile of a high performance and full featured message queue is Apache ActiveMQ.
For example, one limitation to keep in mind is that unlike traditional JMS consumers, you cannot create a subscription to a queue that filters messages based on some predicate (at least not using the AWS-SQS API — you’d have to build that into your solution).
Some other things to keep in mind when using SQS:
- The queue does not preserve FIFO messaging
That is, message order is not preserved. They can arrive out of order from when they were sent. Apache Camel can help with its resequencer pattern. Bilgin Ibryam, now a colleague of mine at Red Hat, has written a great blog post about how to restore message order using the resequencer pattern.
- Message size is limited to 256K
This is probably sufficient, but if your message sizes are variable, or contain more data that 256K, you will have to chunk them and send in smaller chunks.
- No selector or selective consumption
If you’re familiar with JMS, you know that you can specify consumers to use a “selector” or a predicate expression that is evaluated on the broker side to determine whether or not a specific message should be dispatched to a specific consumer. For example,
- Durability constraints
Some use cases call for the message broker to store messages until consumers return. SQS allows a limit of up to 14 days. This is most likely sufficient, but something to keep in mind.
- Binary payloads not allowed
SQS only allows text-based messages, e.g., XML, JSON, fixed format text, etc. Binary such as Avro, Protocol Buffers, or Thrift are not allowed.
For some of these limitations, you can work around them by building out the functionality yourself. I would always recommend taking a look at how an integration library like Apache Camel can help — which has out-of-the-box support for doing some of these things.
Doing JMS-style selectors
So the basic problem is we want to subscribe to a SQS queue, but we want to filter which messages we process. For those messages that we do not process, those should be left in the queue. To do this, we will make use of Apache Camel’s Filter EIP as well as the visibility timeouts available on the SQS queue.
By default, SQS will dispatch all messages in its queue when it’s queried. We cannot change this, and thus not avoid the message being dispatched to us — we’ll have to do the filtering on our side (this is different than how a full-featured broker like ActiveMQ does it, i.e., filtering is done on the broker side so the consumer doesn’t even see the message it does not want to see).
Once SQS dispatches a message, it does not remove it from the queue unless the consumer has acknowledged that it has it and is finished with it. The consumer does this by sending a DeleteMessage command. Until the DeleteMessage command is sent, the message is always in the queue, however visibility comes in to play here.
When a message is dispatched to a consumer, there is a period of time which it will not be visible to other consumers. So if you browsed the queue, you would not see it (it should appear in the stats as “in-flight”). However, there is a configurable period of time you can specify for how long this “visibility timeout” should be active. So if you set the visibility to a lower time period (default is 30 seconds), you can more quickly get messages re-dispatched to consumers that would be able to handle the message.
Take a look at the following Camel route which does just that:
@Override public void configure() throws Exception { // every two seconds, send a message to the "demo" queue in SQS from("timer:kickoff?period=5000") .setBody().method(this, "generateJsonString") .to("aws-sqs://demo?amazonSQSClient=#sqsClient&defaultVisibilityTimeout=2"); }
In the above Camel Route, we create a new message every 5 seconds and send it to an SQS queue named demo — note we set the defaultVisibilityTimeout to 2 seconds. This means that after a message gets dispatched to a consumer, SQS will wait about 2 seconds before considering it eligible to be dispatched to another consumer if it has not been deleted.
On the consumer side, we take advantage of a couple Apache Camel conveniences
Using JSON Path + Filter EIP
Camel has an excellent new component named JSON-Path. Claus Ibsen tweeted about it when he hacked it up. This allows you to do Content-Based Routing
on a JSON payload very easily by using XPath-style expressions to pick
out and evaluate attributes in a JSON encoded object. So in the
following example, we can test an attribute named ‘type’ to be equal to
‘LOGIN’ and use Camel’s Filter EIP to allow only those messages that match to go through and continue processing:
public class ConsumerRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { from("aws-sqs://demo?amazonSQSClient=#sqsClient&deleteIfFiltered=false") .setHeader("identity").jsonpath("$['type']") .filter(simple("${header.identity} == 'login'")) .log("We have a message! ${body}") .to("file:target/output?fileName=login-message-${date:now:MMDDyy-HHmmss}.json"); } }
To complete the functionality, we have to pay attention to a new configuration option added for the Camel-SQS component:
- deleteIfFiltered — Whether or not to send the DeleteMessage to the SQS queue if an exchange fails to get through a filter. If ‘false’ and exchange does not make it through a Camel filter upstream in the route, then don’t send DeleteMessage.
By default, Camel will send the “DeleteMessage” command to SQS after a route has completed successfully (without an exception). However, in this case, we are specifying to not send the DeleteMessage command if the message had been previously filtered by Camel.
This example demonstrates how easy it is to use Apache Camel and its new json-path component along with the camel-sqs component to produce and consume messages on Amazon SQS.
Please take a look at the source code on my github repo to play with the live code and try it out yourself.
Published at DZone with permission of Christian Posta, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments