Spring Batch Goodies With MongoDB
Explore a tutorial that explains what needs to be done in order to get Spring Batch goodies along with MongoDB.
In this article, I am going to explain what needs to be done in order to get Spring Batch goodies along with MongoDB. I assume that the reader has basic knowledge of Spring Boot, Spring Batch, MongoDB, and Java 8.
Let's start!
Motivation
In my previous article, I showed how to implement an application that replicates a database into a local MongoDB instance using Spring Batch. Now, I want to show which interfaces need to be implemented in order to keep all the details about the batch runs themselves. Spring Batch provides all this information out of the box, but I could not find a default mechanism to store it in a MongoDB database. After some research, I found this repository, which I used as my starting point.
Preconditions
System requirements:
- Java 8
- Maven 3.5.2
- Spring Boot 1.5.10
- MongoDB 3.4
Here are the dependencies defined in pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
This is a Spring Boot application with spring batch and MongoDB starters.
Implementation and Explanation
Let's see the batch configuration in class MainBatchConfigurer:
@Configuration
public class MainBatchConfigurer implements BatchConfigurer {
@Autowired
private ExecutionContextDao mongoExecutionContextDao;
@Autowired
private JobExecutionDao mongoJobExecutionDao;
@Autowired
private JobInstanceDao mongoJobInstanceDao;
@Autowired
private StepExecutionDao mongoStepExecutionDao;
@Override
public JobRepository getJobRepository() {
return new SimpleJobRepository(
mongoJobInstanceDao,
mongoJobExecutionDao,
mongoStepExecutionDao,
mongoExecutionContextDao
);
}
@Override
public PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}
@Override
public SimpleJobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Override
public JobExplorer getJobExplorer() {
return new SimpleJobExplorer(
mongoJobInstanceDao,
mongoJobExecutionDao,
mongoStepExecutionDao,
mongoExecutionContextDao
);
}
}
This configuration is based on the Spring reference documentation. MainBatchConfigurer is a Spring configuration class that implements the BatchConfigurer interface from the Spring Batch package. The Spring documentation says:
"The core interface for this configuration is the BatchConfigurer. The default implementation provides the beans ... and requires a DataSource as a bean within the context to be provided. This data source will be used by the JobRepository."
BatchConfigurer requires the following definitions:
- job repository
- transaction manager
- job launcher
- job explorer
These definitions are mandatory, but we do not have to think about how to implement them, since Spring provides default implementations for each. What is important here is that the job repository and the job explorer need implementations of the ExecutionContextDao, JobExecutionDao, JobInstanceDao, and StepExecutionDao interfaces. Also, when the SimpleJobLauncher is created, it is very important to call afterPropertiesSet once the launcher is fully configured, because SimpleJobLauncher requires a fully defined job repository. As the Spring documentation says:
"...the JobRepository is used for basic CRUD operations of the various persisted domain objects within Spring Batch, such as JobExecution and StepExecution. It is required by many of the major framework features, such as the JobLauncher, Job, and Step."
In this particular case, I do not need any kind of transaction handling, so I'm using ResourcelessTransactionManager. If one needs proper transaction handling, a suitable implementation should be chosen instead. When it comes to JobExplorer, the documentation says:
"...JobExplorer is a read-only version of the JobRepository..."
Now, let's see the interfaces that have to be implemented in order to store batch details in the database. First, I want to show ExecutionContextDao. It contains the following methods:
ExecutionContext getExecutionContext(JobExecution jobExecution);
ExecutionContext getExecutionContext(StepExecution stepExecution);
void saveExecutionContext(final JobExecution jobExecution);
void saveExecutionContext(final StepExecution stepExecution);
void saveExecutionContexts(final Collection<StepExecution> stepExecutions);
void updateExecutionContext(final JobExecution jobExecution);
void updateExecutionContext(final StepExecution stepExecution);
No magic here. The interface clearly shows what has to be done. So, let's take a look at the implementation.
@Repository
public class MongoExecutionContextDao extends AbstractMongoDao implements ExecutionContextDao {
@PostConstruct
public void init() {
super.init();
getCollection().createIndex(
BasicDBObjectBuilder.start()
.add(STEP_EXECUTION_ID_KEY, 1)
.add(JOB_EXECUTION_ID_KEY, 1)
.get()
);
}
@Override
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
return getExecutionContext(JOB_EXECUTION_ID_KEY, jobExecution.getId());
}
@Override
public ExecutionContext getExecutionContext(StepExecution stepExecution) {
return getExecutionContext(STEP_EXECUTION_ID_KEY, stepExecution.getId());
}
@Override
public void saveExecutionContext(JobExecution jobExecution) {
saveOrUpdateExecutionContext(
JOB_EXECUTION_ID_KEY, jobExecution.getId(), jobExecution.getExecutionContext()
);
}
@Override
public void saveExecutionContext(StepExecution stepExecution) {
saveOrUpdateExecutionContext(
STEP_EXECUTION_ID_KEY, stepExecution.getId(), stepExecution.getExecutionContext()
);
}
@Override
public void updateExecutionContext(JobExecution jobExecution) {
saveOrUpdateExecutionContext(
JOB_EXECUTION_ID_KEY, jobExecution.getId(), jobExecution.getExecutionContext()
);
}
@Override
public void updateExecutionContext(StepExecution stepExecution) {
saveOrUpdateExecutionContext(
STEP_EXECUTION_ID_KEY, stepExecution.getId(), stepExecution.getExecutionContext()
);
}
private void saveOrUpdateExecutionContext(String executionIdKey,
Long executionId,
ExecutionContext executionContext) {
Assert.notNull(executionId, "ExecutionId must not be null.");
Assert.notNull(executionContext, "The ExecutionContext must not be null.");
DBObject dbObject = new BasicDBObject(executionIdKey, executionId);
for (Map.Entry<String, Object> entry : executionContext.entrySet()) {
Object value = entry.getValue();
String key = entry.getKey();
dbObject.put(key.replaceAll(DOT_STRING, DOT_ESCAPE_STRING), value);
if (value instanceof BigDecimal || value instanceof BigInteger) {
dbObject.put(key + TYPE_SUFFIX, value.getClass().getName());
}
}
getCollection().update(
new BasicDBObject(executionIdKey, executionId), dbObject, true, false
);
}
@SuppressWarnings({"unchecked"})
private ExecutionContext getExecutionContext(String executionIdKey, Long executionId) {
Assert.notNull(executionId, "ExecutionId must not be null.");
DBObject result = getCollection().findOne(
new BasicDBObject(executionIdKey, executionId)
);
ExecutionContext executionContext = new ExecutionContext();
if (result != null) {
result.removeField(executionIdKey);
removeSystemFields(result);
for (String key : result.keySet()) {
Object value = result.get(key);
String type = (String) result.get(key + TYPE_SUFFIX);
if (type != null && Number.class.isAssignableFrom(value.getClass())) {
try {
value = NumberUtils.convertNumberToTargetClass(
(Number) value,
(Class<? extends Number>) Class.forName(type)
);
} catch (Exception e) {
logger.warn("Failed to convert {} to {}", key, type);
}
}
//Mongo db does not allow key name with "." character.
executionContext.put(
key.replaceAll(DOT_ESCAPE_STRING, DOT_STRING), value
);
}
}
return executionContext;
}
@Override
protected DBCollection getCollection() {
return mongoTemplate.getCollection(ExecutionContext.class.getSimpleName());
}
@Override
public void saveExecutionContexts(Collection<StepExecution> stepExecutions) {
Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
for (StepExecution stepExecution : stepExecutions) {
saveExecutionContext(stepExecution);
saveExecutionContext(stepExecution.getJobExecution());
}
}
}
What needs to be done here is to implement all the methods that save an ExecutionContext based on a JobExecution or a StepExecution; this has to be done manually. The private method saveOrUpdateExecutionContext takes care of persisting the ExecutionContext as a MongoDB document: it extracts the parameters from the ExecutionContext, puts them into a MongoDB document, and stores that document in MongoDB. The other method worth mentioning is the private method getExecutionContext, which does exactly the opposite: it fetches the raw document from the collection named "ExecutionContext" (see the getCollection method) and converts it into an ExecutionContext object.
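The key-escaping seen in saveOrUpdateExecutionContext deserves a note: MongoDB historically rejects field names that contain a dot, which is why the DAO swaps "." for an escape token before writing and reverses the substitution on read. Here is a minimal standalone sketch of that idea; the "{dot}" token mirrors the DAO's DOT_ESCAPE_STRING, but the class and method names are mine, not part of the original code:

```java
public class DotKeyEscaper {
    // MongoDB (before 3.6) rejects field names containing '.',
    // so a key like "a.b" is stored as "a{dot}b" and restored on read.
    static final String DOT = ".";
    static final String DOT_ESCAPED = "{dot}";

    public static String escape(String key) {
        return key.replace(DOT, DOT_ESCAPED); // literal, not regex, replacement
    }

    public static String unescape(String key) {
        return key.replace(DOT_ESCAPED, DOT);
    }

    public static void main(String[] args) {
        String key = "batch.step.readCount";
        String stored = escape(key);
        System.out.println(stored);            // batch{dot}step{dot}readCount
        System.out.println(unescape(stored));  // batch.step.readCount
    }
}
```

The DAO itself uses replaceAll with regex-escaped constants; plain replace is enough here because both tokens are literals.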
Now, let us take a closer look at the JobInstanceDao interface. It contains the following methods:
JobInstance createJobInstance(String jobName, JobParameters jobParameters);
@Nullable
JobInstance getJobInstance(String jobName, JobParameters jobParameters);
@Nullable
JobInstance getJobInstance(@Nullable Long instanceId);
@Nullable
JobInstance getJobInstance(JobExecution jobExecution);
List<JobInstance> getJobInstances(String jobName, int start, int count);
List<String> getJobNames();
List<JobInstance> findJobInstancesByName(String jobName, int start, int count);
int getJobInstanceCount(@Nullable String jobName) throws NoSuchJobException;
The above methods manage JobInstance at the database level. Below is the implementation:
@Repository
public class MongoJobInstanceDao extends AbstractMongoDao implements JobInstanceDao {
@PostConstruct
@Override
public void init() {
super.init();
getCollection().createIndex(jobInstanceIdObj(1L));
}
@Override
public JobInstance createJobInstance(String jobName, final JobParameters jobParameters) {
Assert.notNull(jobName, "Job name must not be null.");
Assert.notNull(jobParameters, "JobParameters must not be null.");
Assert.state(getJobInstance(jobName, jobParameters) == null, "JobInstance must not already exist");
Long jobId = getNextId(JobInstance.class.getSimpleName(), mongoTemplate);
JobInstance jobInstance = new JobInstance(jobId, jobName);
jobInstance.incrementVersion();
Map<String, JobParameter> jobParams = jobParameters.getParameters();
Map<String, Object> paramMap = new HashMap<>(jobParams.size());
for (Map.Entry<String, JobParameter> entry : jobParams.entrySet()) {
paramMap.put(
entry.getKey().replaceAll(DOT_STRING, DOT_ESCAPE_STRING),
entry.getValue().getValue()
);
}
getCollection().save(
BasicDBObjectBuilder.start()
.add(JOB_INSTANCE_ID_KEY, jobId)
.add(JOB_NAME_KEY, jobName)
.add(JOB_KEY_KEY, createJobKey(jobParameters))
.add(VERSION_KEY, jobInstance.getVersion())
.add(JOB_PARAMETERS_KEY, new BasicDBObject(paramMap))
.get()
);
return jobInstance;
}
@Override
public JobInstance getJobInstance(String jobName, JobParameters jobParameters) {
Assert.notNull(jobName, "Job name must not be null.");
Assert.notNull(jobParameters, "JobParameters must not be null.");
String jobKey = createJobKey(jobParameters);
return mapJobInstance(getCollection().findOne(
BasicDBObjectBuilder.start()
.add(JOB_NAME_KEY, jobName)
.add(JOB_KEY_KEY, jobKey).get()), jobParameters);
}
@Override
public JobInstance getJobInstance(Long instanceId) {
return mapJobInstance(getCollection().findOne(jobInstanceIdObj(instanceId)));
}
@Override
public JobInstance getJobInstance(JobExecution jobExecution) {
DBObject instanceId = mongoTemplate.getCollection(JobExecution.class.getSimpleName())
.findOne(jobExecutionIdObj(jobExecution.getId()), jobInstanceIdObj(1L));
removeSystemFields(instanceId);
return mapJobInstance(getCollection().findOne(instanceId));
}
@Override
public List<JobInstance> getJobInstances(String jobName, int start, int count) {
return mapJobInstances(
getCollection()
.find(new BasicDBObject(JOB_NAME_KEY, jobName))
.sort(jobInstanceIdObj(-1L))
.skip(start)
.limit(count)
);
}
@Override
@SuppressWarnings({"unchecked"})
public List<String> getJobNames() {
List results = getCollection().distinct(JOB_NAME_KEY);
Collections.sort(results);
return results;
}
private String createJobKey(JobParameters jobParameters) {
Map<String, JobParameter> props = jobParameters.getParameters();
StringBuilder stringBuilder = new StringBuilder();
List<String> keys = new ArrayList<>(props.keySet());
Collections.sort(keys);
for (String key : keys) {
stringBuilder.append(key).append("=").append(props.get(key).toString()).append(";");
}
MessageDigest digest;
try {
digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("MD5 algorithm not available. Fatal (should be in the JDK).");
}
try {
byte[] bytes = digest.digest(stringBuilder.toString().getBytes("UTF-8"));
return String.format("%032x", new BigInteger(1, bytes));
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException("UTF-8 encoding not available. Fatal (should be in the JDK).");
}
}
@Override
protected DBCollection getCollection() {
return mongoTemplate.getCollection(JobInstance.class.getSimpleName());
}
private List<JobInstance> mapJobInstances(DBCursor dbCursor) {
List<JobInstance> results = new ArrayList<>();
while (dbCursor.hasNext()) {
results.add(mapJobInstance(dbCursor.next()));
}
return results;
}
private JobInstance mapJobInstance(DBObject dbObject) {
return mapJobInstance(dbObject, null);
}
private JobInstance mapJobInstance(DBObject dbObject, JobParameters jobParameters) {
JobInstance jobInstance = null;
if (dbObject != null) {
Long id = (Long) dbObject.get(JOB_INSTANCE_ID_KEY);
if (jobParameters == null) {
jobParameters = getJobParameters(id, mongoTemplate);
}
jobInstance = new JobInstance(id, (String) dbObject.get(JOB_NAME_KEY)); // should always be at version=0 because they never get updated
jobInstance.incrementVersion();
}
return jobInstance;
}
@Override
public List<JobInstance> findJobInstancesByName(String jobName, int start, int count) {
List<JobInstance> result = new ArrayList<>();
List<JobInstance> jobInstances = mapJobInstances(
getCollection()
.find(new BasicDBObject(JOB_NAME_KEY, jobName))
.sort(jobInstanceIdObj(-1L))
);
for (JobInstance instanceEntry : jobInstances) {
String key = instanceEntry.getJobName();
String curJobName = key.substring(0, key.lastIndexOf("|"));
if (curJobName.equals(jobName)) {
result.add(instanceEntry);
}
}
return result;
}
@Override
public int getJobInstanceCount(String jobName) throws NoSuchJobException {
int count = 0;
List<JobInstance> jobInstances = mapJobInstances(
getCollection()
.find(new BasicDBObject(JOB_NAME_KEY, jobName))
.sort(jobInstanceIdObj(-1L))
);
for (JobInstance instanceEntry : jobInstances) {
String key = instanceEntry.getJobName();
String curJobName = key.substring(0, key.lastIndexOf("|"));
if (curJobName.equals(jobName)) {
count++;
}
}
if (count == 0) {
throw new NoSuchJobException(String.format("No job instances for job name %s were found", jobName));
} else {
return count;
}
}
}
All the above methods create a single JobInstance as a document, store that document in a collection named "JobInstance," fetch a single JobInstance from that collection, and map the raw documents into JobInstance objects. There is also a method that creates a job key based on the job parameters. The outcome of this class is a MongoDB collection, "JobInstance," full of documents with job details.
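The job key is worth a closer look, because it is what lets getJobInstance(jobName, jobParameters) find an existing instance: identical parameters must always yield an identical key, regardless of the order in which they were supplied, which is why the keys are sorted before hashing. The following standalone sketch reproduces the same recipe with plain strings standing in for JobParameter values; the class and parameter names are mine:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;

public class JobKeyDemo {
    // Builds an identity key for a set of job parameters:
    // sorted "name=value;" pairs hashed with MD5, rendered as 32 hex chars.
    public static String createJobKey(Map<String, String> params) throws Exception {
        List<String> keys = new ArrayList<>(params.keySet());
        Collections.sort(keys); // sorting makes the key order-independent
        StringBuilder sb = new StringBuilder();
        for (String key : keys) {
            sb.append(key).append("=").append(params.get(key)).append(";");
        }
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(sb.toString().getBytes(StandardCharsets.UTF_8));
        return String.format("%032x", new BigInteger(1, digest));
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> a = new LinkedHashMap<>();
        a.put("run.date", "2018-03-01");
        a.put("source", "orders");
        Map<String, String> b = new LinkedHashMap<>();
        b.put("source", "orders");
        b.put("run.date", "2018-03-01");
        // Insertion order differs, but the computed key is identical.
        System.out.println(createJobKey(a).equals(createJobKey(b))); // true
    }
}
```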
The next interface that I would like to briefly describe is JobExecutionDao. This interface contains the following methods:
void saveJobExecution(JobExecution jobExecution);
void updateJobExecution(JobExecution jobExecution);
List<JobExecution> findJobExecutions(JobInstance jobInstance);
@Nullable
JobExecution getLastJobExecution(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
@Nullable
JobExecution getJobExecution(Long executionId);
void synchronizeStatus(JobExecution jobExecution);
The method names are self-explanatory. Only synchronizeStatus may be a bit mysterious. The idea behind this method is:
Because it may be possible that the status of a JobExecution is updated while running, the following method will synchronize only the status and version fields.
So, knowing that, let's have a look at the implementation of this interface for the MongoDB database:
@Repository
public class MongoJobExecutionDao extends AbstractMongoDao implements JobExecutionDao {
@PostConstruct
public void init() {
super.init();
getCollection().createIndex(
BasicDBObjectBuilder.start()
.add(JOB_EXECUTION_ID_KEY, 1)
.add(JOB_INSTANCE_ID_KEY, 1)
.get()
);
}
public void saveJobExecution(JobExecution jobExecution) {
validateJobExecution(jobExecution);
jobExecution.incrementVersion();
Long id = getNextId(JobExecution.class.getSimpleName(), mongoTemplate);
save(jobExecution, id);
}
private void save(JobExecution jobExecution, Long id) {
jobExecution.setId(id);
DBObject object = toDbObjectWithoutVersion(jobExecution);
object.put(VERSION_KEY, jobExecution.getVersion());
getCollection().save(object);
}
private DBObject toDbObjectWithoutVersion(JobExecution jobExecution) {
return BasicDBObjectBuilder.start()
.add(JOB_EXECUTION_ID_KEY, jobExecution.getId())
.add(JOB_INSTANCE_ID_KEY, jobExecution.getJobId())
.add(START_TIME_KEY, jobExecution.getStartTime())
.add(END_TIME_KEY, jobExecution.getEndTime())
.add(STATUS_KEY, jobExecution.getStatus().toString())
.add(EXIT_CODE_KEY, jobExecution.getExitStatus().getExitCode())
.add(EXIT_MESSAGE_KEY, jobExecution.getExitStatus().getExitDescription())
.add(CREATE_TIME_KEY, jobExecution.getCreateTime())
.add(LAST_UPDATED_KEY, jobExecution.getLastUpdated()).get();
}
private void validateJobExecution(JobExecution jobExecution) {
Assert.notNull(jobExecution, "JobExecution cannot be null.");
Assert.notNull(jobExecution.getJobId(), "JobExecution Job-Id cannot be null.");
Assert.notNull(jobExecution.getStatus(), "JobExecution status cannot be null.");
Assert.notNull(jobExecution.getCreateTime(), "JobExecution create time cannot be null");
}
public synchronized void updateJobExecution(JobExecution jobExecution) {
validateJobExecution(jobExecution);
Long jobExecutionId = jobExecution.getId();
Assert.notNull(jobExecutionId, "JobExecution ID cannot be null. JobExecution must be saved before it can be updated");
Assert.notNull(jobExecution.getVersion(), "JobExecution version cannot be null. JobExecution must be saved before it can be updated");
Integer version = jobExecution.getVersion() + 1;
if (getCollection().findOne(jobExecutionIdObj(jobExecutionId)) == null) {
throw new NoSuchObjectException(String.format("Invalid JobExecution, ID %s not found.", jobExecutionId));
}
DBObject object = toDbObjectWithoutVersion(jobExecution);
object.put(VERSION_KEY, version);
WriteResult update = getCollection().update(
BasicDBObjectBuilder.start()
.add(JOB_EXECUTION_ID_KEY, jobExecutionId)
.add(VERSION_KEY, jobExecution.getVersion())
.get(),
object
);
jobExecution.incrementVersion();
}
public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
Assert.notNull(jobInstance, "Job cannot be null.");
Long id = jobInstance.getId();
Assert.notNull(id, "Job Id cannot be null.");
DBCursor dbCursor = getCollection()
.find(jobInstanceIdObj(id))
.sort(new BasicDBObject(JOB_EXECUTION_ID_KEY, -1));
List<JobExecution> result = new ArrayList<>();
while (dbCursor.hasNext()) {
DBObject dbObject = dbCursor.next();
result.add(mapJobExecution(jobInstance, dbObject));
}
return result;
}
public JobExecution getLastJobExecution(JobInstance jobInstance) {
Long id = jobInstance.getId();
DBCursor dbCursor = getCollection()
.find(jobInstanceIdObj(id))
.sort(new BasicDBObject(CREATE_TIME_KEY, -1))
.limit(1);
if (!dbCursor.hasNext()) {
return null;
} else {
DBObject singleResult = dbCursor.next();
if (dbCursor.hasNext()) {
throw new IllegalStateException("There must be at most one latest job execution");
}
return mapJobExecution(jobInstance, singleResult);
}
}
public Set<JobExecution> findRunningJobExecutions(String jobName) {
DBCursor instancesCursor = mongoTemplate.getCollection(JobInstance.class.getSimpleName())
.find(new BasicDBObject(JOB_NAME_KEY, jobName), jobInstanceIdObj(1L));
List<Long> ids = new ArrayList<>();
while (instancesCursor.hasNext()) {
ids.add((Long) instancesCursor.next().get(JOB_INSTANCE_ID_KEY));
}
DBCursor dbCursor = getCollection().find(
BasicDBObjectBuilder
.start()
.add(JOB_INSTANCE_ID_KEY, new BasicDBObject("$in", ids.toArray()))
.add(END_TIME_KEY, null).get()).sort(
jobExecutionIdObj(-1L));
Set<JobExecution> result = new HashSet<>();
while (dbCursor.hasNext()) {
result.add(mapJobExecution(dbCursor.next()));
}
return result;
}
public JobExecution getJobExecution(Long executionId) {
return mapJobExecution(getCollection().findOne(jobExecutionIdObj(executionId)));
}
public void synchronizeStatus(JobExecution jobExecution) {
Long id = jobExecution.getId();
DBObject jobExecutionObject = getCollection().findOne(jobExecutionIdObj(id));
int currentVersion = jobExecutionObject != null ? ((Integer) jobExecutionObject.get(VERSION_KEY)) : 0;
if (currentVersion != jobExecution.getVersion()) {
if (jobExecutionObject == null) {
save(jobExecution, id);
jobExecutionObject = getCollection().findOne(jobExecutionIdObj(id));
}
String status = (String) jobExecutionObject.get(STATUS_KEY);
jobExecution.upgradeStatus(BatchStatus.valueOf(status));
jobExecution.setVersion(currentVersion);
}
}
@Override
protected DBCollection getCollection() {
return mongoTemplate.getCollection(JobExecution.class.getSimpleName());
}
private JobExecution mapJobExecution(DBObject dbObject) {
return mapJobExecution(null, dbObject);
}
private JobExecution mapJobExecution(JobInstance jobInstance, DBObject dbObject) {
if (dbObject == null) {
return null;
}
Long id = (Long) dbObject.get(JOB_EXECUTION_ID_KEY);
JobExecution jobExecution;
if (jobInstance == null) {
jobExecution = new JobExecution(id);
} else {
JobParameters jobParameters = getJobParameters(jobInstance.getId(), mongoTemplate);
jobExecution = new JobExecution(jobInstance, id, jobParameters, null);
}
jobExecution.setStartTime((Date) dbObject.get(START_TIME_KEY));
jobExecution.setEndTime((Date) dbObject.get(END_TIME_KEY));
jobExecution.setStatus(BatchStatus.valueOf((String) dbObject.get(STATUS_KEY)));
jobExecution.setExitStatus(new ExitStatus(((String) dbObject.get(EXIT_CODE_KEY)), (String) dbObject.get(EXIT_MESSAGE_KEY)));
jobExecution.setCreateTime((Date) dbObject.get(CREATE_TIME_KEY));
jobExecution.setLastUpdated((Date) dbObject.get(LAST_UPDATED_KEY));
jobExecution.setVersion((Integer) dbObject.get(VERSION_KEY));
return jobExecution;
}
}
Similarly to the previous implementations, MongoJobExecutionDao performs CRUD operations (except delete) on the JobExecution object. The above methods convert a JobExecution to a MongoDB document and store it in the database, and also convert raw MongoDB documents back into JobExecution objects when reading. The outcome of this implementation of JobExecutionDao is a MongoDB collection named "JobExecution," full of documents containing job execution details.
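One detail worth highlighting in updateJobExecution is the optimistic-locking pattern: the update filter matches both the execution id and the version the caller last read, so a concurrent writer that committed first leaves the filter matching nothing. The following minimal in-memory sketch captures that idea, with a plain map standing in for the MongoDB collection; all names here are mine, not part of the original code:

```java
import java.util.*;

public class OptimisticUpdateDemo {
    // In-memory stand-in for the JobExecution collection: id -> version.
    public static final Map<Long, Integer> store = new HashMap<>();

    // Mimics update({id, version: expected}, {..., version: expected + 1}):
    // the write only happens if nobody bumped the version since we read it.
    public static boolean update(long id, int expectedVersion) {
        Integer current = store.get(id);
        if (current == null || current != expectedVersion) {
            return false; // stale version: a concurrent writer got there first
        }
        store.put(id, expectedVersion + 1);
        return true;
    }

    public static void main(String[] args) {
        store.put(7L, 0);
        System.out.println(update(7L, 0)); // true  -> version is now 1
        System.out.println(update(7L, 0)); // false -> stale version rejected
        System.out.println(update(7L, 1)); // true  -> re-read version succeeds
    }
}
```

Note that the DAO shown above computes the new version but does not check whether the filtered update actually matched a document; a production implementation would inspect the WriteResult and fail on a lost update.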
Last but not least, let's look at the StepExecutionDao interface. It contains the following methods:
void saveStepExecution(StepExecution stepExecution);
void saveStepExecutions(Collection<StepExecution> stepExecutions);
void updateStepExecution(StepExecution stepExecution);
@Nullable
StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId);
void addStepExecutions(JobExecution jobExecution);
As you can see, it also follows the CRUD pattern (without delete). Let's have a closer look at the implementation of this interface.
@Repository
public class MongoStepExecutionDao extends AbstractMongoDao implements StepExecutionDao {
@PostConstruct
public void init() {
super.init();
getCollection().createIndex(
BasicDBObjectBuilder.start()
.add(STEP_EXECUTION_ID_KEY, 1)
.add(JOB_EXECUTION_ID_KEY, 1)
.get()
);
}
public void saveStepExecution(StepExecution stepExecution) {
Assert.isNull(stepExecution.getId(),
"to-be-saved (not updated) StepExecution can't already have an id assigned");
Assert.isNull(stepExecution.getVersion(),
"to-be-saved (not updated) StepExecution can't already have a version assigned");
validateStepExecution(stepExecution);
stepExecution.setId(getNextId(StepExecution.class.getSimpleName(), mongoTemplate));
stepExecution.incrementVersion(); // should be 0 now
DBObject object = toDbObjectWithoutVersion(stepExecution);
object.put(VERSION_KEY, stepExecution.getVersion());
getCollection().save(object);
}
private DBObject toDbObjectWithoutVersion(StepExecution stepExecution) {
return start()
.add(STEP_EXECUTION_ID_KEY, stepExecution.getId())
.add(STEP_NAME_KEY, stepExecution.getStepName())
.add(JOB_EXECUTION_ID_KEY, stepExecution.getJobExecutionId())
.add(START_TIME_KEY, stepExecution.getStartTime())
.add(END_TIME_KEY, stepExecution.getEndTime())
.add(STATUS_KEY, stepExecution.getStatus().toString())
.add(COMMIT_COUNT_KEY, stepExecution.getCommitCount())
.add(READ_COUNT_KEY, stepExecution.getReadCount())
.add(FILTER_COUT_KEY, stepExecution.getFilterCount())
.add(WRITE_COUNT_KEY, stepExecution.getWriteCount())
.add(EXIT_CODE_KEY, stepExecution.getExitStatus().getExitCode())
.add(EXIT_MESSAGE_KEY, stepExecution.getExitStatus().getExitDescription())
.add(READ_SKIP_COUNT_KEY, stepExecution.getReadSkipCount())
.add(WRITE_SKIP_COUNT_KEY, stepExecution.getWriteSkipCount())
.add(PROCESS_SKIP_COUT_KEY, stepExecution.getProcessSkipCount())
.add(ROLLBACK_COUNT_KEY, stepExecution.getRollbackCount())
.add(LAST_UPDATED_KEY, stepExecution.getLastUpdated()).get();
}
public synchronized void updateStepExecution(StepExecution stepExecution) {
Integer currentVersion = stepExecution.getVersion();
Integer newVersion = currentVersion + 1;
DBObject object = toDbObjectWithoutVersion(stepExecution);
object.put(VERSION_KEY, newVersion);
getCollection().update(
start()
.add(STEP_EXECUTION_ID_KEY, stepExecution.getId())
.add(VERSION_KEY, currentVersion).get(),
object
);
stepExecution.incrementVersion();
}
static BasicDBObject stepExecutionIdObj(Long id) {
return new BasicDBObject(STEP_EXECUTION_ID_KEY, id);
}
public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
return mapStepExecution(getCollection().findOne(BasicDBObjectBuilder.start()
.add(STEP_EXECUTION_ID_KEY, stepExecutionId)
.add(JOB_EXECUTION_ID_KEY, jobExecution.getId()).get()), jobExecution);
}
private StepExecution mapStepExecution(DBObject object, JobExecution jobExecution) {
if (object == null) {
return null;
}
StepExecution stepExecution = new StepExecution(
(String) object.get(STEP_NAME_KEY),
jobExecution,
(Long) object.get(STEP_EXECUTION_ID_KEY)
);
stepExecution.setStartTime((Date) object.get(START_TIME_KEY));
stepExecution.setEndTime((Date) object.get(END_TIME_KEY));
stepExecution.setStatus(BatchStatus.valueOf((String) object.get(STATUS_KEY)));
stepExecution.setCommitCount((Integer) object.get(COMMIT_COUNT_KEY));
stepExecution.setReadCount((Integer) object.get(READ_COUNT_KEY));
stepExecution.setFilterCount((Integer) object.get(FILTER_COUT_KEY));
stepExecution.setWriteCount((Integer) object.get(WRITE_COUNT_KEY));
stepExecution.setExitStatus(new ExitStatus(
(String) object.get(EXIT_CODE_KEY),
(String) object.get(EXIT_MESSAGE_KEY)
));
stepExecution.setReadSkipCount((Integer) object.get(READ_SKIP_COUNT_KEY));
stepExecution.setWriteSkipCount((Integer) object.get(WRITE_SKIP_COUNT_KEY));
stepExecution.setProcessSkipCount((Integer) object.get(PROCESS_SKIP_COUT_KEY));
stepExecution.setRollbackCount((Integer) object.get(ROLLBACK_COUNT_KEY));
stepExecution.setLastUpdated((Date) object.get(LAST_UPDATED_KEY));
stepExecution.setVersion((Integer) object.get(VERSION_KEY));
return stepExecution;
}
public void addStepExecutions(JobExecution jobExecution) {
DBCursor stepsCoursor = getCollection()
.find(jobExecutionIdObj(jobExecution.getId()))
.sort(stepExecutionIdObj(1L));
while (stepsCoursor.hasNext()) {
DBObject stepObject = stepsCoursor.next();
mapStepExecution(stepObject, jobExecution);
}
}
@Override
protected DBCollection getCollection() {
return mongoTemplate.getCollection(StepExecution.class.getSimpleName());
}
private void validateStepExecution(StepExecution stepExecution) {
Assert.notNull(stepExecution, "StepExecution cannot be null.");
Assert.notNull(stepExecution.getStepName(), "StepExecution step name cannot be null.");
Assert.notNull(stepExecution.getStartTime(), "StepExecution start time cannot be null.");
Assert.notNull(stepExecution.getStatus(), "StepExecution status cannot be null.");
}
@Override
public void saveStepExecutions(Collection<StepExecution> stepExecutions) {
Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
for (StepExecution stepExecution: stepExecutions) {
saveStepExecution(stepExecution);
}
}
}
Nothing new here. This implementation converts a StepExecution object to a MongoDB document when saving, and a raw document back into a StepExecution object when reading from the database. The outcome of this implementation is a MongoDB collection named "StepExecution," full of documents containing step execution details.
There is one class left to show. All of the above implementations extend the abstract class AbstractMongoDao, so let's look at it:
abstract class AbstractMongoDao {
static final String VERSION_KEY = "version";
static final String START_TIME_KEY = "startTime";
static final String END_TIME_KEY = "endTime";
static final String EXIT_CODE_KEY = "exitCode";
static final String EXIT_MESSAGE_KEY = "exitMessage";
static final String LAST_UPDATED_KEY = "lastUpdated";
static final String STATUS_KEY = "status";
private static final String SEQUENCES_COLLECTION_NAME = "Sequences";
private static final String ID_KEY = "_id";
private static final String NS_KEY = "_ns";
static final String DOT_ESCAPE_STRING = "\\{dot}";
static final String DOT_STRING = "\\.";
// Job Constants
static final String JOB_NAME_KEY = "jobName";
static final String JOB_INSTANCE_ID_KEY = "jobInstanceId";
static final String JOB_KEY_KEY = "jobKey";
static final String JOB_PARAMETERS_KEY = "jobParameters";
// Job Execution Constants
static final String JOB_EXECUTION_ID_KEY = "jobExecutionId";
static final String CREATE_TIME_KEY = "createTime";
// Job Execution Contexts Constants
static final String STEP_EXECUTION_ID_KEY = "stepExecutionId";
static final String TYPE_SUFFIX = "_TYPE";
// Step Execution Constants
static final String STEP_NAME_KEY = "stepName";
static final String COMMIT_COUNT_KEY = "commitCount";
static final String READ_COUNT_KEY = "readCount";
static final String FILTER_COUT_KEY = "filterCout";
static final String WRITE_COUNT_KEY = "writeCount";
static final String READ_SKIP_COUNT_KEY = "readSkipCount";
static final String WRITE_SKIP_COUNT_KEY = "writeSkipCount";
static final String PROCESS_SKIP_COUT_KEY = "processSkipCout";
static final String ROLLBACK_COUNT_KEY = "rollbackCount";
protected Logger logger;
@Autowired
protected MongoTemplate mongoTemplate;
protected abstract DBCollection getCollection();
protected void init() {
logger = LoggerFactory.getLogger(this.getClass());
}
Long getNextId(String name, MongoTemplate mongoTemplate) {
DBCollection collection = mongoTemplate.getDb()
.getCollection(SEQUENCES_COLLECTION_NAME);
BasicDBObject sequence = new BasicDBObject("name", name);
collection.update(
sequence,
new BasicDBObject("$inc", new BasicDBObject("value", 1L)),
true,
false
);
return (Long) collection.findOne(sequence).get("value");
}
void removeSystemFields(DBObject dbObject) {
dbObject.removeField(ID_KEY);
dbObject.removeField(NS_KEY);
}
BasicDBObject jobInstanceIdObj(Long id) {
return new BasicDBObject(MongoJobInstanceDao.JOB_INSTANCE_ID_KEY, id);
}
BasicDBObject jobExecutionIdObj(Long id) {
return new BasicDBObject(JOB_EXECUTION_ID_KEY, id);
}
@SuppressWarnings({"unchecked"})
JobParameters getJobParameters(Long jobInstanceId, MongoTemplate mongoTemplate) {
DBObject jobParamObj = mongoTemplate
.getCollection(JobInstance.class.getSimpleName())
.findOne(new BasicDBObject(jobInstanceIdObj(jobInstanceId)));
if (jobParamObj != null &&
jobParamObj.get(MongoJobInstanceDao.JOB_PARAMETERS_KEY) != null) {
Map<String, ?> jobParamsMap =
(Map<String, ?>) jobParamObj.get(MongoJobInstanceDao.JOB_PARAMETERS_KEY);
Map<String, JobParameter> map = new HashMap<>(jobParamsMap.size());
for (Map.Entry<String, ?> entry : jobParamsMap.entrySet()) {
Object param = entry.getValue();
String key = entry.getKey().replaceAll(DOT_ESCAPE_STRING, DOT_STRING);
if (param instanceof String) {
map.put(key, new JobParameter((String) param));
} else if (param instanceof Long) {
map.put(key, new JobParameter((Long) param));
} else if (param instanceof Double) {
map.put(key, new JobParameter((Double) param));
} else if (param instanceof Date) {
map.put(key, new JobParameter((Date) param));
} else {
map.put(key, null);
}
}
return new JobParameters(map);
}
return null;
}
}
This abstract class contains all the constants and methods used by its children. The constants are mainly the names of the attributes relevant to each DAO interface; they are used as the keys under which data is stored. The MongoTemplate is also autowired here. Finally, there are some utility methods that help with processing the data.
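The getNextId method is how the DAOs mint ids without a relational sequence: one counter document per entity name lives in a "Sequences" collection and is incremented atomically with an upserted $inc. A rough in-memory analogue of the pattern, with a map playing the collection (the class and method names are mine):

```java
import java.util.*;

public class SequenceDemo {
    // Stand-in for the "Sequences" collection: name -> current counter value.
    private static final Map<String, Long> sequences = new HashMap<>();

    // merge() plays the role of update(query, {$inc: {value: 1}}, upsert=true):
    // missing counters start at 1, existing ones are incremented.
    public static synchronized long nextId(String name) {
        return sequences.merge(name, 1L, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(nextId("JobInstance"));  // 1
        System.out.println(nextId("JobInstance"));  // 2
        System.out.println(nextId("JobExecution")); // 1 (independent counter)
    }
}
```

In MongoDB the $inc itself is atomic on a single document, which is what makes this safe across processes; the in-memory sketch needs synchronized to get the same guarantee within one JVM.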
The connection to the MongoDB database is defined in the MongoDBConfig class.
@Configuration
public class MongoDBConfig extends AbstractMongoConfiguration {
private static Logger logger = LoggerFactory.getLogger(MongoDBConfig.class);
@Value("${spring.data.mongodb.database}")
private String database;
@Value("${spring.data.mongodb.host}")
private String host;
@Override
protected String getDatabaseName() {
return database;
}
@Override
public MongoClient mongo() {
MongoClientOptions.Builder mongoClientOptionsBuilder = MongoClientOptions.builder()
.writeConcern(WriteConcern.ACKNOWLEDGED)
.socketKeepAlive(true);
return new MongoClient(new MongoClientURI(host, mongoClientOptionsBuilder));
}
}
And that is it!
Summary
In this tutorial, I showed how I used Spring Batch with MongoDB and how the Spring Batch goodies can be saved to and read from MongoDB. The full code is available on GitHub. The code and this article were built based on the spring-batch and springbatch-mongoDao repositories. I hope this tutorial was helpful.