SpringBoot下quertz集群搭建(加载多个trigger)

  • Post author:
  • Post category:其他


SpringBoot下quertz集群搭建(加载多个trigger)

网上查阅了许多资料,看见有不少quertz搭建的demo,但是少有SpringBoot下quertz执行多个trigger的文章,

在使用过程中发现

如果执行的job类有异常未处理的话会导致整个定时任务停止,目前尚未找到原因,我的解决方式是try..cach处理掉所有异常….

亲测可用,如有不足..还希望各位大大指点

本人新萌菜鸟一枚…查看了部分源码后编写如下…

1、创建quertzConfig类,将quertz集群控制交由spring管理

@Configuration
public class QueryConfig{
    
    @Autowried
    private DataSource dataSource;
    //相关基本信息配置
    private Properties quartzProperties() throws IOException {
        Properties prop = new Properties();
        prop.put("quartz.scheduler.instanceName", "quartz_1");//实例名集群中每个应用实例名需不一致
        prop.put("org.quartz.scheduler.instanceId", "AUTO");
        prop.put("org.quartz.scheduler.skipUpdateCheck", "true");
        prop.put("org.quartz.scheduler.jmx.export", "true");


        //数据源配置如果不用单独的数据库配置集群 则可以不用配置数据源信息
        prop.put("org.quartz.dataSource.myDS.driver", myDSDriver);
        prop.put("org.quartz.dataSource.myDS.URL", myDSURL);
        prop.put("org.quartz.dataSource.myDS.user", myDSUser);
        prop.put("org.quartz.dataSource.myDS.password", myDSPassword);
        prop.put("org.quartz.dataSource.myDS.maxConnections", 30);         


        prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        prop.put("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        prop.put("org.quartz.jobStore.dataSource", dataSource);//使用现有数据源(如果不用则需要填写上面配置的数据源 例如:myDS)
        prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");//集群所使用表名字开头
        prop.put("org.quartz.jobStore.isClustered", "true");//是否开启集群
         
        prop.put("org.quartz.jobStore.clusterCheckinInterval", "20000");
        prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        prop.put("org.quartz.jobStore.misfireThreshold", "120000");
        prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");
        prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE");
         
        prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        prop.put("org.quartz.threadPool.threadCount", "10");
        prop.put("org.quartz.threadPool.threadPriority", "5");
        prop.put("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true");
         
        prop.put("org.quartz.plugin.triggHistory.class", "org.quartz.plugins.history.LoggingJobHistoryPlugin");
        prop.put("org.quartz.plugin.shutdownhook.class", "org.quartz.plugins.management.ShutdownHookPlugin");
        prop.put("org.quartz.plugin.shutdownhook.cleanShutdown", "true");
        return prop;
    }

    @Bean 
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException { 
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        //用于quartz集群,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 
        schedulerFactoryBean.setOverwriteExistingJobs(true); 
        //用于quartz集群,加载quartz数据源 直接注入便可此处省略
        schedulerFactoryBean.setDataSource(dataSource);   
        //QuartzScheduler 延时启动,应用启动完10秒后 QuartzScheduler 再启动 
        schedulerFactoryBean.setStartupDelay(10);
        schedulerFactoryBean.setQuartzProperties(quartzProperties());//加载配置文件
        schedulerFactoryBean.setAutoStartup(true);
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
        /**
        *注册触发器 此处是关键 setTriggers(可传入Trigger数组)
        */
        Trigger[] triggerArray = createTriggerArray();
        factory.setTriggers(triggerArray);
        
        return factory; 
    }

    /**
     * 创建Trigger[]
     * 
     */
    private Trigger[] createTriggerArray() { 
    
        //此处可注入类通过数据库或者配置文件获取相关trigger信息
        List<Entity> entityList = new ArrayList<>();//demo 模拟查出来的List
        
        int count = entityList.size();
        Trigger[] triggerArray = new Trigger[count];
        for(int i = 0; i < count; i++){
            Entity ent = eneityList.get(i);
            String jobClassName = ent.getJobClassName();    //获取要执行的类名
            String method = ent.getMethod();                //获取要执行的方法
            String cronex = ent.getCronex();                //获取执行时间 即cronex表达式
            String groupName = ent.getGroupname();            //获取分组名
            
            //构造 JobDetail
            JobDetail jobDetail = getJobDetail(jobClassName,method,groupName);
            //构造 Trigger
            Trigger trigger = getTrigger(jobDetail,jobClassName,groupName,cronex);
            triggerArray[i]=trigger;
        }
        return triggerArray; 
    } 
    
    /**
     * 创建Trigger
     * 
     */
    private Trigger getTrigger(JobDetail jobDetail,String jobClassName,String groupName,String cronex) { 
        CronScheduleBuilder builder = new CronScheduleBuilder(cronex);//传入cronex表达式 如果语法错误则抛出异常
        Map<String,Object> jobDetaMap = new HashMap<>();
        jobDetaMap.put("jobDetail",jobDetail);
        
        return TriggerBuilder.newTrigger()
                .withIdentity(jobClassName, groupName)//设置分组
                .forJob(jobDetail)//设置jobDetail
                .usingJobData(new JobDataMap(jobDetaMap))//设置jobDetail
                .withScheduler(builder)//设置cronex表达式
                .build(); //生成Trigger
    }
    
    /**
     * 创建JobDetail
     * 
     */
    private JobDetail getJobDetail(String jobClassName,String method,String groupName) { 
        JobDetailImps jobDetail = new JobDetailImps();
        
        Map<String, String> map = new HashMap<>();
        map.put("targetObject", jobClassName);
        map.put("targetMethod", method);
        JobDataMap jobDataMap = new JobDataMap(map);//指定需要执行的类和方法
        jobDetail.setJobClass(InvokingJobDetailDetailFactory.class);//指定一个类 获取传入的方法名与类名,通过反射调用
        jobDetail.setName(jobClassName);//设置名称
        jobDetail.setGroup(groupName);//设置分组名
        jobDetail.setDurability(true); 
        jobDetail.setRequestsRecovery(true);
        return factoryBean; 
    } 

}

2、配置反射实现类


@Component
@PersistJobDataAfterExecution    
@DisallowConcurrentExecution //注解声明 当前类执行未结束时不允许重复调用
public class InvokingJobDetailDetailFactory extends Job{
 
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            String targetObject = String.valueOf(context.jobDetail().getJobDataMap.get("targetObject"));//获取需要执行的类名
            String targetMethod = String.valueOf(context.jobDetail().getJobDataMap.get("targetMethod"));//获取要执行的方法名
            
            Object obj = ApplicationContextProvider.getBean(targetObject);//springBoot 工具类通过bean名获取bean实例
            
            Method m = null; 
            try { 
                m = obj.getClass().getMethod(targetMethod); 
                m.invoke(obj); 
            } catch (SecurityException e) { 
                e.printStackTrace(); 
            } catch (NoSuchMethodException e) { 
                e.printStackTrace(); 
            } 
        } catch (Exception e) { 
            throw new JobExecutionException(e); 
        }
    }
}

3、springBoot 工具类


@Component
public class ApplicationContextProvider implements ApplicationContextAware {
    /**
     * 上下文对象实例
     */
    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * 获取applicationContext
     * @return
     */
    public ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通过name获取 Bean.
     * @param name
     * @return
     */
    public Object getBean(String name){
        return getApplicationContext().getBean(name);
    }

    /**
     * 通过class获取Bean.
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> T getBean(Class<T> clazz){
        return getApplicationContext().getBean(clazz);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     * @param name
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> T getBean(String name,Class<T> clazz){
        return getApplicationContext().getBean(name, clazz);
    }
}

转载于:https://my.oschina.net/u/3669812/blog/1586302