利用Redis分布式锁实现Quartz分布式调度

  • Post author:
  • Post category:其他


背景

前两天研究的

利用数据库锁实现Quartz分布式调度

一文中提到几点问题,利用数据库行锁机制和唯一性约束,不仅无法解决单点问题,而且频繁访问数据库,造成db性能降低。那么最近就研究了一下redis缓存服务,通过redis的哨兵和复制功能(不知道这两个功能的,请自行百度)可以实现redis集群部署和redis分布式锁,并且数据是缓存在内存中的,所以性能要比数据库锁提高很多。

思路

1.既然是基于Redis的分布式锁,那么首先编写redis的相关配置,包括redisTemplate,cacheManager和redis分布式锁,我这里使用springboot集成redis缓存,网上有很多教程,有兴趣的可以自行研究一下。

2.配置好redis后,我使用TriggerListener接口的特性在执行任务调度时,处自动触发实现了TriggerListener接口的类,在vetoJobExecution方法总完成获取锁的操作,在triggerComplete方法中完成释放锁的操纵。

具体实现

redis相关配置

首先配置redsiTemplate,它用于直接操作redis缓存的值,过期时间等等

@Configuration
@EnableCaching
public class RedisConfig {
    
    /**
     *  定义 StringRedisTemplate ,指定序列化和反序列化的处理类
     * @param factory
     * @return
     */
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(
                Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //序列化 值时使用此序列化方法
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
    
    @Bean
    public CacheManager cacheManager(RedisTemplate<String,String> redisTemplate) {
        RedisCacheManager rcm = new RedisCacheManager(redisTemplate);
        //使用前缀
        rcm.setUsePrefix(true);
        //缓存分割符 默认为 ":"
//        rcm.setCachePrefix(new DefaultRedisCachePrefix(":"));
        //设置缓存过期时间
        //rcm.setDefaultExpiration(60);//秒
        return rcm;
    }
}

然后配置redis的分布式锁,由于redis的特性支持分布式锁,所以这里编写获取锁(lock)和释放锁(unlock)两个方法。

/**
 * redis分布式锁
 * @author hww
 */
@Component
public class RedisDistributedLock {

	@Autowired
	StringRedisTemplate redisTemplate;
	
	/**
	 * redis锁默认超时时间
	 */
	private final long DEFAULT_TIMEOUT_LOCK = 60*1000;
	
	public boolean lock(String key) throws Exception{
		return lock(key, DEFAULT_TIMEOUT_LOCK);
	}
	
	/**
	 * 获取锁,并设置超时时间
	 * @param key
	 * @param timeout
	 * @return
	 * @throws Exception 
	 */
	public boolean lock(final String key,long timeout) throws Exception{
		if(StringUtils.isEmpty(key)){
			throw new Exception("key must be not null!");
		}
		//设置key,并返回操作结果
		Boolean execute = redisTemplate.execute(new RedisCallback<Boolean>() {
			@Override
			public Boolean doInRedis(RedisConnection connection)
					throws DataAccessException {
				//等同于redisTemplate.opsForValue().set(key, value),只不过此方法会返回一个操作结果
				return connection.setNX(key.getBytes(), key.getBytes());
			}
		});
		//如果为true,设置超时时间
		if(execute){
			System.out.println("获取到锁,设置超时时间");
			Boolean expire = redisTemplate.expire(key, timeout, TimeUnit.SECONDS);
		}
		return execute;
	}
	
	/**
	 * 释放锁
	 * @param key
	 * @throws Exception
	 */
	public void unlock(String key) throws Exception{
		if(StringUtils.isEmpty(key)){
			throw new Exception("key must be not null!");
		}
		redisTemplate.delete(key);
		System.out.println("释放锁");
	}
	
}

编写TriggerListener实现类

TriggerListener的javaDoc介绍:

The interface to be implemented by classes that want to be informed when a


Trigger


fires. In general, applications that use a

Scheduler

will not have use for this mechanism

意思是说实现了该接口的实现类将会在绑定的trigger被触发时通知其执行相关动作。

public class RedisDistributedTriggerListener implements TriggerListener{

	RedisDistributedLock redisDistributedLock = SpringUtils.getBean(RedisDistributedLock.class);
	
	@Override
	public String getName() {
		return "RedisDistributedTriggerListener";
	}

	@Override
	public void triggerFired(Trigger trigger, JobExecutionContext context) {
	}

	/**
	 * 当一个scheduler被调起时,将会唤起vetoJobExecution方法
	 * 返回true,则取消任务执行,否则继续执行
	 * @author hww
	 */
	public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
		try {
			String lockKey = trigger.getJobKey().getGroup() + ":" + trigger.getJobKey().getName();
			//获取到锁,true代表获取到锁,返回false放行, false代表获取锁失败,返回true停止执行
			return !redisDistributedLock.lock(lockKey);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	@Override
	public void triggerMisfired(Trigger trigger) {
	}

	/**
	 * 在任务调度执行完成时,调用此方法
	 */
	@Override
	public void triggerComplete(Trigger trigger, JobExecutionContext context,
			CompletedExecutionInstruction triggerInstructionCode) {
		String lockKey = trigger.getJobKey().getGroup() + ":" + trigger.getJobKey().getName();
		try {
			redisDistributedLock.unlock(lockKey);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

在这个类中有几点需要注意:

1.getName方法的返回值需要自己定义,默认为null,否则会抛出异常。

2.vetoJobExecution方法在任务调起时执行,当返回false时,继续执行任务,否则停止任务。

3.在vetoJobExecution方法中获取到锁,则继续执行任务,没有获取到则停止。

4.triggerComplete方法在在任务调度执行完成时,调用此方法来释放锁。

TriggerListener绑定Scheduler



SpringBoot集成Quartz,动态创建,更新,暂停,唤醒,删除任务调度

一文中实现了动态创建Job。

@SuppressWarnings("unchecked")
	public void addTimerJob(SchedulerJob job) {
		try {
			JobDetail jobDetail = JobBuilder
					.newJob((Class<? extends Job>) Class.forName(job.getClassname()))
					// 指定执行类
					.withIdentity(job.getJobname(), job.getJobgroup())
					// 指定name和group
					.requestRecovery().withDescription(job.getDescription())
					.build();
			// 创建表达式调度构建器
			CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder
					.cronSchedule(job.getCronexpression());
			// 创建触发器
			CronTrigger cronTrigger = TriggerBuilder.newTrigger()
					.withIdentity(job.getTriggername(), job.getTriggergroup())
					.withSchedule(cronScheduleBuilder).build();
			//注册triggerListener
			scheduler.getListenerManager().addTriggerListener(new RedisDistributedTriggerListener());
			scheduler.scheduleJob(jobDetail, cronTrigger);
			scheduler.start();// 触发器并不会立刻触发
			System.out
					.println("==================================创建Job成功!==================================");
		} catch (ClassNotFoundException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		} catch (SchedulerException e) {
			e.printStackTrace();
		}
	}

总结

至此就完成了实现Quartz分布式调度的所有配置,当执行任意的Job任务时,都会触发注册到Scheduler上的TriggerListener的实现类,并在vetoJobExecution方法中获取到锁,此时只能有一个线程获取该锁,并继续执行,并在执行完毕后,在triggerComplete方法中释放锁。



版权声明:本文为ZixiangLi原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。