springBoot + Quartz

  • Post author:
  • Post category:其他


1. Quartz 概述

Quartz 是 OpenSymphony 开源组织在任务调度领域的一个开源项目,完全基于 Java 实现。该项目于 2009 年被 Terracotta 收购,目前是 Terracotta 旗下的一个项目。读者可以到

下载

Quartz 的发布版本及其源代码。

2. Quartz特点

作为一个优秀的开源调度框架,Quartz 具有以下特点:

  1. 强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求;
  2. 灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;
  3. 分布式和集群能力,Terracotta 收购后在原来功能基础上作了进一步提升。
  4. 作为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。

quartz调度核心元素:

  • Scheduler:任务调度器,是实际执行任务调度的控制器。在spring中通过SchedulerFactoryBean封装起来。
  • Trigger:触发器,用于定义任务调度的时间规则,有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger,其中CronTrigger用的比较多,本文主要介绍这种方式。CronTrigger在spring中封装在CronTriggerFactoryBean中。
  • Calendar:它是一些日历特定时间点的集合。一个trigger可以包含多个Calendar,以便排除或包含某些时间点。
  • JobDetail:用来描述Job实现类及其它相关的静态信息,如Job名字、关联监听器等信息。在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean两种实现,如果任务调度只需要执行某个类的某个方法,就可以通过MethodInvokingJobDetailFactoryBean来调用。
  • Job:是一个接口,只有一个方法void execute(JobExecutionContext context),开发者实现该接口定义运行任务,JobExecutionContext类提供了调度上下文的各种信息。Job运行时的信息保存在JobDataMap实例中。实现Job接口的任务,默认是无状态的,若要将Job设置成有状态的,在quartz中是给实现的Job添加@DisallowConcurrentExecution注解(以前是实现StatefulJob接口,现在已被Deprecated),在与spring结合中可以在spring配置文件的jobDetail中配置concurrent参数。

3. 集群配置

quartz集群是通过数据库表来感知其他的应用的,各个节点之间并没有直接的通信。只有使用持久的JobStore才能完成Quartz集群。

  • 数据库表:以前有12张表,现在只有11张表,现在没有存储listener相关的表,多了QRTZ_SIMPROP_TRIGGERS表。
  • 建表SQL:

    下载

    ,不同数据库有不同的表。

表名

说明

备注

QRTZ_CALENDARS

以 Blob 类型存储 Quartz 的 Calendar 信息

QRTZ_CRON_TRIGGERS

存储 Cron Trigger,包括 Cron 表达式和时区信息

QRTZ_FIRED_TRIGGERS

存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息

QRTZ_PAUSED_TRIGGER_GRPS

存储已暂停的 Trigger 组的信息

QRTZ_SCHEDULER_STATE

存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例(假如是用于一个集群中)

QRTZ_LOCKS

存储程序的悲观锁的信息(假如使用了悲观锁)

QRTZ_JOB_DETAILS

存储每一个已配置的 Job 的详细信息

QRTZ_JOB_LISTENERS

存储有关已配置的 JobListener 的信息

QRTZ_TRIGGER_LISTENERS

存储已配置的 TriggerListener 的信息

QRTZ_BLOG_TRIGGERS

Trigger 作为 Blob 类型存储(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候)

QRTZ_TRIGGERS

存储已配置的 Trigger 的信息

QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,包括以下几个锁:

  • CALENDAR_ACCESS
  • JOB_ACCESS
  • MISFIRE_ACCESS
  • STATE_ACCESS
  • TRIGGER_ACCESS

4. 创建项目

4.1

下载项目

,运行看结果和流程;

4.2 Maven依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--quartz-->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>
        <!--因为quartz 需要有Spring context 所有引入mail包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>3.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
        </dependency>

4.2 QuartzConfig

import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.sql.DataSource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;

/**
 * @ClassName: QuartzConfig
 * @Description: 初始化类Quartz环境初始化
 * @Version: 1.0
 * @Author: Thomas
 * @Date: 2021/4/30 22:49
 */
