在线QQ客服:1922638
专业的SQL Server、MySQL数据库同步软件
背景
在项目中开发一批新的和更新的需求。由于以前的数据是一个接一个地存储的,因此每天的存储量约为100,000。后来,当需求增加的日存储量增加到一百万时,存储遇到瓶颈,Mq积压严重。后来发现,这需要与Mysql频繁交互,并且需要等待写入库的结果返回。效率令人担忧,并且减慢了其他模块的速度。需要批量添加新产品和批量更新。
优化
第一步:使用线程池进行更新,将更新代码提交给线程池,然后线程池调度存储?
缺点:没有解决与数据库频繁交互的问题。 ?
步骤2:无论更新结果如何,执行模块只需要将更新任务放入队列中并直接返回即可。用Spring的定时任务注释@Scheduled,指定一个方法,并定期调用存储方法;存储的逻辑是,获取队列中当前任务的数量,cnt,循环轮询任务并将其添加到列表中。在轮询具有足够的cnt后,通过批处理更新方法将List更新到数据库。
缺点:定时执行无法控制队列大小。一次可能要执行许多任务,或者队列可能会过大。 ?
第三步:使用阻塞队列放置更新任务,并在守护程序线程轮询的队列中使用这些任务。当任务数等于300时(此值根据实际情况,我们的老板建议300500),然后批量更新一次。将超时时间设置为2秒时,如果仍然超过2秒仍未收到任务,它将一次批量更新通道的任务。
@服务
公共类BatchExecutorDataJob {
?
最终的Logger logger = LoggerFactory.getLogger(BatchExecutorDataJob.class);
?
@Resource
私有JobService jobService;
?
//定义一个容量为10000的阻塞队列,多个生产者可以同时放置BlockingQueue线程安全
私人BlockingQueue \ lt;作业dataQueue =新的LinkedBlockingQueue \ lt; \ gt; (10000);
?
//
私人列表\ lt;作业list = new ArrayList \ lt;作业();
?
//将生产者的任务方法调用
public void recordJob(工作职位){
尝试{
dataQueue.put(作业);
} catch(InterruptedException e){
? LOGGER.info(”将更新作业批量添加到队列中是异常的”);
Thread.currentThread()。打断 ();
}
}
?
//初始化称为
@PostConstruct
私人无效init(){
?线程线程=新线程(()-\ gt; {
? LOGGER.info(”启动批处理更新守护程序线程,开始时间{}”,新日期(System.currentTimeMillis()));
while(Boolean.TRUE){
职位调查=空;
布尔pollTimeOut = false;
启动时间长;
endTime长;
{
{brbr/>
轮询时将超时时间设置为2秒
?轮询? dataQueue.poll(2,TimeUnit.SECONDS);
} catch(InterruptedException e){
? LOGGER.info(”批处理更新作业例外”);
Thread.currentThread()。打断 ();
}
?
if(null!=民意测验){
//轮询以将任务添加到列表中
list.add(投票);
}其他{
//轮询超时,设置超时标志位
pollTimeOut = true;
}
?
如果任务列表等于5000或轮询超时,并且列表中有任务,请批量更新
如果(list.size()== 300 ||?
(pollTimeOut \ \!CollectionUtils.isEmpty(列表))){
startTime = System.currentTimeMillis();
JobService.batchUpdateByPrimaryKeySelective(列表);
? LOGGER.info(”作业批处理更新{}个任务,耗时{} ms”,list.size(),?
System.currentTimeMillis()-startTime);
list.clear();
}
}
});
?
?线? setName(” job-batchUpdate-deamon”);
//将启动的线程设置为守护程序线程
thread.setDaemon(true);
thread.start();
}
}
没有提到生产者方法,主要是消费者方法:
1,@ PostConstruct将在Bean初始化之前执行使用者方法
2.poll = dataQueue.poll(2,TimeUnit.SECONDS),使用轮询方法阻塞队列,设置轮询超时时间,并在时间超过时返回空值
3.list.size()== 300,如果List中的值等于300,则执行批量更新
4.pollTimeOut \ \! CollectionUtils.isEmpty(列表),如果轮询超时,则表示当前生产者没有生产任务或不再生产任务,请批量更新列表中的其余任务?
5,thread.setDaemon(true),将线程设置为守护线程,停止直到jvm停止
?
最后提交代码,检查日志日志,并且使用批处理存储会花费很长时间。在找到各种原因之后,jdbcUrl仍然需要添加配置
rewriteBatchedStatements = true
添加后,重新部署项目,存储时间立即在1000ms以下。
———————?
原文:https://blog.csdn.net/qq_36245532/article/details/88190511?
?