public List<StaffWorkRecordDO> selectCurrentWorkByStaffIds(List<Long> staffIds) {
if (CollectionUtils.isEmpty(staffIds)) {
return Lists.newArrayList();
}
/*List<StaffWorkRecordDO> result = Lists.newArrayList();
List<List<Long>> partitionStaffIds = Lists.partition(staffIds, WorkConstants.DEFAULT_STAFFID_SIZE);
for (List<Long> listIds : partitionStaffIds) {
List<StaffWorkRecordDO> staffWorkRecordDOS = staffWorkRecordDao.selectCurrentWorkByStaffIds(listIds);
result.addAll(staffWorkRecordDOS);
}
return result;*/
//多线程执行查询员工信息汇总
List<List<Long>> partitionStaffIds = Lists.partition(staffIds, WorkConstants.DEFAULT_STAFFID_SIZE);
CopyOnWriteArrayList<StaffWorkRecordDO> result = new CopyOnWriteArrayList<>();
//多线程查询每个员工集合具体详情信息
CompletableFuture<Void> all = CompletableFuture.allOf(
partitionStaffIds.stream().map(n -> CompletableFuture.runAsync(() -> {
List<StaffWorkRecordDO> staffWorkRecordDOS = staffWorkRecordDao.selectCurrentWorkByStaffIds(n);
result.addAll(staffWorkRecordDOS);
}, ThreadPoolUtil.DATA_STAFF_WORK_POOL).exceptionally(e -> {
log.error("selectCurrentWorkByStaffIds.CompletableFuture.error: {}", e.getMessage());
throw StaffWorkbenchException.of(ErrorCode.SYSTEM_ERROR.getCode(), "批量获取员工工作记录数据失败");
})).toArray(CompletableFuture[]::new));
//阻塞,直到所有任务结束。
all.join();
return result;
}
public class ThreadPoolUtil {
private static final int DATA_SOURCE_POOL_SIZE = 20;
private static final int DATA_SOURCE_POOL_QUEUE_SIZE = 1000;
public static final ThreadPoolExecutor DATA_STAFF_WORK_POOL =
ExecutorsEx.newBlockingThreadPool(DATA_SOURCE_POOL_SIZE, DATA_SOURCE_POOL_QUEUE_SIZE,
"data-staff-work-record-thread-%d");
}
线程池定义
版权声明:本文为qq_39809613原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。