@Component
@Slf4j
public class DelayUtils implements Runnable, InitializingBean {
public static DelayUtils delayUtils = null;
private static DelayQueue delayQueue = new DelayQueue();
private static AtomicBoolean isrun = new AtomicBoolean(true);
/**
* 初始化 redis hash key
*/
private static final List<String> delayKeys = new ArrayList<String>() {{
}};
@Autowired
private RedisTemplate redisTemplate;
public DelayUtils(RedisTemplate redisTemplate) {
super();
this.redisTemplate = redisTemplate;
}
/**
* 加入延时队列
* @param object
* @param time
*/
public void setDelay(String delayKey, Map<String, Object> object, long time) {
try {
String json = JSONObject.toJSONString(object);
DelayedTask delayedTask = new DelayedTask(json, time);
//redis持久化,防止数据丢失
Map map = redisTemplate.opsForHash().entries(delayKey);
map = map == null ? new HashMap() : map;
map.put(Integer.valueOf(object.get("id").toString()), delayedTask);
redisTemplate.opsForHash().putAll(delayKey, map);
delayQueue.offer(delayedTask);
} catch (NumberFormatException e) {
log.error("队列添加数据异常{}", e);
}
}
/**
* 从队列删除
* @param id
*/
public void deleteForDelay(String delayKey, Integer id) {
try {
redisTemplate.opsForHash().delete(delayKey, id);
if(isrun.get()) {
Object[] objects = delayQueue.toArray();
if(objects != null && objects.length > 0) {
for (Object object : objects) {
DelayedTask delayedTask = (DelayedTask) object;
Map set = JSONObject.parseObject(delayedTask.getTaskJson(), Map.class);
if(set != null && Integer.valueOf(set.get("id").toString()).equals(id))
delayQueue.remove(delayedTask);
}
}
}
} catch (Exception e) {
log.error("队列删除数据异常{}", e);
}
}
/**
* 延迟队列线程方法
*/
@Override
public void run() {
try {
while (isrun.get()) {
Delayed take = delayQueue.take();
if(take != null) {
DelayedTask delayedTask = (DelayedTask) take;
String taskJson = delayedTask.getTaskJson();
Map map = JSONObject.parseObject(taskJson, Map.class);
if(map != null) {
//业务逻辑
}
}
}
}catch (Exception e) {
log.error("延时队列执行异常 {}", e);
}
}
/**
* 初始化数据
* @throws Exception
*/
@Override
public void afterPropertiesSet() {
if(delayUtils == null) {
delayUtils = new DelayUtils(redisTemplate);
}
try {
Thread thread = new Thread(delayUtils);
thread.start();
//启动项目,将redis缓存加载到内存
for (String delayKey : delayKeys) {
Map<String, Object> map = redisTemplate.opsForHash().entries(delayKey);
if(map != null && map.size() > 0) {
for (String key : map.keySet()) {
DelayedTask delayedTask = (DelayedTask) map.get(key);
delayQueue.offer(delayedTask);
}
}
}
} catch (Exception e) {
log.error("初始化队列数据异常 {}", e);
}
}
}
public class DelayedTask implements Delayed, Serializable {
private String taskJson ;
private long start = System.currentTimeMillis();//开始时间
private long time ;
public DelayedTask(String taskJson, long time) {
super();
this.taskJson = taskJson;
this.time = time;
}
public String getTaskJson() {
return taskJson;
}
public void setTaskJson(String taskJson) {
this.taskJson = taskJson;
}
@Override
public String toString() {
return "OrderDelayedTask [orderJson=" + taskJson + ", start=" + start + ", time=" + time + "]";
}
/**
* 需要实现的接口,获得延迟时间 用过期时间-当前时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
DelayedTask o1 = (DelayedTask) o;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
版权声明:本文为weixin_44180753原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。