@Configuration
public class QuartzConfig {
    private static final Logger log = LoggerFactory.getLogger(QuartzConfig.class);

    /**
     * 1.通过name+group获取唯一的jobKey;
     * 2.通过groupname来获取其下的所有jobkey
     */
    private final static String GROUP_NAME = "QuartzJobGroups";

    /**
     * 数据源
     */
    @Autowired
    private DataSource dataSource;

    /**
     * @Description: 创建调度器,可以省略的
     * @Param: []
     * @return: org.quartz.Scheduler
     * @Author: Thomas
     * @Date: 2021/4/30 22:58
     */
    @Bean
    public Scheduler scheduler() throws Exception {
        log.info("执行:scheduler()");

        Scheduler scheduler = schedulerFactoryBean().getScheduler();
        scheduler.start();
        return scheduler;
    }

    /**
     * @Description: 创建调度器工厂bean对象
     * @Param: []
     * @return: org.springframework.scheduling.quartz.SchedulerFactoryBean
     * @Author: Thomas
     * @Date: 2021/4/30 22:58
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        log.info("执行:schedulerFactoryBean()");

        SchedulerFactoryBean factory = new SchedulerFactoryBean();

        factory.setSchedulerName("Cluster_Scheduler");
        factory.setDataSource(dataSource);
        factory.setApplicationContextSchedulerContextKey("applicationContext");
        // 设置调度器中的线程池。
        factory.setTaskExecutor(schedulerThreadPool());

        // 设置触发器,可以多个
        Trigger trigger1 = trigger1().getObject();
        Trigger trigger2 = trigger2().getObject();
        Trigger[] triggers = {trigger1, trigger2};
        factory.setTriggers(triggers);

        // 设置quartz的配置信息
        factory.setQuartzProperties(quartzProperties());
        return factory;
    }

    /**
     * @Description: 创建一个调度器的线程池
     * @Param: []
     * @return: java.util.concurrent.Executor
     * @Author: Thomas
     * @Date: 2021/4/30 22:58
     */
    @Bean
    public Executor schedulerThreadPool() {
        log.info("执行:schedulerThreadPool()");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(15);
        executor.setMaxPoolSize(25);
        executor.setQueueCapacity(100);

        return executor;
    }

    /**
     * @Description: 读取quartz.properties配置文件
     * @Param: []
     * @return: java.util.Properties
     * @Author: Thomas
     * @Date: 2021/4/30 22:59
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        log.info("执行:quartzProperties()");

        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("config/quartz.properties"));

        // 在quartz.properties中的属性被读取并注入后再初始化对象
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /**
     * @Description: 创建触发器工厂
     * @Param: [jobDetail:任务详情, cronExpression:Cron表达式]
     * @return: org.springframework.scheduling.quartz.CronTriggerFactoryBean
     * @Author: Thomas
     * @Date: 2021/5/1 15:15
     */
    private static CronTriggerFactoryBean createCronTriggerFactory(JobDetail jobDetail, String cronExpression) {
        CronTriggerFactoryBean factoryBean = new CronTriggerFactoryBean();
        factoryBean.setJobDetail(jobDetail);
        factoryBean.setCronExpression(cronExpression);
        return factoryBean;
    }

    /**
     * @Description: 创建JobDetailFactory
     * @Param: [jobClass, groupName:组名称, targetObject:具体的业务类]
     * @return: org.springframework.scheduling.quartz.JobDetailFactoryBean
     * @Author: Thomas
     * @Date: 2021/5/1 15:19
     */
    private static JobDetailFactoryBean createJobDetailFactory(Class<? extends Job> jobClass,
                                                               String groupName,
                                                               String targetObject) {
        log.info("执行:createJobDetailFactory()");

        JobDetailFactoryBean factoryBean = new JobDetailFactoryBean();
        factoryBean.setJobClass(jobClass);
        // 是否持久化job内容
        factoryBean.setDurability(true);
        // 设置是否多次请求尝试任务。
        factoryBean.setRequestsRecovery(true);
        factoryBean.setGroup(groupName);

        Map<String, String> map = new HashMap<>(2);
        // 指定具体的业务类
        map.put("targetObject", targetObject);
        // 指定具体的业务类方法
        map.put("targetMethod", "execute");
        factoryBean.setJobDataAsMap(map);

        return factoryBean;
    }

