JAVA实现延时队列

  • Post author:
  • Post category:java


@Component
@Slf4j
public class DelayUtils implements Runnable, InitializingBean {

    public static DelayUtils delayUtils = null;

    private static DelayQueue delayQueue = new DelayQueue();

    private static AtomicBoolean isrun = new AtomicBoolean(true);

    /**
     * 初始化 redis hash key
     */
    private static final List<String> delayKeys = new ArrayList<String>() {{
        
    }};

    @Autowired
    private RedisTemplate redisTemplate;

    public DelayUtils(RedisTemplate redisTemplate) {
        super();
        this.redisTemplate = redisTemplate;
    }

    /**
     * 加入延时队列
     * @param object
     * @param time
     */
    public void setDelay(String delayKey, Map<String, Object> object, long time) {
        try {
            String json = JSONObject.toJSONString(object);
            DelayedTask delayedTask = new DelayedTask(json, time);
            //redis持久化,防止数据丢失
            Map map = redisTemplate.opsForHash().entries(delayKey);
            map = map == null ? new HashMap() : map;
            map.put(Integer.valueOf(object.get("id").toString()), delayedTask);
            redisTemplate.opsForHash().putAll(delayKey, map);
            delayQueue.offer(delayedTask);
        } catch (NumberFormatException e) {
            log.error("队列添加数据异常{}", e);
        }
    }

    /**
     * 从队列删除
     * @param id
     */
    public void deleteForDelay(String delayKey, Integer id) {
        try {
            redisTemplate.opsForHash().delete(delayKey, id);
            if(isrun.get()) {
                Object[] objects = delayQueue.toArray();
                if(objects != null && objects.length > 0) {
                    for (Object object : objects) {
                        DelayedTask delayedTask = (DelayedTask) object;
                        Map set = JSONObject.parseObject(delayedTask.getTaskJson(), Map.class);
                        if(set != null && Integer.valueOf(set.get("id").toString()).equals(id))
                            delayQueue.remove(delayedTask);
                    }
                }
            }
        } catch (Exception e) {
            log.error("队列删除数据异常{}", e);
        }
    }

    /**
     * 延迟队列线程方法
     */
    @Override
    public void run() {
        try {
            while (isrun.get()) {
                Delayed take = delayQueue.take();
                if(take != null) {
                    DelayedTask delayedTask = (DelayedTask) take;
                    String taskJson = delayedTask.getTaskJson();
                    Map map = JSONObject.parseObject(taskJson, Map.class);
                    if(map != null) {
                        //业务逻辑
                        
                    }
                }
            }
        }catch (Exception e) {
            log.error("延时队列执行异常 {}", e);
        }
    }

    /**
     * 初始化数据
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() {
        if(delayUtils == null) {
            delayUtils = new DelayUtils(redisTemplate);
        }
        try {
            Thread thread = new Thread(delayUtils);
            thread.start();
            //启动项目,将redis缓存加载到内存
            for (String delayKey : delayKeys) {
                Map<String, Object> map = redisTemplate.opsForHash().entries(delayKey);
                if(map != null && map.size() > 0) {
                    for (String key : map.keySet()) {
                        DelayedTask delayedTask = (DelayedTask) map.get(key);
                        delayQueue.offer(delayedTask);
                    }
                }
            }
        } catch (Exception e) {
            log.error("初始化队列数据异常 {}", e);
        }
    }
}
public class DelayedTask implements Delayed, Serializable {

    private String taskJson ;
    private long start = System.currentTimeMillis();//开始时间
    private long time ;


    public DelayedTask(String taskJson, long time) {
        super();
        this.taskJson = taskJson;
        this.time = time;
    }

    public String getTaskJson() {
        return taskJson;
    }

    public void setTaskJson(String taskJson) {
        this.taskJson = taskJson;
    }

    @Override
    public String toString() {
        return "OrderDelayedTask [orderJson=" + taskJson + ", start=" + start + ", time=" + time + "]";
    }

    /**
     * 需要实现的接口,获得延迟时间   用过期时间-当前时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    /**
     * 用于延迟队列内部比较排序   当前时间的延迟时间 - 比较对象的延迟时间
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        DelayedTask o1 = (DelayedTask) o;
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}



版权声明:本文为weixin_44180753原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。