李大爷的线程池堪称经典和艺术,先顶礼膜拜!小生技术菜鸟一枚,纯属自我学习记录,有写的不好或者不对的地方,请各位大佬喷完后记得告诉我错在哪??,废话不BB了,直奔主题吧。
首先想要了解ThreadPoolExecute必须要清楚线程池的状态(ps:英文好的小伙伴可以忽略我的翻译,直接看英文注释):
* RUNNING: 可以接受新任务,并且可以执行阻塞队列中的任务
*
* SHUTDOWN: 不可以接受新任务,但是可以执行阻塞队列中的任务
*
* STOP: 不可以接受新任务,不执行阻塞队列中的任务,并且还能打断正在执行的任务
*
* TIDYING: 所有的任务都被终止,线程池中的任务数是0的时候线程的状态将改变成TIDYING,
* 并且经会执行terminated()
*
* TERMINATED: terminated()方法已经完成
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
了解清楚了线程的状态,那就开始ThreadPoolThread源码之旅:
1>看了上面的源码应该知道了李大爷对线程状态的标识。方便清楚下面代码中各种对线程池状态的判断和改变。
//AtomicInterger类型的ctl状态变量(atomic类型保证内存可见性的同时,
// 支持CAS无锁原子性操作,在源码中到处充斥了该操作,提高并发性)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//后(2^29)-1 (大约 500 百万)为线程数限制,作者Doug lea表示未来扩
// 展可通过long类型来突破线程数大小限制
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 高三位为线程池状态位
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 返回线程池的当前状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//返回线程池的线程数。
private static int workerCountOf(int c) { return c & CAPACITY; }
//通过上述两个值(运行状态和任务数)生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
//当前线程池的状态,是不是小于给定的状态
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//当前线程池的状态,是不是大于等于给定的状态
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
//当前线程池的状态,是不是RUNNING
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 使用CAS原理对当前线程池线程数量值加一
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 使用CAS原理对当前线程池线程数量值减一
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 使用CAS原理对当前线程池线程数量值减一,直到成功为止
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
2>小伙伴们打起精神来了,接下来将一起了解一下面试官最喜欢问到的一道面试题“请简单说下你知道的线程池和ThreadPoolThread有哪些构造参数”。
常用的JDK推荐的线程池:
Executors.newCachedThreadPool (无界线程池,可自动回收,缺点:涌入大量任务时会大量创建线程)
//为什么会自动回收,看完文章你就清楚了
Executors.newFixedThreadPool (线程池中的线程数固定大小,超出的线程会在队列中等待,缺点:任务数量过大效率不高)
Executors.newScheduledThreadPool(创建一个定长线程池,支持定时及周期性任务执行)
Executors.newSingleThreadExecutor (单个后台线程)
深入源码,你会发现:这几个方法都调用了ThreadPoolExecutor的构造函数,只要研究ThreadPoolExecutor构造函数就行。ThreadPoolExecutor中入参最多的构造函数的源码:
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//线程数大于核心线程数,并且阻塞队列为空时,
线程的等待时间
TimeUnit unit,//时间戳
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler) {//拒绝策略
//对参数校验。核心线程数>=0,最大线程数>0且大于核心线程数,等待时间>=0
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:核心可运行线程数
maximumPoolSize:最大可运行运行程数
workQueue:阻塞队列
keepAliveTime:当线程大于核心线程数时,且阻塞队列没有元素,最大等待时间
threadFactory:生成线程的工厂类
handler:超出线程池最大承受能力之后的失败策略方法对象
3> 步骤a. 当线程数小于corePoolSize,会正常执行任务。
步骤b. 当任务数量超过核心线程数的时候,会将任务放入阻塞队列中
步骤c. 当阻塞队列放满之后,线程池会创建新线程,当然不能超过maximumPoolSize。
具体实现看下execute()方法,而且submit()方法其实也是调用
execute()
.
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//读取ctl
//当前任务数量小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//步骤a
//这里第二参数写的很巧妙,在addWorker()方法会特别突出
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程池状态是running/shutdown,并且成功将任务放入阻塞队列中
//其实可以理解成线程池状态是running/shutdown并且任务数大于核心线程数
if (isRunning(c) && workQueue.offer(command)) {//步骤b
int recheck = ctl.get();//再次读取ctl,防止并发
//如果线程池的状态已经发生了变化,需要把刚放入阻塞队列中的任务移除,
// 并且使用拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//到了这里表示:线程池里面已经没有可执行任务的线程,
//但是刚又给阻塞队列中加了个任务,还不符合使用拒绝策略的条件,
//追加一个new Thread执行任务,初始化任务为null,因为要调用getTask()方法从阻塞队列中获取task
addWorker(null, false);
}
//步骤c了,
else if (!addWorker(command, false))
reject(command);
}
看完了execute()方法后,大家会发现该方法中有一个非常重要的方法addWorker(),那么久让我们详细看下addWorker()方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {//李大爷特别喜欢这种方式的死循环,大家要习惯
int c = ctl.get();
int rs = runStateOf(c);//获取线程池的状态
// 当前线程池的状态是STOP/TIDYING/TEMINATE,
//并且状态是SHUTDOWN,firstTask==null,阻塞队列是空只要任意一个是false的条件下返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//总之,能历经上面if走到这里来只有两种情况:
//1.线程池的状态是running
//2.线程池的状态是shutdown的时候,queue不为空
for (;;) {
int wc = workerCountOf(c);//当前线程池线程数。
//CAPACITY是目前jdk所允许的线程数上限
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))//这里就是execute方法中的传入的参数,用来控制当前线程数是核心线程数还是最大线程数
return false;
if (compareAndIncrementWorkerCount(c))//当前线程池线程数量值加1
break retry;//跳出死循环
//并发导致当前线程数量+1没有成功的话
c = ctl.get(); //重新取ctl值
if (runStateOf(c) != rs)//当前线程池的状态发生了变化,就要回到该方法起点了。需要重新判断线程池的状态能不能执行task
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//上面的for死循环中经历重重,终于走到这了,那么线程数+1了
boolean workerStarted = false;//是否启动worker
boolean workerAdded = false;//是否将task封装到worker
Worker w = null;
try {
w = new Worker(firstTask);//work就是个封装类,后面会介绍的
final Thread t = w.thread;
if (t != null) {//判断threadFactory.newThread是否为null。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获得锁后,再次检查ctl state.防止获得mainLock之前,pool关闭,
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN:线程池只能是RUNNING状态
// rs == SHUTDOWN && firstTask == null:
// 线程池是SHUNTDOWN且firstTask为空,这种情况主要是因为阻塞队列还有没运行完的线程
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) //任务刚封装到work里面,还没start,你封装的线程就是alive,几个意思?肯定是要抛异常出去的
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
//largestPoolSize只是表明线程池曾经出现过的最大的线程数
if (s > largestPoolSize)
largestPoolSize = s;//更新线程池出现过的最大线程数
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//封装task到worker中失败的操作
addWorkerFailed(w);
}
return workerStarted;
}
看到了addWorker()方法,很神奇的是居然不是执行任务,而是把firstTask封装到worker中, 自然而然的要观摩下worker的结构了:
private final class Worker
extends AbstractQueuedSynchronizer//对AQS暂时还没了解,所以AQS相关的不做解释
implements Runnable//看到这是不是神奇的发现worker他娘就是个Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
//注意了,这才是真正执行task的线程,从构造函数可知是由ThreadFactury创建的
final Thread thread;
//这就是需要执行的task
Runnable firstTask;
//完成的任务数,用于线程池统计
volatile long completedTasks;
//构造函数
Worker(Runnable firstTask) {
setState(-1); //初始状态 -1,防止在调用runWorker(),也就是真正执行task前中断thread。
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
看完了worker,大家会发现一个很神奇的现象,原来真正执行任务的是worker中封装的thread,而这个thread是由ThreadPoolExecute()方法中的参数创建的。到目前为止,ThreadPoolExecute()方法中的参数,我们已经在源码中见到了corePoolSize,maximumPoolSize,rejectHandler,ThreadFactory.就差一个keepAliveTime了,别着急,这个参数容小生在后面的源码中慢慢道来。
既然是在running状态或者shutdown状态并且queue不能空才会将firstTask封装到worker, 如果封装线程启动失败,会进行一系列的失败处理addWorkerFailed(w):
//封装worker失败
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);//既然失败了,先把没有task的worker从workers中移除
decrementWorkerCount();//addWorker()方法线程数+1,到这了就要减1
tryTerminate();//尝试结束线程池
} finally {
mainLock.unlock();
}
}
tryTerminate();//尝试结束线程池 这个方法在源码中多处用到,一并看一下:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//如果线程池的状态是RUNNING或者是TERMINATED或者是SHUTDOWN且阻塞队列不为空的情况不需要终止线程池
//RUNNING状态和SHUTDOWN且阻塞队列不为空,肯定不能终止线程池
//TERMINATED状态就不需要再终止一次了
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//接下来就是STOP状态或者SHUTDOWN状态并且阻塞队列是空
//STOP状态可以终止正在执行的线程,SHUTDOWN状态并且阻塞队列是空的时候没有任务需要执行
// 既然可以终止任务或者没有任务执行了,就把当前线程interrupt,释放资源留待有需要的地方使用
if (workerCountOf(c) != 0) { // Eligible to terminate 有权限终止
//中断workers中第一个worker对象中的thread
interruptIdleWorkers(ONLY_ONE);//ONLY_ONE==true
return;
}
//到这里,线程池的状态要不是就stop且已经终止正在执行的任务,要不就是shutdown+等待队列为空
//线程池已经没有用途了,所以就尝试终止线程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//将ctl的值设置TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//调用terminated()终止线程池(没找具体实现方法,很纳闷)
terminated();
} finally {
//最终把线程池状态修改为treminated
ctl.set(ctlOf(TERMINATED, 0));
//唤醒条件等待队列中挂起的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//线程没有被打断且获取到封装线程的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
看到这里已经看到了ThreadPoolExecute源码中最主要的方法之一addWorker(),也了解了worker到底是个啥东东了,接下来我们会看ThreadPoolExecute源码中另外一个关键方法内部类Worker中的runWorker()方法:
public void run() {
runWorker(this);//注意看参数是this,而这个方法又是在Worker类中
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
//这里做置空不理解,有大神知道的还希望指出来
w.firstTask = null;
w.unlock(); // allow interrupts允许中断
boolean completedAbruptly = true;
try {
//正常情况下第一次循环的时候task!=null,因为||短路机制会直接执行while里面的逻辑,
//在finally()中会将task=null,或者如execute()方法中的add(null,false)也会将task置空,
//如果task==null会执行getTask()方法从阻塞队列中获取task。
while (task != null || (task = getTask()) != null) {
w.lock();
//如果当前线程也就是worker中的thread不是中断状态并且线程池的状态是stop,
// tidying,terminated状态,需要中断线程
//其中Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)
// 主要用于清除线程终端标志,因为很大可能线程池刚刚转换成STOP
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//用户实现
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//用户实现
}
} finally {
task = null;
w.completedTasks++;//完成任务数+1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
接着我们来看核心的getTask方法:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池的状态是>=stop或者rs==shutdown且阻塞队列为空
//总之就是没有不能执行任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//线程数量-1
return null;
}
//下面就是可以执行新任务或者执行队列中的任务
int wc = workerCountOf(c);
// allowCoreThreadTimeOut可以用户设置,
// allowCoreThreadTimeOut==true时表示核心线程数可以根据keepAliveTime时间收缩
//ThreadPoolExecute最后一个参数终于千呼万唤使出来,没错keepAliveTime
//不仅用于maximumPoolSize的收缩,还可以用于corePoolSize的收缩
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//maximumPoolSize必须大于corePoolSize,maximumPoolSize必须大于corePoolSize最小是0,
// maximumPoolSize最小必须是1
//线程数 > maximumPoolSize,则这个if一定为true,而我们又知道线程池的
// 线程数不能大于maximumPoolSize,所以会死循环执行compareAndDecrementWorkerCount()
//除非减少线程数,return null。
//当wc > maximumPoolSize不满足的时候,会判断(timed && timedOut),因为还没执行到下面的代码,
//timedOut默认为false,所以整个if为false
//经过一次循环后,且没有从阻塞队列中获取到task的时候timedOut==true,workQueue.isEmpty()也是true
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//当allowCoreThreadTimeOut==true,也就开启核心线程的收缩
//或者是阻塞队列为空,且线程数大于核心线程数,多余的线程数需要收缩
//此时timed == true,阻塞队列中获取task就会阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//如果阻塞队列为空,r==null,将timedOut改为true,继续循环
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
好了,到这里ThreadPoolExecute的源码分析的 差不多,也是凌晨了,这是技术菜鸟我的第一篇技术博客,写的不好的地方大家见谅,也可以喷,就是麻烦喷完以后给正确的解释。