基于SchedulingConfigurer的任务调度

  • Post author:
  • Post category:其他



一. SchedulingConfigurer解析


Spring 中,创建定时任务除了使用@Scheduled 注解外,还可以使用 SchedulingConfigurer。既然两者都可以实现定时任务,那有什么不同呢?

@Schedule注解的一个缺点就是其定时时间不能动态更改,它适用于具有固定任务周期的任务,若要修改任务执行周期,只能走“停服务→修改任务执行周期→重启服务”这条路。而基于 SchedulingConfigurer 接口方式可以做到。SchedulingConfigurer 接口可以实现在@Configuration 类上,同时不要忘了,还需要@EnableScheduling 注解的支持。

package org.springframework.scheduling.annotation;

import org.springframework.scheduling.config.ScheduledTaskRegistrar;

@FunctionalInterface
public interface SchedulingConfigurer {
    void configureTasks(ScheduledTaskRegistrar var1);
}

ScheduledTaskRegistrar类包括以下几个重要方法:

在这里插入图片描述

从方法的命名上可以猜到,方法包含定时任务,延时任务,基于 Cron 表达式的任务,以及 Trigger 触发的任务。


二. 代码示例

create table if not exists sys_schedule_lock(
    task_id varchar(32) primary key not null comment '任务类型id',
    task_desc varchar(32) default null comment '任务类型说明',
    exec_ip varchar(32) default null comment '获取锁的机器ip',
    acquire_time datetime DEFAULT NULL comment '获取锁的时间',
    status int(11) comment '当前任务id锁状态: 0-空闲 1-占用'
)ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务执行数据库锁';

INSERT INTO zzxypm.sys_schedule_lock (task_id, task_desc, exec_ip, acquire_time, status) VALUES ('TASK_TODO', '定时推送待办任务', '127.0.0.1', null, 0);
@NoArgsConstructor
@Data
@TableName("sys_schedule_lock")
public class ScheduleLock {
    @TableId(value = "task_id")
    private String taskId;
    private String taskDesc;
    private Date acquireTime;
    private String execIp;
    private int status;
}
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zzxypm.entity.ScheduleLock;

import java.util.Map;

public interface ScheduleLockMapper extends BaseMapper<ScheduleLock> {
    int lock(Map<String,Object> map);
    int unlock(Map<String,Object> map);
    int batchResetStatus();
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.zzxypm.mapper.ScheduleLockMapper">

   <update id="lock" parameterType="map">
       update sys_schedule_lock
       set exec_ip = #{execIp},
           acquire_time = now(),
           status = 1
       where task_id = #{taskId}
       and status = 0;
   </update>

    <update id="unlock" parameterType="map">
       update sys_schedule_lock
       set
           acquire_time = now(),
           status = 0
       where task_id = #{taskId}
       and status = 1
       and exec_ip = #{execIp};
   </update>

   <update id="batchResetStatus">
           update sys_schedule_lock
           set status = 0
           where status = 1
   </update>
</mapper>
package com.zzxypm.schedule;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public enum TaskIdEnum {
    TASK_TODO("TASK_TODO", "定时推送待办任务");

    TaskIdEnum(String taskId, String desc) {
        this.taskId = taskId;
        this.desc = desc;
    }

    /**
     * 任务唯一id
     */
    private String taskId;

    /**
     * 任务类型描述
     */
    private String desc;


    public String getTaskId() {
        return taskId;
    }

    public String getDesc() {
        return desc;
    }

    public static TaskIdEnum acquire(final String taskId) {
        Optional<TaskIdEnum> serializeEnum =
                Arrays.stream(TaskIdEnum.values())
                        .filter(v -> Objects.equals(v.getTaskId(), taskId))
                        .findFirst();
        return serializeEnum.orElse(TaskIdEnum.TASK_TODO);
    }

    public static List<TaskIdEnum> getEnumList() {
        return Arrays.asList(TaskIdEnum.values());
    }
}
package com.zzxypm.schedule;

import com.zzxypm.common.util.DateUtil;
import com.zzxypm.common.util.IPV4Util;
import com.zzxypm.common.util.SpringContextHolder;
import com.zzxypm.mapper.ScheduleLockMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;

import java.util.HashMap;
import java.util.Map;

/**
 * 抽象任务骨架定义
 */
@Slf4j
public abstract class AbstractTask implements Runnable{

    private TaskIdEnum taskIdEnum;
    private String cron;
    
    private ScheduleLockMapper lockMapper = SpringContextHolder.getBean(ScheduleLockMapper.class);

    public AbstractTask(TaskIdEnum taskIdEnum,String cron) {
        this.taskIdEnum = taskIdEnum;
        this.cron = cron;
    }