    //=========下面配置自定义的trigger和jobdetail===================

    /**
     * @Description: 创建triggerFactoryBean对象
     * @Param: []
     * @return: org.springframework.scheduling.quartz.CronTriggerFactoryBean
     * @Author: Thomas
     * @Date: 2021/4/30 23:00
     */
    @Bean
    public CronTriggerFactoryBean trigger1() {
        log.info("执行:trigger1()");

        // JobDetail任务详情
        JobDetail jobDetail = createJobDetail1().getObject();
        // Cron表达式
        String cronExpression = "0/2 * * * * ?";

        return createCronTriggerFactory(jobDetail, cronExpression);
    }


    /**
     * @Description: 创建JobDetail对象
     * @Param: []
     * @return: org.springframework.scheduling.quartz.JobDetailFactoryBean
     * @Author: Thomas
     * @Date: 2021/4/30 23:00
     */
    @Bean
    public JobDetailFactoryBean createJobDetail1() {
        log.info("执行:createJobDetail1()");
        return createJobDetailFactory(InvokingJobDetailFactory.class, GROUP_NAME, "executeJob1");
    }

    @Bean
    public CronTriggerFactoryBean trigger2() {
        log.info("执行:trigger2()");

        // JobDetail任务详情
        JobDetail jobDetail = createJobDetail2().getObject();
        // Cron表达式
        String cronExpression = "0/10 * * * * ?";

        return createCronTriggerFactory(jobDetail, cronExpression);
    }

    @Bean
    public JobDetailFactoryBean createJobDetail2() {
        log.info("执行:createJobDetail2()");
        return createJobDetailFactory(InvokingJobDetailFactory.class, GROUP_NAME, "executeJob2");
    }


}

4.3 InvokingJobDetailFactory

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.lang.reflect.Method;


/**
 * @ClassName: InvokingJobDetailFactory
 * @Description: 任务同一调度(反射原理)
 * @Version: 1.0
 * @Author: Thomas
 * @Date: 2021/5/1 14:52
 */
public class InvokingJobDetailFactory extends QuartzJobBean {
    private static final Logger log = LoggerFactory.getLogger(InvokingJobDetailFactory.class);

    /**
     * 计划任务所在类
     */
    private String targetObject;

    /**
     * 具体需要执行的计划任务
     */
    private String targetMethod;

    private ApplicationContext ctx;

    /**
     * @Description: 反射原理调度需要执行的类、方法
     * @Param: [context]
     * @return: void
     * @Author: Thomas
     * @Date: 2021/5/1 14:54
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        log.info("执行executeInternal(JobExecutionContext context)");
        try {
            Object obj = ctx.getBean(targetObject);
            Method method = null;
            try {
                method = obj.getClass().getMethod(targetMethod);
                log.info("执行method:" + method);
                //调用被代理对象的方法
                method.invoke(obj);
                log.info("obj:" + obj);
            } catch (SecurityException e) {
                e.printStackTrace();
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.ctx = applicationContext;
    }

    public void setTargetObject(String targetObject) {
        this.targetObject = targetObject;
    }

    public void setTargetMethod(String targetMethod) {
        this.targetMethod = targetMethod;
    }
}

4.3 ExecuteJob1

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * @ClassName: ExecuteJob1
 * @Description: 自定义需要执行的任务1
 * @Version: 1.0
 * @Author: Thomas
 * @Date: 2021/5/1 14:55
 */
@Service
public class ExecuteJob1 {
    private static final Logger log = LoggerFactory.getLogger(ExecuteJob1.class);

