1.ThreadPoolExecutor构造参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数名 | 作用 |
---|---|
corePoolSize | 核心线程池大小 |
maximumPoolSize | 最大线程池大小 |
keepAliveTime | 超过corePoolSize数目的空闲线程存活时间 |
TimeUnit | 时间单位 |
BlockingQueue | 阻塞任务队列 |
ThreadFactory | 线程工厂 |
RejectedExecutionHandler | 当提交任务数超过maximumPoolSize+BlockingQueue之和时,任务会交给此handler处理 |
注意:
1. 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2. 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3. .当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4. 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
5. 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
6. 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
7. RejectedExecutionHandler,ThreadPoolExecutor类中有四个实现的静态内部类,默认是AbortPolicy
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
2.ThreadPoolExecutor的重要源码
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
使用AtomicInteger保证原子操作,通过巧妙操作(与&、非~),这个变量保存着两个信息:
①有效线程数量
②线程池运行状态runState
public void execute(Runnable command) {
int c = ctl.get();
//活动线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
/**
* ReentrantLock mainLock = new ReentrantLock();
* HashSet<Worker> workers = new HashSet<Worker>();
* 直接启动新的线程,封装成Worker类,存放在workers的Set集合中,并发访问workers时,通过mainLock加锁
*/
if (addWorker(command, true))
return;
c = ctl.get();
}
//活动线程数>=corePoolSize && runState为RUNNING && 队列未满
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//double check,非RUNNING状态从workQueue中移除
if (! isRunning(recheck) && remove(command))
//采用线程池指定的策略拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0)
//SHUTDOWn状态下,但是队列中仍然有任务未执行,null表示不再接收新任务
addWorker(null, false);
}
/**
* 两种情况:
* ① 非RUNNING状态
* ② workCount > maximumPoolSize
*/
else if (!addWorker(command, false))
reject(command);
}
3.Executors提供的线程池配置方案
3.1 newFixedThreadPool
构造一个固定线程数目的线程池,corePoolSize=maximumPoolSize,使用了一个无界LinkedBlockingQueue存放阻塞任务,永远没有多余任务由RejectedExecutionHandler处理
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3.2 newSingleThreadExecutor
构造只支持一个线程的线程池,无界阻塞队列LinkedBlockingQueue,保证任务由一个线程串行执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
3.3 newCachedThreadPool
构造一个具有缓冲功能的线程池,corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及无容量的阻塞队列SynchronousQueue,因此任务提交后会马上交给线程执行。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
3.4 newScheduledThreadPool
构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}