一. SchedulingConfigurer解析
Spring 中,创建定时任务除了使用@Scheduled 注解外,还可以使用 SchedulingConfigurer。既然两者都可以实现定时任务,那有什么不同呢?
@Scheduled 注解的一个缺点就是其定时时间不能动态更改,它适用于具有固定任务周期的任务,若要修改任务执行周期,只能走“停服务→修改任务执行周期→重启服务”这条路。而基于 SchedulingConfigurer 接口的方式则可以在不重启服务的情况下调整任务的执行周期。SchedulingConfigurer 接口可以由 @Configuration 类来实现,同时不要忘了,还需要 @EnableScheduling 注解的支持。
package org.springframework.scheduling.annotation;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
/**
 * Callback interface with a single method that is handed a
 * {@link ScheduledTaskRegistrar}. Implementations use the registrar to
 * register their scheduled tasks.
 */
@FunctionalInterface
public interface SchedulingConfigurer {
// The parameter name "var1" looks like decompiler output; it is the registrar to add tasks to.
void configureTasks(ScheduledTaskRegistrar var1);
}
ScheduledTaskRegistrar 类包括以下几个重要方法:addFixedRateTask、addFixedDelayTask、addCronTask、addTriggerTask。
从方法的命名上可以猜到,方法包含定时任务,延时任务,基于 Cron 表达式的任务,以及 Trigger 触发的任务。
二. 代码示例
-- Lock table for scheduled tasks: one row per task id.
-- A row with status = 0 is idle, status = 1 means the lock is currently held.
create table if not exists sys_schedule_lock(
-- Primary key; matches the task ids used by the application (e.g. 'TASK_TODO').
task_id varchar(32) primary key not null comment '任务类型id',
task_desc varchar(32) default null comment '任务类型说明',
-- IP of the machine that took the lock.
exec_ip varchar(32) default null comment '获取锁的机器ip',
acquire_time datetime DEFAULT NULL comment '获取锁的时间',
-- NOTE(review): no NOT NULL / DEFAULT here; a row inserted without a status would never match "status = 0".
status int(11) comment '当前任务id锁状态: 0-空闲 1-占用'
)ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务执行数据库锁';
-- Seed row for the TASK_TODO task in the idle state.
-- NOTE(review): this statement is qualified with schema "zzxypm" while the CREATE TABLE above is not.
INSERT INTO zzxypm.sys_schedule_lock (task_id, task_desc, exec_ip, acquire_time, status) VALUES ('TASK_TODO', '定时推送待办任务', '127.0.0.1', null, 0);
/**
 * Entity mapped to the {@code sys_schedule_lock} table, which holds one lock
 * row per scheduled task.
 */
@NoArgsConstructor
@Data
@TableName("sys_schedule_lock")
public class ScheduleLock {
/** Task id; mapped to the {@code task_id} column as the table id. */
@TableId(value = "task_id")
private String taskId;
/** Description of the task type. */
private String taskDesc;
/** Time at which the lock was taken. */
private Date acquireTime;
/** IP of the machine that took the lock. */
private String execIp;
/** Lock state; the table definition documents 0 as idle and 1 as held. */
private int status;
}
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zzxypm.entity.ScheduleLock;
import java.util.Map;
/**
 * MyBatis mapper for the {@code sys_schedule_lock} table. The three custom
 * statements are defined in the mapper XML under the same ids.
 */
