开篇说明
- 如果在这里获得过启发和思考,希望点赞支持!对于内容有不同的看法欢迎来信交流。
- 技术栈 >> java
- 邮箱 >> @163.com
描述
- 项目刚开始时,先简单地实现了单机版:使用 quartz 定时任务,并通过两张表实现数据持久化。
- 由于项目持续了大半年的迭代更新,需要执行的定时任务增多。并且服务中还有其他业务,单机版本的定时任务影响了服务的集群搭建。所以,着手对其改进。
- 目标是保留单机版中两张表的使用,来对定时任务执行计划在页面控制,以及可查看每个任务的执行结果等信息。
我的思路
- 通过quartz官方提供的11张数据表作为数据存储,来实现集群服务的数据共享。
- 自定义表 schedule_job 用于任务的初始化以及后续对执行计划的修改,自定义表 schedule_log 用于存储执行记录。此处与单机版一致。
- 定时任务监听器 ScheduleJobListener.java 记录任务的执行结果。
- 注意:与单机版不同的是,集群版的数据不存储在内存中,而是存储在官方提供的11张表中。由于将启动多个服务,故在初始化任务时需要判断该定时任务是否已经存在(单机时内存中的数据每次停机均会清空,而集群中数据库里的数据却会一直保留,除非重新创建表)。因此我们需要修改 ScheduleJobUtil 工具类,以及初始化时的逻辑。
- 此外,我们还需要添加一些数据源的配置。
第一步:添加依赖
<!-- Quartz dependencies -->
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.2.3</version>
</dependency>
<!-- Spring support classes for Quartz; no version is declared here -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz-jobs</artifactId>
    <version>2.2.3</version>
</dependency>
第二步:创建数据表
- 自定义的两个表,参考开篇中的单机版博客
- 11个表均由官方提供
-- Tables required by the Quartz JDBC job store (MySQL / InnoDB).
-- Each statement sits on its own lines so that the leading "--" comment
-- cannot swallow the statements that follow it.

-- Drop child tables before the tables their foreign keys reference.
DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;

