ThreadPool + CountDownLatch + Semaphore的使用,处理批量任务

  • Post author:
  • Post category:其他


直接上干货, 就是处理批量任务的

@Slf4j
public class ThreadPoolAndCountDownLatchAndSemaphore {
    //待处理批量数据
    public static List<String> batchList = new ArrayList<>(20);
    //最大处理数量
    private static int EXPIRED_PAGE_SIZE = 5;
    //最大线程
    private static int MAX_THREADS = 10;

    public static void main(String[] args) {
        batchList.addAll(Arrays.asList("1","2","3","1","2","3","1","2","3","1","2","3","1","2","3","1","2","3","1","2"));
        System.out.println("总任务数:" + batchList.size());
        System.out.println(countDownLatchAndSemaphore());
    }



    public static String countDownLatchAndSemaphore(){
        int runSize;
        List<String> handleList;

        //1. 根据任务计算线程数
        int listSize = batchList.size();
        if (listSize % EXPIRED_PAGE_SIZE == 0) {
            runSize = listSize / EXPIRED_PAGE_SIZE;
        } else {
            runSize = (listSize / EXPIRED_PAGE_SIZE) + 1;
        }

        ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(runSize);
        CountDownLatch countDownLatch = new CountDownLatch(runSize);
        Semaphore semaphore = new Semaphore(MAX_THREADS);

        //2.均匀分布任务
        for (int i = 0; i < runSize; i++) {
            if (i + 1 == runSize) {
                handleList = batchList.subList(i * EXPIRED_PAGE_SIZE, listSize);
            } else {
                handleList = batchList.subList(i * EXPIRED_PAGE_SIZE, (i + 1) * EXPIRED_PAGE_SIZE);
            }
            //3.异步执行
            executor.execute(new subTask(handleList, countDownLatch, semaphore, (ServletRequestAttributes) RequestContextHolder.getRequestAttributes()));
        }
        try {
            //4. 全部线程执行完毕 放行
            countDownLatch.await();
            return "所线程任务执行完毕!";
        } catch (InterruptedException e) {
            e.printStackTrace();
            return "执行异常!";
        } finally {
            //5. 关闭线程池
            executor.shutdown();
        }
    }
}
@AllArgsConstructor
public class subTask implements Runnable{
    List<String> handleList;
    CountDownLatch countDownLatch;
    Semaphore semaphore;
    ServletRequestAttributes requestAttributes;


    @Override
    public void run() {
        if (!CollectionUtils.isEmpty(this.handleList)) {
            try {
                //3.1 抢占线程资源执行
                semaphore.acquire();
                //处理多线程中request头问题
                RequestContextHolder.setRequestAttributes(requestAttributes);
                System.out.println(Thread.currentThread().getName() + "\t 任务执行完毕!处理任务数:" + handleList.size());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //3.2 释放线程资源
                semaphore.release();
                //3.3 线程执行完毕 -1
                countDownLatch.countDown();
            }
        }

    }
}



版权声明:本文为a546835886原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。