public interface ScheduleLockMapper extends BaseMapper<ScheduleLock> {
/**
 * Tries to take the lock of a task: sets status to 1 for the row whose
 * status is currently 0.
 *
 * @param map parameters read by the statement: {@code taskId} and {@code execIp}
 * @return the update count of the statement; callers treat 1 as "lock acquired"
 */
int lock(Map<String,Object> map);
/**
 * Releases the lock of a task: sets status back to 0 for the row that is
 * held (status 1) by the given {@code execIp}.
 *
 * @param map parameters read by the statement: {@code taskId} and {@code execIp}
 * @return the update count of the statement
 */
int unlock(Map<String,Object> map);
/**
 * Sets status to 0 on every row whose status is 1.
 *
 * @return the update count of the statement
 */
int batchResetStatus();
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- SQL statements bound to com.zzxypm.mapper.ScheduleLockMapper. -->
<mapper namespace="com.zzxypm.mapper.ScheduleLockMapper">
<!-- lock: take the lock of one task. Only matches while the row is idle (status = 0),
     so of several concurrent callers at most one sees an update count of 1.
     Records the caller's IP and the acquisition time. -->
<update id="lock" parameterType="map">
update sys_schedule_lock
set exec_ip = #{execIp},
acquire_time = now(),
status = 1
where task_id = #{taskId}
and status = 0;
</update>
<!-- unlock: release the lock of one task. Only matches a row that is held (status = 1)
     by the same exec_ip. NOTE(review): this also overwrites acquire_time with the
     release time; confirm that is intended. -->
<update id="unlock" parameterType="map">
update sys_schedule_lock
set
acquire_time = now(),
status = 0
where task_id = #{taskId}
and status = 1
and exec_ip = #{execIp};
</update>
<!-- batchResetStatus: set every held row (status = 1) back to idle, regardless of owner. -->
<update id="batchResetStatus">
update sys_schedule_lock
set status = 0
where status = 1
</update>
</mapper>
package com.zzxypm.schedule;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
 * Known scheduled tasks. Each constant carries the task id used as the key of
 * its lock row and a short description.
 */
public enum TaskIdEnum {
    TASK_TODO("TASK_TODO", "定时推送待办任务");

    /** Unique id of the task. */
    private final String taskId;

    /** Description of the task type. */
    private final String desc;

    TaskIdEnum(String taskId, String desc) {
        this.taskId = taskId;
        this.desc = desc;
    }

    /** @return the unique id of this task */
    public String getTaskId() {
        return taskId;
    }

    /** @return the description of this task type */
    public String getDesc() {
        return desc;
    }

    /**
     * Looks up the constant with the given task id.
     *
     * @param taskId id to search for; may be {@code null}
     * @return the first constant whose id equals {@code taskId}, or
     *         {@link #TASK_TODO} when none matches
     */
    public static TaskIdEnum acquire(final String taskId) {
        for (TaskIdEnum candidate : values()) {
            if (Objects.equals(candidate.getTaskId(), taskId)) {
                return candidate;
            }
        }
        return TASK_TODO;
    }

    /** @return all constants, in declaration order, as a fixed-size list */
    public static List<TaskIdEnum> getEnumList() {
        TaskIdEnum[] all = values();
        return Arrays.asList(all);
    }
}
package com.zzxypm.schedule;
import com.zzxypm.common.util.DateUtil;
import com.zzxypm.common.util.IPV4Util;
import com.zzxypm.common.util.SpringContextHolder;
import com.zzxypm.mapper.ScheduleLockMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.Map;
/**
* 抽象任务骨架定义
*/
@Slf4j
/**
 * Skeleton of a scheduled task that is guarded by a database lock.
 *
 * <p>{@link #run()} first tries to take the lock row of the task through
 * {@link ScheduleLockMapper#lock}; only the caller that obtains an update
 * count of 1 executes {@link #doWork()}, and it releases the lock afterwards.
 *
 * <p>{@code run()} deliberately carries no {@code @Transactional}: task
 * instances are created by reflection rather than by the Spring container, so
 * the annotation would not be applied. If it were, the lock update would stay
 * uncommitted for the whole run and other nodes could not see it.
 */
@Slf4j
public abstract class AbstractTask implements Runnable {
    private final TaskIdEnum taskIdEnum;
    private final String cron;
    private final ScheduleLockMapper lockMapper = SpringContextHolder.getBean(ScheduleLockMapper.class);

    /**
     * @param taskIdEnum task identity; its id selects the lock row
     * @param cron       cron expression describing when the task fires
     */
    public AbstractTask(TaskIdEnum taskIdEnum, String cron) {
        this.taskIdEnum = taskIdEnum;
        this.cron = cron;
    }

    /**
     * Tries to take the lock and, when successful, runs {@link #doWork()}.
     * The lock is released in a {@code finally} block so that an exception
     * thrown by {@code doWork()} cannot leave the row in the "held" state,
     * which would block every later run. The exception itself still
     * propagates to the caller.
     */
    @Override
    public void run() {
        String taskId = taskIdEnum.getTaskId();
        String ip = IPV4Util.getLocalIpv4Address();
        Map<String, Object> map = new HashMap<>();
        map.put("taskId", taskId);
        map.put("execIp", ip);

        int lock = lockMapper.lock(map);
        log.info("lock: taskId=[{}],ip=[{}],lock=[{}]", taskId, ip, lock);
        if (lock != 1) {
            // The lock was not obtained (no idle row matched); skip this run.
            return;
        }

        try {
            log.info("taskId:[{}],任务执行开始时间:[{}]", taskId, DateUtil.now());
            doWork();
            log.info("taskId:[{}],任务执行结束时间:[{}]", taskId, DateUtil.now());
        } finally {
            int unlock = lockMapper.unlock(map);
            log.info("unlock: taskId=[{}],ip=[{}],unlock=[{}]", taskId, ip, unlock);
        }
    }