    @Transactional
    @Override
    public void run() {
        String taskId = taskIdEnum.getTaskId();
        String ip = IPV4Util.getLocalIpv4Address();
        Map<String,Object> map = new HashMap<>();
        map.put("taskId",taskId);
        map.put("execIp",ip);
        int lock = lockMapper.lock(map);
        log.info("lock: taskId=[{}],ip=[{}],lock=[{}]", taskId, ip, lock);
        if (lock == 1) {
            log.info("taskId:[{}],任务执行开始时间:[{}]", taskId, DateUtil.now());
            doWork();
            log.info("taskId:[{}],任务执行结束时间:[{}]", taskId, DateUtil.now());

            int unlock = lockMapper.unlock(map);
            log.info("unlock: taskId=[{}],ip=[{}],unlock=[{}]", taskId, ip, unlock);
        }
    }

    protected abstract void doWork();

    public TaskIdEnum getTaskIdEnum() {
        return taskIdEnum;
    }

    public String getCron() {
        return cron;
    }
}
package com.zzxypm.schedule.task;

import com.zzxypm.schedule.AbstractTask;
import com.zzxypm.schedule.TaskIdEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 待办推送任务
 */
@Slf4j
public class TodoPushTask extends AbstractTask {

    private static TaskIdEnum taskIdEnum = TaskIdEnum.TASK_TODO;

    // 每隔5秒执行一次
    private static final String CRON = "*/5 * * * * ?";

    public TodoPushTask() {
        super(taskIdEnum,CRON);
    }

    @Override
    protected void doWork() {
        //定时从数据库中扫表,封装为 定时任务对象,丢到 工厂里 让 执行器 执行
        log.info("TodoPushTask,推送待办任务");
    }
}
 package com.zzxypm.config;

import com.zzxypm.common.support.cls.DefaultClassScanner;
import com.zzxypm.schedule.AbstractTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TriggerTask;
import org.springframework.scheduling.support.CronTrigger;

import java.util.ArrayList;
import java.util.List;

/**
 * 动态定时任务配置
 * (配置数据库动态执行)
 **/
@Slf4j
@Configuration
public class DynamicScheduleConfig implements SchedulingConfigurer {

    @Autowired
    private DefaultClassScanner classScanner;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        try {
            List<Class<?>> taskClassList = classScanner.getClassListBySuper("com.zzxypm.schedule", AbstractTask.class);
            if (CollectionUtils.isNotEmpty(taskClassList)) {
                List<TriggerTask> taskList = new ArrayList<>(taskClassList.size());
                TriggerTask task = null;
                for (Class<?> cls : taskClassList) {
                    AbstractTask abstractTask = (AbstractTask) cls.newInstance();
                    String cron = abstractTask.getCron();
                    task = new TriggerTask((AbstractTask) cls.newInstance(),
                            triggerContext -> new CronTrigger(cron).nextExecutionTime(triggerContext)
                    );
                    taskList.add(task);
                }
                taskRegistrar.setTriggerTasksList(taskList);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 默认的,SchedulingConfigurer 使用的也是单线程的方式,如果需要配置多线程,则需要指定 PoolSize
     * @return
     */
    @Bean("taskScheduler")
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(20);
        taskScheduler.setThreadNamePrefix("Scheduled-");
        taskScheduler.setRejectedExecutionHandler((r, e) -> {
            if (!e.isShutdown()) {
                r.run();
            }
            // 记录执行失败的任务到数据库表中
            // 发送告警邮件给相关负责人
        });
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        taskScheduler.setAwaitTerminationSeconds(60);
        taskScheduler.initialize();
        return taskScheduler;
    }
}


三. 动态配置定时规则


在不重启服务的情况下,如何动态的修改定时任务的cron参数?

网上有两种方法,都有缺点。


  1. https://blog.csdn.net/xht555/article/details/53121962


    此方法,是在触发运行的时候,刷新定时规则,这种方法的缺点是,刷新规则的时间必须是在某次触发运行的时候。


  2. https://blog.csdn.net/jianggujin/article/details/77937316


    此方法基于 SchedulingConfigurer 的源码,捕获 ScheduledTaskRegistrar 类的实例,通过该类中的 TaskScheduler 实例操作定时任务的增删,而非采用 ScheduledTaskRegistrar.addTriggerTask 方法维护定时任务。所以需要自行写代码维护定时任务列表,控制任务的删减,代码的实现比较繁琐。

如果想要实现可以动态修改的定时策略,建议使用开源组件 Quartz。



版权声明:本文为G0_hw原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。