java异步队列

  • Post author:
  • Post category:java





前言

在某些场景下,操作比较耗时,给用户体验不是很好,这时候我们就会直接想到两种方案,一种是定时任务,一种就是异步队列,那些实时性要求不高,且比较耗时的任务,是队列的最佳应用场景。




一、异步队列实现思路?

持久化=>插入队列=>出队,当程序突然停止,当程序启动的时候,从库里面拉出未执行的数据继续入队(补偿机制),下面是java的简单实现。



二、实现步骤



1.加入监听器

代码如下(示例):在web.xml 加入监听器

<!--转交异步处理监听器 -->
    <listener>
        <listener-class>context.ZjListener</listener-class>
    </listener>



2.实现监听器

代码如下(示例):

public class ZjListener implements ServletContextListener {

    private static Logger log = LoggerFactory.getLogger(ZjListener.class);

    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        log.info("初始化转交异步处理线程...");
        ZjManager.getInstance().starup();
        log.info("初始化转交异步处理线程成功...");
    }

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        ZjManager.getInstance().shutdown();
    }



3.实现转交处理对象和转交处理线程

代码如下(示例):我就简单示例一些,具体实现看业务昂

public class ZjRequset {

    private String dm;

    private String xh;

    public String getDm() {
        return dm;
    }

    public void setDm(String dm) {
        this.dm = dm;
    }
    
    public String getXh() {
        return xh;
    }

    public void setXh(String xh) {
        this.xh = xh;
    }
}
public class ZjThread implements Runnable{

    private ZjRequset requset;

    private ZjService zjService = WebAppContext.getBeanEx("ZjService");



    /**
     * @description 实例化一个自动转交处理线程
     * @param requset
     */

    public ZjThread(ZjRequset requset){
        this.requset = requset;
    }

    /**
     * @description 获取请求
     * @return ZjRequset
     */
    public ZjRequset getRequest() {
        return requset;
    }

    @Override
    public void run() {
        if (requset != null){
            zjService.saveZj(requset.getDm(), requset.getXh());
        }
    }
}




4.自动转交异步处理

代码如下(示例):这一块就是核心的代码了

public class ZjManager {

    private static final Log LOG = LogFactory.getLog(ZjManager.class);

    private static final ZjManager INSTANCE = new ZjManager();


    /**
     * 线程池维护线程的最少数量
     */
    private final static int CORE_POOL_SIZE = 2;

    /**
     * 线程池维护线程的最大数量
     */
    private final static int MAX_POOL_SIZE = 3;

    /**
     * 线程池维护线程所允许的空闲时间
     */
    private final static int KEEP_ALIVE_TIME = 0;

    /**
     * 线程池所使用的缓冲队列大小
     */
    private final static int WORK_QUEUE_SIZE = 200;


    /**
     * 是不是第一次启动程序
     */
    private static boolean FIRST_QD = true;

    /**
     * 自动转交异步处理队列
     */
    private final Queue<ZjRequset> requestQueue = new LinkedList<ZjRequset>();


    /**
     * 线程池
     */
    private ThreadPoolExecutor threadPool = null;

    /**
     * 调度器
     */
    private ScheduledExecutorService scheduler = null;

    /**
     * @description 获取异步处理管理器实例
     */
    public static ZjManager getInstance(){
        return INSTANCE;
    }

    /**
     * @description 队列是否为空
     */
    private boolean hasAcquire() {
        return !requestQueue.isEmpty();
    }

    /**
     * @description 启动工作线程
     */
    public boolean starup(){
        LOG.info( Console.getNowStr() + " 正在启动异步处理管理器...");
        threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.handler);
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(accessBufferThread, 0, 3, TimeUnit.SECONDS);
        LOG.info(Console.getNowStr() + " 启动异步处理管理器成功!");
        return true;
    }

    /**
     * @description 关闭工作线程
     */
    public void shutdown(){
        if (scheduler != null) {
            scheduler.shutdown();
        }
        if (threadPool != null) {
            threadPool.shutdown();
        }
    }

    /**
     * @description 处理器
     */
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            synchronized (requestQueue) {
                try {
                    requestQueue.offer(((ZjThread) r).getRequest());
                } catch (Exception e) {
                    LOG.error("插入自动转交队列失败",e);
                }
            }
        }
    };

    /**
     * @description 访问消息缓存的调度线程,查看是否有待定请求,如果有,则创建一个新的,并添加到线程池
     */
    final Runnable accessBufferThread = new Runnable() {
        @Override
        public void run() {
            synchronized (requestQueue) {
                try {
                    if (FIRST_QD){
                        reloadRequest();
                    }
                    if (hasAcquire()) {
                        ZjRequset request = requestQueue.poll();
                        if (request != null) {
                            Runnable task = new ZjThread(request);
                            threadPool.execute(task);
                        }
                    }
                } catch (Exception e) {
                    LOG.error("重新执行失败",e);
                }
            }
        }
    };


    /**
     * 增加一个数据库操作
     *
     * @param request the request
     */
    public void AddRequest(ZjRequset request) {
        try {
            if (request != null) {
            	//持久化,写入库中
                wirteRequest(request);
                Runnable task = new ZjThread(request);
                threadPool.execute(task);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(),e);
        }
    }

    /**
     * 写入到表中
     *
     * @param request the request
     */
    void wirteRequest(ZjRequset request) {
        //将要请求写入库
    }

    /**
     * 将库中未执行的任务添加到队列中
     */
    void reloadRequest() {
        FIRST_QD = false;
        Connection conn = null;
        PreparedStatement pst = null;
        ResultSet rs = null;
        //举个例子
        try {
        		 conn = getConn();
                 pst = conn.prepareStatement(sql);
                 rs = pst.executeQuery();
                 while (rs.next()) {
                     	ZjRequset request = new ZjRequset();
                        request.setDm(rs.getString("DM"));
                        request.setXh(rs.getString("XH"));
                        ZjManager.getInstance().AddRequest(request,"");
                    }
                } catch (SQLException e) {
                    LOG.error(e.getMessage(), e);
                }finally {
                    DBUtils.closeResultSet(rs);
                    DBUtils.closePStatement(pst);
                    DBUtils.closeConnection(conn);
                }
    }

    /**
     * 获取队列待处理线程数量
     */
    public int getQueueCount(){
        return requestQueue.size();
    }

    /**
     * 获取处理线程的状态
     */
    public int getThreadZt(){
        return threadPool.getActiveCount();
    }

}



5.业务如何添加

			ZjRequset clRequest = new ZjRequset();
            clRequest.setDm(dm);
            clRequest.setXh(xh);
            ZjManager.getInstance().AddRequest(clRequest);



版权声明:本文为Uluoyu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。