    /** The actual work of the task; invoked only while the lock is held. */
    protected abstract void doWork();

    /** @return the identity of this task */
    public TaskIdEnum getTaskIdEnum() {
        return taskIdEnum;
    }

    /** @return the cron expression of this task */
    public String getCron() {
        return cron;
    }
}
package com.zzxypm.schedule.task;
import com.zzxypm.schedule.AbstractTask;
import com.zzxypm.schedule.TaskIdEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 待办推送任务
*/
@Slf4j
/**
 * To-do push task: registered under {@link TaskIdEnum#TASK_TODO} with a cron
 * expression that fires every five seconds.
 */
@Slf4j
public class TodoPushTask extends AbstractTask {

    /** Cron expression: every 5 seconds. */
    private static final String EVERY_FIVE_SECONDS = "*/5 * * * * ?";

    public TodoPushTask() {
        super(TaskIdEnum.TASK_TODO, EVERY_FIVE_SECONDS);
    }

    /** Currently only writes a log line; the real push logic is still to be added. */
    @Override
    protected void doWork() {
        // Intended flow: periodically scan the table, wrap the rows as task objects
        // and hand them to the factory so the executor can run them.
        log.info("TodoPushTask,推送待办任务");
    }
}
package com.zzxypm.config;
import com.zzxypm.common.support.cls.DefaultClassScanner;
import com.zzxypm.schedule.AbstractTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TriggerTask;
import org.springframework.scheduling.support.CronTrigger;
import java.util.ArrayList;
import java.util.List;
/**
* 动态定时任务配置
* (配置数据库动态执行)
**/
@Slf4j
@Configuration
public class DynamicScheduleConfig implements SchedulingConfigurer {
@Autowired
private DefaultClassScanner classScanner;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
try {
List<Class<?>> taskClassList = classScanner.getClassListBySuper("com.zzxypm.schedule", AbstractTask.class);
if (CollectionUtils.isNotEmpty(taskClassList)) {
List<TriggerTask> taskList = new ArrayList<>(taskClassList.size());
TriggerTask task = null;
for (Class<?> cls : taskClassList) {
AbstractTask abstractTask = (AbstractTask) cls.newInstance();
String cron = abstractTask.getCron();
task = new TriggerTask((AbstractTask) cls.newInstance(),
triggerContext -> new CronTrigger(cron).nextExecutionTime(triggerContext)
);
taskList.add(task);
}
taskRegistrar.setTriggerTasksList(taskList);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 默认的,SchedulingConfigurer 使用的也是单线程的方式,如果需要配置多线程,则需要指定 PoolSize
* @return
*/
@Bean("taskScheduler")
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(20);
taskScheduler.setThreadNamePrefix("Scheduled-");
taskScheduler.setRejectedExecutionHandler((r, e) -> {
if (!e.isShutdown()) {
r.run();
}
// 记录执行失败的任务到数据库表中
// 发送告警邮件给相关负责人
});
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
taskScheduler.setAwaitTerminationSeconds(60);
taskScheduler.initialize();
return taskScheduler;
}
}
三. 动态配置定时规则
在不重启服务的情况下,如何动态的修改定时任务的cron参数?
网上有两种方法,都有缺点。
- https://blog.csdn.net/xht555/article/details/53121962
  此方法是在任务触发运行的时候刷新定时规则。这种方法的缺点是,新规则只能在某次触发运行时才会生效。
- https://blog.csdn.net/jianggujin/article/details/77937316
  此方法基于 SchedulingConfigurer 的源码,捕获 ScheduledTaskRegistrar 类的实例,通过该类中的 TaskScheduler 实例操作定时任务的增删,而非采用 ScheduledTaskRegistrar.addTriggerTask 方法维护定时任务。所以需要自行写代码维护定时任务列表,控制任务的增删,代码的实现比较繁琐。
如果想要实现可以动态修改的定时策略,建议使用开源组件 Quartz。