前言
在某些场景下,操作比较耗时,给用户体验不是很好,这时候我们就会直接想到两种方案,一种是定时任务,一种就是异步队列,那些实时性要求不高,且比较耗时的任务,是队列的最佳应用场景。
一、异步队列实现思路?
持久化=>插入队列=>出队,当程序突然停止,当程序启动的时候,从库里面拉出未执行的数据继续入队(补偿机制),下面是java的简单实现。
二、实现步骤
1.加入监听器
代码如下(示例):在web.xml 加入监听器
<!--转交异步处理监听器 -->
<listener>
<listener-class>context.ZjListener</listener-class>
</listener>
2.实现监听器
代码如下(示例):
public class ZjListener implements ServletContextListener {
private static Logger log = LoggerFactory.getLogger(ZjListener.class);
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
log.info("初始化转交异步处理线程...");
ZjManager.getInstance().starup();
log.info("初始化转交异步处理线程成功...");
}
@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
ZjManager.getInstance().shutdown();
}
3.实现转交处理对象和转交处理线程
代码如下(示例):我就简单示例一些,具体实现看业务昂
public class ZjRequset {
private String dm;
private String xh;
public String getDm() {
return dm;
}
public void setDm(String dm) {
this.dm = dm;
}
public String getXh() {
return xh;
}
public void setXh(String xh) {
this.xh = xh;
}
}
public class ZjThread implements Runnable{
private ZjRequset requset;
private ZjService zjService = WebAppContext.getBeanEx("ZjService");
/**
* @description 实例化一个自动转交处理线程
* @param requset
*/
public ZjThread(ZjRequset requset){
this.requset = requset;
}
/**
* @description 获取请求
* @return ZjRequset
*/
public ZjRequset getRequest() {
return requset;
}
@Override
public void run() {
if (requset != null){
zjService.saveZj(requset.getDm(), requset.getXh());
}
}
}
4.自动转交异步处理
代码如下(示例):这一块就是核心的代码了
public class ZjManager {
private static final Log LOG = LogFactory.getLog(ZjManager.class);
private static final ZjManager INSTANCE = new ZjManager();
/**
* 线程池维护线程的最少数量
*/
private final static int CORE_POOL_SIZE = 2;
/**
* 线程池维护线程的最大数量
*/
private final static int MAX_POOL_SIZE = 3;
/**
* 线程池维护线程所允许的空闲时间
*/
private final static int KEEP_ALIVE_TIME = 0;
/**
* 线程池所使用的缓冲队列大小
*/
private final static int WORK_QUEUE_SIZE = 200;
/**
* 是不是第一次启动程序
*/
private static boolean FIRST_QD = true;
/**
* 自动转交异步处理队列
*/
private final Queue<ZjRequset> requestQueue = new LinkedList<ZjRequset>();
/**
* 线程池
*/
private ThreadPoolExecutor threadPool = null;
/**
* 调度器
*/
private ScheduledExecutorService scheduler = null;
/**
* @description 获取异步处理管理器实例
*/
public static ZjManager getInstance(){
return INSTANCE;
}
/**
* @description 队列是否为空
*/
private boolean hasAcquire() {
return !requestQueue.isEmpty();
}
/**
* @description 启动工作线程
*/
public boolean starup(){
LOG.info( Console.getNowStr() + " 正在启动异步处理管理器...");
threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.handler);
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(accessBufferThread, 0, 3, TimeUnit.SECONDS);
LOG.info(Console.getNowStr() + " 启动异步处理管理器成功!");
return true;
}
/**
* @description 关闭工作线程
*/
public void shutdown(){
if (scheduler != null) {
scheduler.shutdown();
}
if (threadPool != null) {
threadPool.shutdown();
}
}
/**
* @description 处理器
*/
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
synchronized (requestQueue) {
try {
requestQueue.offer(((ZjThread) r).getRequest());
} catch (Exception e) {
LOG.error("插入自动转交队列失败",e);
}
}
}
};
/**
* @description 访问消息缓存的调度线程,查看是否有待定请求,如果有,则创建一个新的,并添加到线程池
*/
final Runnable accessBufferThread = new Runnable() {
@Override
public void run() {
synchronized (requestQueue) {
try {
if (FIRST_QD){
reloadRequest();
}
if (hasAcquire()) {
ZjRequset request = requestQueue.poll();
if (request != null) {
Runnable task = new ZjThread(request);
threadPool.execute(task);
}
}
} catch (Exception e) {
LOG.error("重新执行失败",e);
}
}
}
};
/**
* 增加一个数据库操作
*
* @param request the request
*/
public void AddRequest(ZjRequset request) {
try {
if (request != null) {
//持久化,写入库中
wirteRequest(request);
Runnable task = new ZjThread(request);
threadPool.execute(task);
}
} catch (Exception e) {
LOG.error(e.getMessage(),e);
}
}
/**
* 写入到表中
*
* @param request the request
*/
void wirteRequest(ZjRequset request) {
//将要请求写入库
}
/**
* 将库中未执行的任务添加到队列中
*/
void reloadRequest() {
FIRST_QD = false;
Connection conn = null;
PreparedStatement pst = null;
ResultSet rs = null;
//举个例子
try {
conn = getConn();
pst = conn.prepareStatement(sql);
rs = pst.executeQuery();
while (rs.next()) {
ZjRequset request = new ZjRequset();
request.setDm(rs.getString("DM"));
request.setXh(rs.getString("XH"));
ZjManager.getInstance().AddRequest(request,"");
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}finally {
DBUtils.closeResultSet(rs);
DBUtils.closePStatement(pst);
DBUtils.closeConnection(conn);
}
}
/**
* 获取队列待处理线程数量
*/
public int getQueueCount(){
return requestQueue.size();
}
/**
* 获取处理线程的状态
*/
public int getThreadZt(){
return threadPool.getActiveCount();
}
}
5.业务如何添加
ZjRequset clRequest = new ZjRequset();
clRequest.setDm(dm);
clRequest.setXh(xh);
ZjManager.getInstance().AddRequest(clRequest);
版权声明:本文为Uluoyu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。