Batch Processing With Spring Batch and AMQP: Easier Than You Think
Implementing batch processing is way easier than you might think.
Join the DZone community and get the full member experience.
Join For FreeIntroduction
Batch processing is a common task for many enterprises. So, a perfectly natural tool to reach for when creating batch processors in the Java world is Spring Batch. With its myriad connectors for both input and output, it makes the development process a lot shorter than it, otherwise, might be.
But even if you develop in a Java environment, there is no guarantee that your inputs will be coming from another Java-based application, or even a database. In fact, if you've worked in a large enough enterprise or ecosystem, you will have doubtlessly worked with message buses or queues, so this would seem to be a very likely source of inputs meant for the batch processing.
In an attempt to be as flexible and extensible as possible, when it comes to working with messaging queues, Spring Batch provides a couple very useful readers; namely, JmsItemReader
and AmqpItemReader
.
Let's take a look at an example of how to use the latter in an attempt to be more "open" (and less Java-specific).
A Word on AMQP
If you are unfamiliar with AMQP, it suffices to say that it's an open-source messaging protocol meant to make interoperability and communications between buses and systems more resilient and consistent and is an excellent choice for those who don't wish to be locked into a specific technology or stack.
In the case of batch processing, this is especially useful — even if you are sending messages from another Java application, you can avoid the headaches that come from sending POJOs over the wire, requiring a consistent model between multiple applications. That is, sending and receiving strings is much more flexible. Perhaps, in another post, we will take a look at how to (easily) serialize and/or deserialize between objects and strings using Spring AMQP.
For now, you can read more about AMQP here.
Caveats
This article is not meant to be a full introduction to Spring Batch; rather, it is meant to illustrate what is possible with batch processing and a messaging queue with relatively little effort.
That said, while getting Spring Batch off the ground is pretty easy, there can be a couple gotchas as far as setting up the dependencies, including the location of batch metadata (you can point Spring Batch at, say, an instance of MySQL), as well as ensuring that the metadata tables are properly initialized. These items fall outside the scope of this post, but I promise that if you do run into such issues, they are straightforward and easy to address.
(Hint: You may have to run a couple SQL scripts that come with Spring Batch to initialize the metadata tables if Spring Batch has difficulty doing it on its own.)
Also, note that this project uses the default settings that come with Spring Batch and Spring AMQP. These can, of course, be changed by putting the appropriate settings into a properties file.
Prerequisites
This article assumes that you have a version of RabbitMQ installed somewhere that is accessible to this project, be it on your local machine or another remote location that you can use. Getting started with it is actually pretty easy.
Dependencies
We will be using Maven to manage dependencies in this example.
Using Spring Boot allows you to get off the ground in a hurry, so, in your POM, be sure to include among your dependencies the following artifacts:
<dependencies>
...
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
...
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
...
</dependencies>
Configuration
Since this is Spring Boot, you will need an entry point for the application. So, be sure to include a class such as the one below in src/main/java
:
@SpringBootApplication
public class MyBatchApplication {
public static void main(String[] args) {
SpringApplication.run(MyBatchApplication.class, args);
}
}
Now, we need to create the configurations for the actual batch processing that is to take place. You can read more about the various options that Spring Batch offers for batch processing (e.g. using tasklets instead of steps), but that is outside the scope of this article.
For this article, we are going to make use of "steps" to process data. In a nutshell, each "step" consists of a reader, a processor, and a writer. We are going to focus on how to create and configure a reader that looks to an AMQP-based message queue for input to process.
First, we need to set up an overall configuration for the project, so, using JavaConfig, let's do that:
@Configuration
@EnableBatchProcessing
public class MyBatchAppConfig {
}
Notice the @EnableBatchProcessing
annotation. This is mandatory in order to let Spring Boot know to take care of a lot of the heavy lifting when it comes to setting up Spring Batch.
Next, let's set up a message queue using Spring AMQP, which will also provide the template necessary to help interact with the queue. Place this code in the same configuration file as above; I've also included some of the noteworthy import
statements. (Note again that I'm using RabbitMQ as the implementation of choice for this project, but you can use any AMQP-supported messaging queue solution, though preferably one that has an available Spring template to make your life a bit easier.)
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
...
public final static String queueName = "my-queue";
@Autowired
private ConnectionFactory rabbitConnectionFactory;
// this will create a new queue if it doesn't exist; otherwise, it'll use the existing one of the same name
// ...the second argument means to make the queue 'durable'
@Bean
public Queue myQueue() {
return new Queue(queueName, true);
}
// this is necessary for operations with Spring AMQP
@Bean
public RabbitTemplate getMyQueueTemplate() {
RabbitTemplate template = new RabbitTemplate(this.rabbitConnectionFactory);
template.setQueue(queueName);
return template;
}
...
Great! We're ticking things off the list!
A quick note (as seen in the comments above) is that the RabbitMQ queue specified is either created or, if one of the same name already exists, is pointed at that existing queue. The second argument to the constructor denotes that the queue is to be made 'durable'.
Sweet, now, we're down to the actual meat-and-potatoes of what we're trying to do: Configuring a batch job's "step;" that is, what happens for each piece of data that's consumed, assuming we're using chunk-based processing and not a tasklet (see here for the doman language Spring Batch uses)? Let's add the following code to the project's config (we'll walk through it after):
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.amqp.AmqpItemReader;
...
@Autowired
private StepBuilderFactory stepBuilderFactory;
...
@Bean
public Step getMyJobStep() {
return this.stepBuilderFactory.get("myJobStep")
.<String, DomainObject>chunk(1)
.reader(this.getMyReader())
.processor(this.getMyProcessor())
.writer(this.getMyWriter())
.build();
}
@Bean
public ItemReader<String> getMyReader() {
return new AmqpItemReader<String>(this.getMyQueueTemplate());
}
@Bean
public ItemProcessor<String, DomainObject> getMyProcessor() {
return new MessageProcessor();
}
@Bean
public ItemWriter<DomainObjecvt> getMyWriter() {
return new MessageWriter();
}
...
So, we can see that we are using a builder pattern to create a step in Spring Batch. Breaking it down:
- Each "step" performs the following operations:
- One or more items, called a "chunk," are read in from a source. The size of the chunk is set below.
- The output of the reader above is sent to a processor.
- The output of the processor above is sent to the writer.
- The above description is simplified some, but this is a very common use case.
.get("myJobStep")
assigns the identifier "myJobStep" to the step, which can be used for later reference or lookup.- .<String, DomainObject>chunk(1) is important as it tells the step that it's expecting a
String
as input (i.e. the message from the queue) and will send an object of typeDomainObject
to the processor (and writer) for handling. - The argument
1
tochunk(1)
above represents how many items are to be pulled from the source for a given iteration. These chunks are pulled sequentially (unless the step is made asynchronous) until the source is exhausted. - The real magic here is the
AmqpItemReader
, which tells us that it's expecting a String to represent each message on the queue and takes as an argument the queue template we created earlier. TheAmqpItemReader
is a class that comes with Spring Batch out of the box — no other work is needed to get it to work! That's the magic of Spring Batch! DomainObject
is your implementation of a POJO that will represent each message coming off of the queue; that is, you will need to map the String message (it could be XML or JSON or any string, really) to this POJO. A good place to do this is in the item processor.-
MessageProcessor
andMessageWriter
are your implementations ofItemProcessor
andItemWriter
, respectively. Spring Batch does come with some pre-made implementations out of the box. Feel free to see what is on offer!
That's It?
Believe it or not, yes, that's it! Yes, there are other setup details you may need to deal with, but, at its core, this is all that's required in order to have your Spring Batch project accept messages from an AMQP source. You don't need to waste time implementing something custom for a relatively-common use case.
Conclusion
In this post, we took an introductory look at how to have your Spring Batch project accept input from an AMQP-based message queue with very little setup or custom coding necessary.
If you deal with batch processing on a regular basis, you will want to take a look at what else Spring Batch can do for you. The initial learning curve is not all that bad, especially once you get your head around the domain language. As with most frameworks, there are some gotchas to watch out for, but this is the most flexible batch processing framework I've come across in many years.
Thanks for reading, and I hope this post helps some of you who are in a similar position as I was!
Opinions expressed by DZone contributors are their own.
Comments