线程池ThreadPoolExecutor详解

  • Post author:
  • Post category:其他

1.ThreadPoolExecutor构造参数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
参数名 作用
corePoolSize 核心线程池大小
maximumPoolSize 最大线程池大小
keepAliveTime 超过corePoolSize数目的空闲线程存活时间
TimeUnit 时间单位
BlockingQueue 阻塞任务队列
ThreadFactory 线程工厂
RejectedExecutionHandler 当提交任务数超过maximumPoolSize+BlockingQueue之和时,任务会交给此handler处理

注意:
1. 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2. 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3. .当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4. 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
5. 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
6. 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
7. RejectedExecutionHandler,ThreadPoolExecutor类中有四个实现的静态内部类,默认是AbortPolicy

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 

2.ThreadPoolExecutor的重要源码

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

使用AtomicInteger保证原子操作,通过巧妙操作(与&、非~),这个变量保存着两个信息:
①有效线程数量
②线程池运行状态runState

public void execute(Runnable command) {

        int c = ctl.get();
        //活动线程数 < corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            /**
             * ReentrantLock mainLock = new ReentrantLock();
             * HashSet<Worker> workers = new HashSet<Worker>();
             * 直接启动新的线程,封装成Worker类,存放在workers的Set集合中,并发访问workers时,通过mainLock加锁
             */
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //活动线程数>=corePoolSize && runState为RUNNING && 队列未满
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //double check,非RUNNING状态从workQueue中移除
            if (! isRunning(recheck) && remove(command))
                //采用线程池指定的策略拒绝任务
                reject(command);
            else if (workerCountOf(recheck) == 0)
                //SHUTDOWn状态下,但是队列中仍然有任务未执行,null表示不再接收新任务
                addWorker(null, false);
        }
        /**
         * 两种情况:
         * ① 非RUNNING状态
         * ② workCount > maximumPoolSize
         */
        else if (!addWorker(command, false))
            reject(command);
    }

3.Executors提供的线程池配置方案

3.1 newFixedThreadPool

构造一个固定线程数目的线程池,corePoolSize=maximumPoolSize,使用了一个无界LinkedBlockingQueue存放阻塞任务,永远没有多余任务由RejectedExecutionHandler处理

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

3.2 newSingleThreadExecutor

构造只支持一个线程的线程池,无界阻塞队列LinkedBlockingQueue,保证任务由一个线程串行执行。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

3.3 newCachedThreadPool

构造一个具有缓冲功能的线程池,corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及无容量的阻塞队列SynchronousQueue,因此任务提交后会马上交给线程执行。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

3.4 newScheduledThreadPool

构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

版权声明:本文为lecoder原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。