Dynamic Schedulers and Custom Cross-Server Schedule Lock
In this article, we look at TaskScheduler and configure schedulers dynamically based on database values.
As part of Java development, we often run into situations where we need to configure a job or a scheduler.
Quartz is one of the popular scheduling APIs.
In this article, we look at Spring's TaskScheduler and configure schedulers dynamically based on values stored in the database. It also includes simple mutex-style logic that prevents a scheduled job from running on more than one instance at the same time when the app runs as a cluster.
Before going into the topic, let me introduce myself. I'm Venkatesh Rajendran, a full-stack software developer. My key areas are Java, the Spring Framework (Spring Boot), and Hibernate. On the frontend I'm not an expert and I'm still learning; I have worked with jQuery and React, and I'm exploring Node.js as well. This is my very first blog post :).
To manage and run our schedulers, we need an executor. ThreadPoolTaskScheduler is a widely used implementation that is both a TaskScheduler and a SchedulingTaskExecutor. Let's create a TaskScheduler bean using ThreadPoolTaskScheduler.
// Declared inside a @Configuration class so that Spring registers it as a bean.
@Bean
public TaskScheduler taskScheduler() {
    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    // Up to five scheduled tasks can run at the same time.
    threadPoolTaskScheduler.setPoolSize(5);
    threadPoolTaskScheduler.setThreadNamePrefix("DynamicSchedules-");
    threadPoolTaskScheduler.initialize();
    return threadPoolTaskScheduler;
}
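ThreadPoolTaskScheduler is backed by a JDK scheduled thread pool, so the effect of the pool size and the thread-name prefix can be tried without Spring. The following is only a rough plain-JDK sketch, not the article's code: the class NamedSchedulerDemo and its helper methods are made up for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a plain-JDK analogue of the TaskScheduler bean.
final class NamedSchedulerDemo {

    // Builds a scheduler with a fixed pool size and a thread-name prefix.
    static ScheduledExecutorService newScheduler(int poolSize, String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, prefix + counter.getAndIncrement());
            thread.setDaemon(true); // do not keep the JVM alive just for the demo
            return thread;
        };
        return Executors.newScheduledThreadPool(poolSize, factory);
    }

    // Schedules one task and returns the name of the thread that ran it.
    static String runOnce(ScheduledExecutorService scheduler) {
        Callable<String> task = () -> Thread.currentThread().getName();
        ScheduledFuture<String> future = scheduler.schedule(task, 10, TimeUnit.MILLISECONDS);
        try {
            return future.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = newScheduler(5, "DynamicSchedules-");
        System.out.println("Task ran on thread " + runOnce(scheduler));
        scheduler.shutdown();
    }
}
```

The thread name printed by main starts with the configured prefix, which is what makes scheduled work easy to spot in logs and thread dumps.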
Let's move to the configuration part.
Since the schedulers are configured dynamically at run-time, we need to store the cron expressions and the names of the beans that run the jobs in the database.
For this purpose, I have created an entity called SchedulerConfigEntity.
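@Entity
@Table(name = "scheduler_config")
public class SchedulerConfigEntity {
    @Id
    private Integer id;
    @Column(name = "bean_name")
    private String beanName;
    private String cron;
    // Lock is assumed to be the project's own enum with the values OPEN and LOCKED.
    @Enumerated(EnumType.STRING)
    private Lock lock_;
    // Getters and setters omitted (they could be generated with Lombok, for example).
}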
In general, a scheduler runs a block of code; that is its purpose. So it is better to have an interface that abstracts this operation. The interface also declares two more abstract methods.
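public interface Scheduler {
    // Runs the job for the scheduler_config row with the given id.
    void schedule(Integer id);
    // Returns true if this instance obtained the lock for that row.
    boolean acquireLock(Integer id);
    // Sets the lock for that row back to OPEN.
    void resetLock(Integer id);
}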
acquireLock and resetLock are used for cross-server scheduler locking.
Let's create an implementation for this interface.
// log is assumed to come from Lombok's @Slf4j.
@Slf4j
@Component
public class ExampleSchedulerImpl implements Scheduler {
    @Autowired
    private SchedulerConfigRepo schedulerConfigRepo;

    @Override
    public void schedule(Integer id) {
        // acquireLock returns true only when this instance took the lock.
        if (!acquireLock(id)) {
            log.warn("Running on another instance.");
            return;
        }
        try {
            log.info("Hey.. This is printed by dynamically scheduled tasks");
        } finally {
            // Release the lock even if the job fails.
            resetLock(id);
        }
    }

    @Override
    public boolean acquireLock(Integer id) {
        int rowsUpdated = schedulerConfigRepo.acquireLock(id);
        return rowsUpdated > 0;
    }

    @Override
    public void resetLock(Integer id) {
        schedulerConfigRepo.resetLock(id);
    }
}
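The intended flow of schedule (try to acquire, skip if the lock is taken, always release afterwards) can be tried without a database. This is a minimal sketch under stated assumptions: InMemoryLockStore is a hypothetical stand-in for the scheduler_config table, and LockedJobRunner is a made-up helper, neither of which is part of the article's project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the scheduler_config table: id -> lock state.
final class InMemoryLockStore {
    private final Map<Integer, String> locks = new ConcurrentHashMap<>();

    InMemoryLockStore(int id) {
        locks.put(id, "OPEN");
    }

    // Flips OPEN to LOCKED only if the current value is OPEN.
    boolean acquireLock(int id) {
        return locks.replace(id, "OPEN", "LOCKED");
    }

    // Unconditionally sets an existing row back to OPEN.
    void resetLock(int id) {
        locks.replace(id, "OPEN");
    }

    String state(int id) {
        return locks.get(id);
    }
}

// Hypothetical helper that runs a job under the lock.
final class LockedJobRunner {

    // Runs the job only if the lock was acquired; returns whether it ran.
    static boolean runIfUnlocked(InMemoryLockStore store, int id, Runnable job) {
        if (!store.acquireLock(id)) {
            return false; // another caller holds the lock
        }
        try {
            job.run();
            return true;
        } finally {
            store.resetLock(id); // released on success and on failure
        }
    }

    public static void main(String[] args) {
        InMemoryLockStore store = new InMemoryLockStore(1);
        // The nested call stands in for a second instance that fires
        // while the first job is still running.
        boolean first = runIfUnlocked(store, 1, () -> {
            boolean second = runIfUnlocked(store, 1, () -> { });
            System.out.println("second attempt ran: " + second);
        });
        System.out.println("first attempt ran: " + first);
        System.out.println("lock after run: " + store.state(1));
    }
}
```

Putting the reset in a finally block matters: if the job throws and the lock stays LOCKED, no instance will run that job again until someone resets the row by hand.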
Let's configure our scheduler.
Create an entry for SchedulerConfigEntity in the database. Here I have a data.sql file that does the insert for us.
INSERT INTO scheduler_config VALUES
(1,'exampleSchedulerImpl','* * * * * *','OPEN');
With Spring's six-field cron format, '* * * * * *' fires every second. And here is our configuration class:
// @Configuration and @PostConstruct are assumptions here; the annotations were lost from the listing.
@Configuration
public class SchedulerConfig {
    // Todo future usage
    private static Map<String, ScheduledFuture<?>> futureMap = new HashMap<>();
    private SchedulerConfigRepo schedulerConfigRepo;
    private ApplicationContext applicationContext;
    private TaskScheduler taskScheduler;

    public SchedulerConfig(SchedulerConfigRepo schedulerConfigRepo, ApplicationContext applicationContext, TaskScheduler taskScheduler) {
        this.schedulerConfigRepo = schedulerConfigRepo;
        this.applicationContext = applicationContext;
        this.taskScheduler = taskScheduler;
    }

    // Reads every row from scheduler_config once at startup and schedules it.
    @PostConstruct
    public void configSchedules() {
        List<SchedulerConfigEntity> configs = schedulerConfigRepo.findAll();
        for (SchedulerConfigEntity config : configs) {
            // Get the ExampleSchedulerImpl that implements Scheduler; its default bean name is exampleSchedulerImpl.
            Scheduler scheduler = (Scheduler) applicationContext.getBean(config.getBeanName());
            // Skip beans that already have a scheduled task.
            if (Objects.isNull(futureMap.get(config.getBeanName()))) {
                // Schedule the task with the cron expression from the database.
                ScheduledFuture<?> schedule = taskScheduler.schedule(
                        () -> scheduler.schedule(config.getId()),
                        new CronTrigger(config.getCron()));
                // The ScheduledFuture can be used to check the job status.
                futureMap.put(config.getBeanName(), schedule);
            }
        }
    }
}
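The futureMap is marked for future use. One way it could be used is to cancel a job or to refuse a duplicate registration. The sketch below shows that idea with a plain JDK ScheduledExecutorService; it swaps the cron trigger for a fixed rate because the JDK has no cron support, and ScheduleRegistry and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical registry: keeps one ScheduledFuture per bean name, like futureMap.
final class ScheduleRegistry {
    private final Map<String, ScheduledFuture<?>> futures = new HashMap<>();
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

    // Schedules the task unless one is already registered under that name.
    synchronized boolean register(String name, Runnable task, long periodMillis) {
        if (futures.containsKey(name)) {
            return false;
        }
        ScheduledFuture<?> future =
                executor.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        futures.put(name, future);
        return true;
    }

    // Cancels and forgets the task; returns false if the name is unknown.
    synchronized boolean cancel(String name) {
        ScheduledFuture<?> future = futures.remove(name);
        if (future == null) {
            return false;
        }
        future.cancel(false); // let an execution that is in progress finish
        return true;
    }

    // True while a task is registered under that name and has not been cancelled.
    synchronized boolean isActive(String name) {
        ScheduledFuture<?> future = futures.get(name);
        return future != null && !future.isCancelled();
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        ScheduleRegistry registry = new ScheduleRegistry();
        System.out.println("registered: " + registry.register("exampleSchedulerImpl", () -> { }, 1000));
        System.out.println("registered again: " + registry.register("exampleSchedulerImpl", () -> { }, 1000));
        System.out.println("cancelled: " + registry.cancel("exampleSchedulerImpl"));
        registry.shutdown();
    }
}
```

Rescheduling after a cron change in the database would then be a cancel followed by a new register, although the article's code does not do this yet.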
How do we enable the cross-server scheduler lock? We have introduced a column called lock_ with the default value 'OPEN'. When a job starts, it tries to acquire the lock by setting the column to 'LOCKED'. If it cannot, the job must already be running on another instance. When the job ends or fails, we reset the lock back to 'OPEN'.
Let's look at how the JPA repository sets and resets the lock.
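public interface SchedulerConfigRepo extends JpaRepository<SchedulerConfigEntity, Integer> {
    // Conditional update: returns 1 if this instance took the lock, 0 if the row was already LOCKED.
    // Update queries need @Modifying and must run inside a transaction.
    @Transactional
    @Modifying
    @Query(value = "UPDATE scheduler_config SET lock_='LOCKED' WHERE id=:id AND lock_='OPEN'", nativeQuery = true)
    int acquireLock(@Param("id") Integer id);

    @Transactional
    @Modifying
    @Query(value = "UPDATE scheduler_config SET lock_='OPEN' WHERE id=:id", nativeQuery = true)
    void resetLock(@Param("id") Integer id);
}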
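The lock works because the check (lock_='OPEN') and the write (lock_='LOCKED') happen in one UPDATE statement, so only one caller can see a row count of 1. The sketch below models that with a compare-and-set on an in-memory value instead of a real database; ConditionalUpdateDemo and its LockState enum are hypothetical names, and the threads stand in for cluster instances firing on the same cron tick.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the lock_ column of a single scheduler_config row.
final class ConditionalUpdateDemo {
    enum LockState { OPEN, LOCKED }

    // Lets `instances` threads race for the lock and returns how many acquired it.
    static int countWinners(int instances) {
        AtomicReference<LockState> lock = new AtomicReference<>(LockState.OPEN);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(instances);
        try {
            for (int i = 0; i < instances; i++) {
                pool.submit(() -> {
                    try {
                        start.await(); // line all threads up before racing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Same shape as: SET lock_='LOCKED' WHERE lock_='OPEN'
                    if (lock.compareAndSet(LockState.OPEN, LockState.LOCKED)) {
                        winners.incrementAndGet();
                    }
                });
            }
            start.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("instances that acquired the lock: " + countWinners(8));
    }
}
```

A separate SELECT followed by an UPDATE would not give the same guarantee, because two instances could both read 'OPEN' before either one writes 'LOCKED'.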
That's all. We now have a dynamic scheduler implementation that reads its configuration from the database, along with a custom cross-server scheduler locking strategy. Please find the full implementation in my Git repo.