    /**
     * @Description: 执行具体的任务逻辑(方法名在quartz定义)
     * @Param: []
     * @return: void
     * @Author: Thomas
     * @Date: 2021/5/1 14:55
     */
    public void execute() {
        log.info("ExecuteJob1定时任务执行了。。。。。" + new Date());

    }
}

4.4 ExecuteJob12

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * @ClassName: ExecuteJob2
 * @Description: 自定义需要执行的任务2
 * @Version: 1.0
 * @Author: Thomas
 * @Date: 2021/5/1 14:55
 */
@Service
public class ExecuteJob2 {
    private static final Logger log = LoggerFactory.getLogger(ExecuteJob2.class);

    /**
     * @Description: 执行具体的任务逻辑(方法名在quartz定义)
     * @Param: []
     * @return: void
     * @Author: Thomas
     * @Date: 2021/5/1 14:55
     */
    public void execute() {
        log.info("ExecuteJob2定时任务执行了。。。。。" + new Date());
    }
}

4.5 application.properties

## tomcat配置
server.port=8090
#server.tomcat.maxHttpHeaderSize=8192
server.tomcat.uri-encoding=UTF-8
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
spring.messages.encoding=UTF-8
# tomcat最大线程数,默认为200
server.tomcat.max-threads=800
# session最大超时时间(分钟),默认为30
server.session-timeout=60

## spring 配置
spring.application.name=springboot-Quartz
application.main=com.abel.quartz.Application


## 主数据源,默认的
spring.datasource.url=jdbc:mysql://192.168.10.101:3306/quartz?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driverClassName=com.mysql.jdbc.Driver

## 连接池配置
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
#最小空闲连接
spring.datasource.hikari.minimum-idle=10
#连接池中允许的最大连接数。缺省值:10;推荐的公式:((core_count * 2) + effective_spindle_count)
spring.datasource.hikari.maximum-pool-size=30
#spring.datasource.hikari.auto-commit=true
#一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟。minimumIdle<maximumPoolSize时生效
spring.datasource.hikari.idle-timeout=120000
#自定义连接池名
#spring.datasource.hikari.pool-name=DatebookHikariCP
#一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒,参考MySQL wait_timeout参数(show variables like '%timeout%';)
spring.datasource.hikari.max-lifetime=1800000
#等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒
spring.datasource.hikari.connection-timeout=30000
#指定验证连接有效性的超时时间,默认是5秒
spring.datasource.hikari.validation-timeout=3000
spring.datasource.hikari.connection-test-query=SELECT 1

# 调度标识名 集群中每一个实例都必须使用相同的名称
quartz.scheduler.instanceName=QuartScheduler
# 允许最大连接
org.quartz.dataSource.myDS.maxConnections=10

记得修改数据库:地址、账号、密码。

4.6 quartz.properties

# 是否使用properties作为数据存储
org.quartz.jobStore.useProperties=false
# 数据库中的表格命名前缀
org.quartz.jobStore.tablePrefix=QRTZ_
# 是否是一个集群,是不是分布式的任务
org.quartz.jobStore.isClustered=true
# 集群检查周期,单位毫秒。可以自定义缩短时间。 当某一个节点宕机的时候,其他节点等待多久后开始执行任务。
org.quartz.jobStore.clusterCheckinInterval=5000
# 单位毫秒, 集群中的节点退出后,再次检查进入的时间间隔。
org.quartz.jobStore.misfireThreshold=60000
# 事务隔离级别
org.quartz.jobStore.txIsolationLevelReadCommitted=true
# 存储的事务管理类型
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# 使用的Delegate类型
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 集群的命名,一个集群要有相同的命名。
org.quartz.scheduler.instanceName=ClusterQuartz
# 节点的命名,可以自定义。 AUTO代表自动生成。
org.quartz.scheduler.instanceId=AUTO
# rmi远程协议是否发布
org.quartz.scheduler.rmi.export=false
# rmi远程协议代理是否创建
org.quartz.scheduler.rmi.proxy=false
# 是否使用用户控制的事务环境触发执行job。
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false



版权声明:本文为weixin_40461030原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。