直接上干货：下面的示例结合线程池、CountDownLatch 和 Semaphore，将批量任务分页后并发处理，并等待全部分页处理完成。
@Slf4j public class ThreadPoolAndCountDownLatchAndSemaphore { //待处理批量数据 public static List<String> batchList = new ArrayList<>(20); //最大处理数量 private static int EXPIRED_PAGE_SIZE = 5; //最大线程 private static int MAX_THREADS = 10; public static void main(String[] args) { batchList.addAll(Arrays.asList("1","2","3","1","2","3","1","2","3","1","2","3","1","2","3","1","2","3","1","2")); System.out.println("总任务数:" + batchList.size()); System.out.println(countDownLatchAndSemaphore()); } public static String countDownLatchAndSemaphore(){ int runSize; List<String> handleList; //1. 根据任务计算线程数 int listSize = batchList.size(); if (listSize % EXPIRED_PAGE_SIZE == 0) { runSize = listSize / EXPIRED_PAGE_SIZE; } else { runSize = (listSize / EXPIRED_PAGE_SIZE) + 1; } ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(runSize); CountDownLatch countDownLatch = new CountDownLatch(runSize); Semaphore semaphore = new Semaphore(MAX_THREADS); //2.均匀分布任务 for (int i = 0; i < runSize; i++) { if (i + 1 == runSize) { handleList = batchList.subList(i * EXPIRED_PAGE_SIZE, listSize); } else { handleList = batchList.subList(i * EXPIRED_PAGE_SIZE, (i + 1) * EXPIRED_PAGE_SIZE); } //3.异步执行 executor.execute(new subTask(handleList, countDownLatch, semaphore, (ServletRequestAttributes) RequestContextHolder.getRequestAttributes())); } try { //4. 全部线程执行完毕 放行 countDownLatch.await(); return "所线程任务执行完毕!"; } catch (InterruptedException e) { e.printStackTrace(); return "执行异常!"; } finally { //5. 关闭线程池 executor.shutdown(); } } }
@AllArgsConstructor public class subTask implements Runnable{ List<String> handleList; CountDownLatch countDownLatch; Semaphore semaphore; ServletRequestAttributes requestAttributes; @Override public void run() { if (!CollectionUtils.isEmpty(this.handleList)) { try { //3.1 抢占线程资源执行 semaphore.acquire(); //处理多线程中request头问题 RequestContextHolder.setRequestAttributes(requestAttributes); System.out.println(Thread.currentThread().getName() + "\t 任务执行完毕!处理任务数:" + handleList.size()); } catch (InterruptedException e) { e.printStackTrace(); }finally { //3.2 释放线程资源 semaphore.release(); //3.3 线程执行完毕 -1 countDownLatch.countDown(); } } } }
版权声明:本文为a546835886原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。