CREATE TABLE QRTZ_JOB_DETAILS (
    SCHED_NAME        VARCHAR(120) NOT NULL,
    JOB_NAME          VARCHAR(200) NOT NULL,
    JOB_GROUP         VARCHAR(200) NOT NULL,
    DESCRIPTION       VARCHAR(250) NULL,
    JOB_CLASS_NAME    VARCHAR(250) NOT NULL,
    IS_DURABLE        VARCHAR(1)   NOT NULL,
    IS_NONCONCURRENT  VARCHAR(1)   NOT NULL,
    IS_UPDATE_DATA    VARCHAR(1)   NOT NULL,
    REQUESTS_RECOVERY VARCHAR(1)   NOT NULL,
    JOB_DATA          BLOB         NULL,
    PRIMARY KEY (SCHED_NAME, JOB_NAME, JOB_GROUP)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_TRIGGERS (
    SCHED_NAME     VARCHAR(120) NOT NULL,
    TRIGGER_NAME   VARCHAR(200) NOT NULL,
    TRIGGER_GROUP  VARCHAR(200) NOT NULL,
    JOB_NAME       VARCHAR(200) NOT NULL,
    JOB_GROUP      VARCHAR(200) NOT NULL,
    DESCRIPTION    VARCHAR(250) NULL,
    NEXT_FIRE_TIME BIGINT(13)   NULL,
    PREV_FIRE_TIME BIGINT(13)   NULL,
    PRIORITY       INTEGER      NULL,
    TRIGGER_STATE  VARCHAR(16)  NOT NULL,
    TRIGGER_TYPE   VARCHAR(8)   NOT NULL,
    START_TIME     BIGINT(13)   NOT NULL,
    END_TIME       BIGINT(13)   NULL,
    CALENDAR_NAME  VARCHAR(200) NULL,
    MISFIRE_INSTR  SMALLINT(2)  NULL,
    JOB_DATA       BLOB         NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, JOB_NAME, JOB_GROUP)
        REFERENCES QRTZ_JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
    SCHED_NAME      VARCHAR(120) NOT NULL,
    TRIGGER_NAME    VARCHAR(200) NOT NULL,
    TRIGGER_GROUP   VARCHAR(200) NOT NULL,
    REPEAT_COUNT    BIGINT(7)    NOT NULL,
    REPEAT_INTERVAL BIGINT(12)   NOT NULL,
    TIMES_TRIGGERED BIGINT(10)   NOT NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_CRON_TRIGGERS (
    SCHED_NAME      VARCHAR(120) NOT NULL,
    TRIGGER_NAME    VARCHAR(200) NOT NULL,
    TRIGGER_GROUP   VARCHAR(200) NOT NULL,
    CRON_EXPRESSION VARCHAR(120) NOT NULL,
    TIME_ZONE_ID    VARCHAR(80),
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPROP_TRIGGERS (
    SCHED_NAME    VARCHAR(120)  NOT NULL,
    TRIGGER_NAME  VARCHAR(200)  NOT NULL,
    TRIGGER_GROUP VARCHAR(200)  NOT NULL,
    STR_PROP_1    VARCHAR(512)  NULL,
    STR_PROP_2    VARCHAR(512)  NULL,
    STR_PROP_3    VARCHAR(512)  NULL,
    INT_PROP_1    INT           NULL,
    INT_PROP_2    INT           NULL,
    LONG_PROP_1   BIGINT        NULL,
    LONG_PROP_2   BIGINT        NULL,
    DEC_PROP_1    NUMERIC(13,4) NULL,
    DEC_PROP_2    NUMERIC(13,4) NULL,
    BOOL_PROP_1   VARCHAR(1)    NULL,
    BOOL_PROP_2   VARCHAR(1)    NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_BLOB_TRIGGERS (
    SCHED_NAME    VARCHAR(120) NOT NULL,
    TRIGGER_NAME  VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    BLOB_DATA     BLOB         NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    -- NOTE(review): this index covers the same columns as the primary key;
    -- it is kept only so the schema stays identical to the original script.
    INDEX (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_CALENDARS (
    SCHED_NAME    VARCHAR(120) NOT NULL,
    CALENDAR_NAME VARCHAR(200) NOT NULL,
    CALENDAR      BLOB         NOT NULL,
    PRIMARY KEY (SCHED_NAME, CALENDAR_NAME)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
    SCHED_NAME    VARCHAR(120) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_FIRED_TRIGGERS (
    SCHED_NAME        VARCHAR(120) NOT NULL,
    ENTRY_ID          VARCHAR(95)  NOT NULL,
    TRIGGER_NAME      VARCHAR(200) NOT NULL,
    TRIGGER_GROUP     VARCHAR(200) NOT NULL,
    INSTANCE_NAME     VARCHAR(200) NOT NULL,
    FIRED_TIME        BIGINT(13)   NOT NULL,
    SCHED_TIME        BIGINT(13)   NOT NULL,
    PRIORITY          INTEGER      NOT NULL,
    STATE             VARCHAR(16)  NOT NULL,
    JOB_NAME          VARCHAR(200) NULL,
    JOB_GROUP         VARCHAR(200) NULL,
    IS_NONCONCURRENT  VARCHAR(1)   NULL,
    REQUESTS_RECOVERY VARCHAR(1)   NULL,
    PRIMARY KEY (SCHED_NAME, ENTRY_ID)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_SCHEDULER_STATE (
    SCHED_NAME        VARCHAR(120) NOT NULL,
    INSTANCE_NAME     VARCHAR(200) NOT NULL,
    LAST_CHECKIN_TIME BIGINT(13)   NOT NULL,
    CHECKIN_INTERVAL  BIGINT(13)   NOT NULL,
    PRIMARY KEY (SCHED_NAME, INSTANCE_NAME)
) ENGINE=InnoDB;

CREATE TABLE QRTZ_LOCKS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    LOCK_NAME  VARCHAR(40)  NOT NULL,
    PRIMARY KEY (SCHED_NAME, LOCK_NAME)
) ENGINE=InnoDB;

-- Secondary indexes on QRTZ_JOB_DETAILS
CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS (SCHED_NAME, REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS (SCHED_NAME, JOB_GROUP);

-- Secondary indexes on QRTZ_TRIGGERS
CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS (SCHED_NAME, JOB_NAME, JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS (SCHED_NAME, JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS (SCHED_NAME, CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_GROUP, TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS (SCHED_NAME, NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_STATE, NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_GROUP, TRIGGER_STATE);

-- Secondary indexes on QRTZ_FIRED_TRIGGERS
CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, INSTANCE_NAME, REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, JOB_NAME, JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, TRIGGER_GROUP);

commit;
第三步:java代码实现
- 添加配置 quartz.properties
# Quartz cluster configuration.
# One entry per line: a "#" comment runs to the end of the line, so settings
# must never share a line with a comment.

# Scheduler name; every instance in the same cluster must use the same name.
org.quartz.scheduler.instanceName=DefaultQuartzScheduler
# Instance id is generated automatically; it must be different on every instance.
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.makeSchedulerThreadDaemon=true

# Thread pool implementation (SimpleThreadPool is usually sufficient).
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
# Whether the threads created by the pool are daemon threads.
org.quartz.threadPool.makeThreadsDaemons=true
# Number of worker threads, at least 1 (no default value).
org.quartz.threadPool.threadCount=20
# Thread priority (max java.lang.Thread.MAX_PRIORITY = 10, min Thread.MIN_PRIORITY = 1, default 5).
org.quartz.threadPool.threadPriority=5

# Persist scheduler data in the database.
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# JDBC delegate; StdJDBCDelegate works for most databases.
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# Table name prefix, QRTZ_ by default.
org.quartz.jobStore.tablePrefix=QRTZ_
# Whether this instance joins the cluster.
org.quartz.jobStore.isClustered=true
# Misfire threshold in milliseconds: how late a trigger may fire before it is
# treated as misfired (Quartz default is 60000).
org.quartz.jobStore.misfireThreshold=25000
- ScheduleJobListener.java 定时任务监听器:通过监听的方式记录定时任务的执行情况。
import com.qykj.admin.service.schedule.JobService; import com.qykj.core.constants.schedule.JobLogStatusEnum; import com.qykj.core.util.DateUtil; import com.qykj.core.util.SpringUtils; import org.apache.logging.log4j.ThreadContext; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobListener; import org.springframework.stereotype.Component; import java.util.UUID; / * ===================================================================================================================== * jiangshaoneng <.@163.com> 2021/11/20 14:18 * * 定时任务监听器:通过监听的方式记录 记录定时任务的执行记录。 * ===================================================================================================================== */ @Component public class ScheduleJobListener implements JobListener{
private JobService jobService; // 用于保存 logId private ThreadLocal<String> threadLocalLogId = new ThreadLocal<>(); // startTime private ThreadLocal<Long> threadLocalStartTime = new ThreadLocal<>(); @Override public String getName() {
return "myJobListener"; } @Override public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {
ThreadContext.put("traceID", UUID.randomUUID().toString().replace("-","")); if(jobService == null){
jobService = SpringUtils.getBean(JobService.class); } JobDetail jobDetail = jobExecutionContext.getJobDetail(); String name = jobDetail.getKey().getName(); String logId = jobService.saveScheduleLog(name, JobLogStatusEnum.RUNNING.getCode(), ""); threadLocalLogId.set(logId); // 把执行记录放到 threadLocal,提供给执行结束后取此结果 threadLocalStartTime.set(DateUtil.getCurrentTime()); } @Override public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {
String logId = threadLocalLogId.get(); // 执行记录logId long runTime = DateUtil.getCurrentTime() - threadLocalStartTime.get(); jobService.updateScheduleLog(logId, JobLogStatusEnum.ERROR.getCode(), "执行失败",runTime); } @Override public void jobWasExecuted(JobExecutionContext jobExecutionContext, JobExecutionException e) {
String logId = threadLocalLogId.get(); // 执行记录logId long runTime = DateUtil.getCurrentTime() - threadLocalStartTime.get(); if(e == null){
// 没有异常修改记录为成功 jobService.updateScheduleLog(logId, JobLogStatusEnum.SUCCESS.getCode(), "执行成功",runTime); }else{
// 存在异常修改记录失败,并且把异常信息保存到数据库中 jobService.updateScheduleLog(logId, JobLogStatusEnum.ERROR.getCode(), e.toString(), runTime); } } }
- ScheduleConfig.java 定时任务配置类,配置数据源,监听器等信息
import com.qykj.admin.job.ScheduleJobListener; import com.qykj.admin.job.ScheduleJobUtil; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.PropertiesFactoryBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import javax.sql.DataSource; import java.io.IOException; import java.util.Properties; import java.util.concurrent.Executor; import static org.quartz.impl.matchers.GroupMatcher.jobGroupEquals; / * ===================================================================================================================== * jiangshaoneng <.@163.com> 22021/11/20 14:18 * * 定时任务配置类。 * ===================================================================================================================== */ @Configuration public class SchedulerConfig {
@Qualifier("writeDataSource") @Autowired private DataSource dataSource; @Bean public Scheduler scheduler() throws IOException{
Scheduler scheduler = schedulerFactoryBean().getScheduler(); try {
// 默认将 MyJobListener 绑定到 JOB_GROUP_NAME,在MyJobListener通过监听的方式记录 记录定时任务的执行记录 scheduler.getListenerManager() .addJobListener(new ScheduleJobListener(), jobGroupEquals(ScheduleJobUtil.JOB_GROUP_NAME)); } catch (SchedulerException e) {
e.printStackTrace(); } return scheduler; } @Bean public SchedulerFactoryBean schedulerFactoryBean() throws IOException{
SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setSchedulerName("cluster_scheduler"); factory.setDataSource(dataSource); factory.setApplicationContextSchedulerContextKey("application"); factory.setQuartzProperties(quartzProperties()); factory.setTaskExecutor(schedulerThreadPool()); factory.setStartupDelay(0); return factory; } @Bean public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties")); propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } @Bean public Executor schedulerThreadPool(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); executor.setQueueCapacity(Runtime.getRuntime().availableProcessors()); return executor; } }
- ScheduleJobUtil.java 定时任务工具类,包括任务的增删改查。由于已配置数据库存储,所有操作都直接作用于数据库中的数据。(jar包已经集成,只需调用对应方法即可,无需我们自己实现数据库操作)
import lombok.extern.slf4j.Slf4j; import org.quartz.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; / * ===================================================================================================================== * jiangshaoneng <.@163.com> 2021/11/20 14:18 * * 任务调度的工具类,包括定时任务的新增修改删除等。本系统暂时对任务组,触发器组名暂时均使用同一个。 * 如:JOB_GROUP_NAME,TRIGGER_GROUP_NAME * ===================================================================================================================== */ @Slf4j @Component public class ScheduleJobUtil {
@Autowired private Scheduler scheduler; public static String JOB_GROUP_NAME = "DEFAULT_JOB_GROUP"; private static String TRIGGER_GROUP_NAME = "DEFAULT_TRIGGER_GROUP"; public ScheduleJobUtil(){
} / * @Description: 添加一个定时任务,使用默认的任务组名,触发器名,触发器组名 * @param jobName 任务名 * @param cls 任务 * @param cron 时间设置,参考quartz说明文档 * @throws SchedulerException */ public void addJob(String jobName, Class cls, String cron, JobDataMap dataMap) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, TRIGGER_GROUP_NAME); Trigger trigger = scheduler.getTrigger(triggerKey); if(trigger != null){
log.info("添加任务:{},{},{} 已存在", jobName, cls, cron); return; } // 用于描叙Job实现类及其他的一些静态信息,构建一个作业实例 JobDetail jobDetail = JobBuilder.newJob(cls).withIdentity(jobName, JOB_GROUP_NAME).build(); // 构建一个触发器,规定触发的规则 trigger = TriggerBuilder.newTrigger()// 创建一个新的TriggerBuilder来规范一个触发器 .withIdentity(jobName, TRIGGER_GROUP_NAME)// 给触发器起一个名字和组名 .startNow()// 立即执行 .withSchedule(CronScheduleBuilder.cronSchedule(cron)) // 触发器的执行时间 .usingJobData(dataMap) // 定时器一些简单的数据 .build();// 产生触发器 scheduler.scheduleJob(jobDetail, trigger); log.info("添加任务:{},{},{}", jobName, cls, cron); // 启动 if (!scheduler.isShutdown()) {
scheduler.start(); } } / * @Description: 添加一个定时任务 * * @param jobName 任务名 * @param jobGroupName 任务组名 * @param triggerName 触发器名 * @param triggerGroupName 触发器组名 * @param cls 任务 * @param cron 时间设置,参考quartz说明文档 */ public void addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class cls, String cron) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, TRIGGER_GROUP_NAME); Trigger trigger = scheduler.getTrigger(triggerKey); if(trigger != null){
log.info("添加任务:{},{},{},{},{},{} 已存在",jobName,jobGroupName,triggerName,triggerGroupName,cls,cron); return; } // 用于描叙Job实现类及其他的一些静态信息,构建一个作业实例 JobDetail jobDetail = JobBuilder.newJob(cls).withIdentity(jobName, jobGroupName).build(); // 构建一个触发器,规定触发的规则 trigger = TriggerBuilder.newTrigger()// 创建一个新的TriggerBuilder来规范一个触发器 .withIdentity(jobName, triggerGroupName)// 给触发器起一个名字和组名 .startNow()// 立即执行 .withSchedule(CronScheduleBuilder.cronSchedule(cron)) // 触发器的执行时间 .build();// 产生触发器 scheduler.scheduleJob(jobDetail, trigger); log.info("添加任务:{},{},{},{},{},{}",jobName,jobGroupName,triggerName,triggerGroupName,cls,cron); // 启动 if (!scheduler.isShutdown()) {
scheduler.start(); } } / * @Description: 修改一个任务的触发时间(使用默认的任务组名,触发器名,触发器组名) * * @param jobName * @param cron * @throws SchedulerException */ public void modifyJobTime(String jobName, String cron, JobDataMap dataMap) throws SchedulerException {
TriggerKey triggerKey = new TriggerKey(jobName, TRIGGER_GROUP_NAME); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); if (trigger == null) {
log.info("修改任务失败,此任务不存在:{},{}", jobName, cron); return; } String oldTime = trigger.getCronExpression(); if (!oldTime.equalsIgnoreCase(cron)) {
JobDetail jobDetail = scheduler.getJobDetail(new JobKey(jobName, JOB_GROUP_NAME)); Class objJobClass = jobDetail.getJobClass(); removeJob(jobName); addJob(jobName, objJobClass, cron, dataMap); log.info("修改任务:{},{}",jobName,cron); } } / * @Description: 修改或添加一个任务,jobName 存在时修改任务,不存在时则新增任务 * */ public void modifyOrAddJobTime(String jobName, Class cls, String cron, JobDataMap dataMap) throws SchedulerException {
TriggerKey triggerKey = new TriggerKey(jobName, TRIGGER_GROUP_NAME); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); if (trigger == null) {
addJob(jobName, cls, cron, dataMap); log.info("添加任务:{},{}",jobName,cron); }else{
String oldTime = trigger.getCronExpression(); JobDataMap oldJobDataMap = trigger.getJobDataMap(); if (!oldTime.equalsIgnoreCase(cron) || !oldJobDataMap.equals(dataMap)) {
JobDetail jobDetail = scheduler.getJobDetail(new JobKey(jobName, JOB_GROUP_NAME)); Class objJobClass = jobDetail.getJobClass(); removeJob(jobName); addJob(jobName, objJobClass, cron, dataMap); log.info("修改任务:{},{}",jobName,cron); }else {
log.info("任务存在:{},{}",jobName,cron); } } } / * @Description: 移除一个任务(使用默认的任务组名,触发器名,触发器组名) * @param jobName * @throws SchedulerException */ public void removeJob(String jobName) throws SchedulerException {
JobKey jobKey = new JobKey(jobName, TRIGGER_GROUP_NAME); // 停止触发器 scheduler.pauseJob(jobKey); scheduler.unscheduleJob(new TriggerKey(jobName, TRIGGER_GROUP_NAME));// 移除触发器 scheduler.deleteJob(jobKey);// 删除任务 log.info("移除任务:{}",jobName); } / * 移除任务 * * @param jobName * @param jobGroupName * @param triggerName * @param triggerGroupName * @throws SchedulerException */ public void removeJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName) throws SchedulerException {
JobKey jobKey = new JobKey(jobName, jobGroupName); // 停止触发器 scheduler.pauseJob(jobKey); scheduler.unscheduleJob(new TriggerKey(jobName, triggerGroupName));// 移除触发器 scheduler.deleteJob(jobKey);// 删除任务 log.info("移除任务:{},{},{},{}",jobName,jobGroupName,triggerName,triggerGroupName); } / * 启动所有任务 * @throws SchedulerException */ public void startJobs() throws SchedulerException {
scheduler.start(); log.info("启动所有任务"); } / * 关闭所有定时任务 * @throws SchedulerException */ public void shutdownJobs() throws SchedulerException {
if (!scheduler.isShutdown()) {
scheduler.shutdown(); log.info("关闭所有任务"); } } }
- ScheduleJobInitListener.java 监听项目启动时初始化定时任务
import com.qykj.admin.service.schedule.JobService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; / * ===================================================================================================================== * jiangshaoneng <.@163.com> 2021/11/20 14:18 * * 启动项目时,启动数据库配置启动的定时任务。可支持集权部署 * ===================================================================================================================== */ @Slf4j @Component public class ScheduleJobInitListener implements ApplicationListener<ContextRefreshedEvent>{
@Autowired private JobService jobService; @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
log.info("开始初始化定时任务 ..."); jobService.initScheduleJob(); log.info("初始化定时任务成功 ..."); } }
- JobService.java 定时任务初始化、修改等逻辑
import com.qykj.admin.job.ScheduleJobUtil; import com.qykj.core.domain.entity.schedule.ScheduleJob; import com.qykj.core.domain.entity.schedule.ScheduleLog; import com.qykj.core.exception.AppException; import com.qykj.core.exception.ErrorCode; import com.qykj.core.util.JsonUtils; import com.qykj.repo.impl.schedule.ScheduleJobRepoImpl; import com.qykj.repo.impl.schedule.ScheduleLogRepoImpl; import org.quartz.JobDataMap; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.List; import java.util.Map; @Service public class JobService {
private static final Logger logger = LoggerFactory.getLogger(JobService.class); @Autowired private ScheduleJobRepoImpl scheduleJobRepo; @Autowired private ScheduleLogRepoImpl scheduleLogRepo; @Autowired private ScheduleJobUtil scheduleJobUtil; / * 查看所有任务列表 */ public List<ScheduleJob> getAllJob(){
return scheduleJobRepo.getList(); } / * 查看某个任务的执行详情 */ public List<ScheduleLog> getJobScheduleDetail(int jobId){
return scheduleLogRepo.getListByJobId(jobId); } / * 查看某个任务的详情 */ public ScheduleJob getJobDetail(int jobId){
return scheduleJobRepo.getScheduleJobById(jobId); } / * 修改任务 */ public Map<String,Object> updateJob(ScheduleJob scheduleJob){
Map<String,Object> result = new HashMap<>(); try {
Class<?> clszz = Class.forName(scheduleJob.getClazz()); Integer status = scheduleJob.getStatus(); if(status == 1){
// 修改或者添加任务 Map<String, String> map = (Map<String, String>) JsonUtils.json2Map(scheduleJob.getJobDataJson()); JobDataMap dataMap = new JobDataMap(map); // 配置定时器需要的数据 scheduleJobUtil.modifyOrAddJobTime(scheduleJob.getName(), clszz, scheduleJob.getCron(), dataMap); }else if(status == 2){
// 停止任务 scheduleJobUtil.removeJob(scheduleJob.getName()); }else{
return result; } }catch (SchedulerException e){
logger.error("修改定时任务异常:{}", e.toString()); throw new AppException(ErrorCode.SYS_ERROR); }catch (ClassNotFoundException e){
logger.error("修改定时任务异常,无法找到指定的任务类"); throw new AppException(ErrorCode.SYS_PARAMS_ERROR.code(), "无法找到指定的任务类"); } scheduleJobRepo.updateScheduleJob(scheduleJob); return result; } / * 启动数据库中配置的定时任务 */ public void initScheduleJob(){
// 移除不需要执行的定时任务 List<ScheduleJob> disableList = scheduleJobRepo.getDisableList(); for (ScheduleJob scheduleJob: disableList){
try {
scheduleJobUtil.removeJob(scheduleJob.getName()); } catch (SchedulerException e) {
logger.error("移除定时任务:{}异常:{}", scheduleJob.getName(), e.toString()); } } // 添加需要执行的定时任务 List<ScheduleJob> enableList = scheduleJobRepo.getEnableList(); for (ScheduleJob scheduleJob: enableList){
Map<String, String> map = new HashMap<>(); try {
map = (Map<String, String>) JsonUtils.json2Map(scheduleJob.getJobDataJson()); }catch (Exception e){
logger.error("定时任务:{},DataJson格式不合法", scheduleJob.getName()); } try {
JobDataMap dataMap = new JobDataMap(map); // 配置定时器需要的数据 Class<?> clszz = Class.forName(scheduleJob.getClazz()); scheduleJobUtil.modifyOrAddJobTime(scheduleJob.getName(), clszz, scheduleJob.getCron(), dataMap); } catch (SchedulerException e){
logger.error("初始化启动定时任务:{},异常:{}", scheduleJob.getName(), e.toString()); continue; } catch (ClassNotFoundException e){
logger.error("初始化启动定时任务异常,无法找到指定的任务类"); continue; } } } / * 新增一条执行日志 */ public String saveScheduleLog(String name, int status, String logInfo){
ScheduleJob scheduleJob = scheduleJobRepo.getScheduleJobByName(name); ScheduleLog scheduleLog = new ScheduleLog(scheduleJob,status,logInfo); scheduleLogRepo.saveScheduleLog(scheduleLog); return scheduleLog.getId(); } / * 更新一条执行日志 */ public void updateScheduleLog(String id, int status, String logInfo, long runTime){
scheduleLogRepo.updateScheduleLog(id,status,logInfo,runTime); } }
- JobControllerV2.java 提供页面操作的接口
import com.qykj.admin.aspect.AuthPermissions; import com.qykj.admin.service.schedule.JobService; import com.qykj.core.domain.entity.schedule.ScheduleJob; import com.qykj.core.domain.entity.schedule.ScheduleLog; import com.qykj.core.view.MSG; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.util.List; @RestController @RequestMapping("/v2/schedule/job") @Api(tags = "Schedule:001-定时任务控制管理") public class JobControllerV2 {
@Autowired private JobService jobService; @ApiOperation(value = "查看所有任务列表") @GetMapping("/list") public MSG<List<ScheduleJob>> getJobList(){
List<ScheduleJob> allJob = jobService.getAllJob(); return MSG.SUCCESS(allJob); } @ApiOperation(value = "查看某个任务的执行详情") @GetMapping("/schedule/detail") public MSG<List<ScheduleLog>> getJobScheduleDetail(@RequestParam("jobId") int jobId){
List<ScheduleLog> jobScheduleDetail = jobService.getJobScheduleDetail(jobId); return MSG.SUCCESS(jobScheduleDetail); } @ApiOperation(value = "查看某个任务的详情") @GetMapping("/detail") public MSG<ScheduleJob> getJobDetail(@RequestParam("jobId") int jobId){
ScheduleJob jobDetail = jobService.getJobDetail(jobId); return MSG.SUCCESS(jobDetail); } @ApiOperation(value = "修改任务") @PostMapping("/update") public MSG updateJob(@RequestBody ScheduleJob job){
jobService.updateJob(job); return MSG.SUCCESS(); } }
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/230226.html原文链接:https://javaforall.net
