一、 ThreadPoolExecutor线程池的状态
RUNNING: //运行状态 Accept new tasks and process queued tasks SHUTDOWN: //不接收新任务,但仍然可以运行任务队列中的任务 Don't accept new tasks, but process queued tasks STOP: //不接收新任务,也不运行任务队列中的任务,中断运行中的工作线程(worker.state>=0) Don't accept new tasks, don't process queued tasks and interrupt in-progress tasks TIDYING: //所有任务都中止,工作线程数为0时,会将状态设置为TIDYING 详见 tryTerminate 方法 All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method TERMINATED: // 调用terminated()方法后状态变为终止 terminated() has completed
二、如何优雅的关闭线程池
1、代码实例 使用shutdown 和 shutdownNow
package com.springboot.girl.juc.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* todo 如果我们把awaitTime和shutdownNow方法全部屏蔽掉的只留下shutdown方法的话会怎样呢?
* 会变成表示main方法结束的「end」显示出来之后,会打印出很多的task2的start和end。
* 这就是虽然课程结束了,但是学校仍然不能放学的不正常状态。最恶劣的情况会导致JAVA进程一直残留在OS中。
* 所以我们一定不要忘记使用awaitTermination和shutdownNow
* 1、shutdown 关闭提交通道,已提交的任务还是正常执行
* 2、shutdownNow 关闭提交通道,已提交的任务不再执行
* 3、awaitTermination 当前线程阻塞
* 3.1 等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
3.2 或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
3.3 或者 线程被中断,抛出InterruptedException
*/
public class TestPoolStatus {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);
final long waitTime = 3 * 1000; //任务1的睡眠waitTime时间比shutdown的等待短,则任务1 可以正常执行完
final long awaitTime = 5 * 1000;
Runnable task1 = new Runnable(){
public void run(){
try {
System.out.println("=====================task1 start=====================");
Thread.sleep(waitTime);
System.out.println("=====================task1 end==============");
} catch (InterruptedException e) {
System.out.println("task1 interrupted: " + e);
}
}
};
Runnable task2 = new Runnable(){
public void run(){
try {
System.out.println(" task2 start");
Thread.sleep(1000);
System.out.println(" task2 end");
} catch (InterruptedException e) {
System.out.println("task2 interrupted: " + e);
}
}
};
// 让学生解答某个很难的问题
pool.execute(task1);
// 生学生解答很多问题
for(int i=0; i<100; ++i){
pool.execute(task2);
}
try {
// 向学生传达“问题解答完毕后请举手示意!”
pool.shutdown();
// 向学生传达“XX分之内解答不完的问题全部带回去作为课后作业!”后老师等待学生答题
// (所有的任务都结束的时候,返回TRUE)
if(!pool.awaitTermination(awaitTime, TimeUnit.MILLISECONDS)){
// 超时的时候向线程池中所有的线程发出中断(interrupted)。
pool.shutdownNow();
}
} catch (InterruptedException e) {
// awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。
System.out.println("awaitTermination interrupted: " + e);
pool.shutdownNow();
}
System.out.println("end");
}
}
三、shutdown和shutdownNow的用法和区别
1、shutdown 关闭提交通道,已提交的任务还是正常执行
只是关闭了提交通道,用submit()是无效的;而内部该怎么跑还是怎么跑,跑完再停
2、shutdownNow 关闭提交通道,已提交的任务不再执行
能立即停止线程池,正在跑的和正在等待的任务都停下了
3、awaitTermination 当前线程阻塞
3.1 等所有已提交的任务(包括正在跑的和队列中等待的)执行完 3.2 或者 等超时时间到了(timeout 和 TimeUnit设定的时间) 3.3 或者 线程被中断,抛出InterruptedException
四、submit和execute的区别?
1、可以接受的任务类型不同
execute只能接受Runnable类型的任务 submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null
2、submit()有返回值,而execute()没有
例如,有个task,希望该task执行完后告诉我它的执行结果,是成功还是失败,然后继续下面的操作,这时需要用submit
3、submit()可以进行Exception处理
例如,如果task里会抛出checked或者unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那么就需要用到submit,通过对Future.get()进行抛出异常的捕获,然后对其进行处理。
4、来源不同
submit 来源ExecutorService接口
execute来源Executor接口
五、execute源码解析
# 作用:
1、初始化核心线程,同时将任务交给初始化的核心线程;
2、核心线程数满了之后,将任务缓存到workQueue中(判断是否能接收任务,与线程池的状态有关)
3、任务队列饱和的情况下,新增临时工作线程,将任务丢给临时线程处理
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 第一步 当线程数量小于核心线程数时,初始化创建核心worker,并将任务交给核心worker处理
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二步 线程池状态是运行中,任务队列未满时,将任务添加到队列 中
//检查线程池状态,如果不是运行中,则删除队列中的任务,
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三步 核心线程数已满且任务队列已经饱和的情况下,新增临时线程数,将任务丢给临时worker
else if (!addWorker(command, false))
reject(command);
}
六、addWorker源码解析
#作用 (线程池状态判断)
1、修改工作线程数量
2、创建并启动worker
1、先改线程数量(修改ctl的数量 )
1.1 、SHUTDOWN状态不接收新任务,只完成线程池中的已有任务
注:shutdownNow状态(STOP)不接收新任务,也不处理线程池中的任务 返回false
if (rs >= SHUTDOWN && ! ( //线程池关闭状态 rs == SHUTDOWN && //初始任务为空 firstTask == null && //队列不为空的情况 //以上条件可以解读为: //只要关闭状态(true),来了个新任务(false) ,则不能创建worker //只要是关闭状态(true),新任务为空(true),但是队列不为空吗(true),则可以增加worker ! workQueue.isEmpty() ) ) return false;
1.2 自旋来修改数量
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
1.3 、补充知识:语句块在自旋中的使用 retry | break retry | continue retry
在for(;;)语句中如果 不使用retry定义语句块,只用break 只能跳出内层循环 | continue 只能跳出内层本次循环,程序 会一直执行下去不会停止,不信您可以把retry删除试试
int num = 0;
boolean flag = false;
retry:
for(;;) {
System.out.println("=====local begin:"+num);
for(;;) {
num ++;
if(num == 110) {
break retry; //跳出到retry定义处的外层循环
}
if(num == 100) {
flag = true;
}
if(flag) {
flag = false;
System.out.println("=====local1:" + num);
continue retry; //只是跳出 内层循环 相当于break
// break ; // 跳出 内层循环
// continue ; // 仅跳出本次循环
}
}
}
System.out.println("=====local end:"+num);
2、创建工作者源码分析
#作用 :创建并启动worker (线程池状态判断)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLoc
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == nul
if (t.isAlive()) // precheck that t
throw new IllegalThreadStateExc
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
通过加锁来添加worker
Worker(Runnable firstTask) {
setState(-1); // 线程还没运行,防止中断,工作者初始状态为-1
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
七、 核心代码runWorker
作用:
1、不停的执行初始任务和任务队列中的任务
2、getTask()方法说明
当线程池状态为SHUTDOWN且任务队列为空时,返回null
当线程池状态>=STOP时,返回nuill
当线程池状态< SHUTDOWN,则需要考虑是否减少临时线程数和核心线程数
核心线程数是否减少:参数允许核心线程数超时,且当前获取poll任务已经超时的情况
临时线程数是否减少:线程数wc > max线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
3、在过程中判断程池状态,当状态至少是STOP状态时,要设置工作者线程中断标记为TRUE
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 将工作者状态设置为0 允许被中断
boolean completedAbruptly = true;
try {
//第一次执行task里放的是firstTask,第二次执行task=null,要从队列中取出任务进行执行
while (task != null || (task = getTask()) != null) {
//拿到任务在执行期间,worker不能被中断
w.lock();
// 1、判断线程池状态是否是STOP状态(shutdownNow) ,是STOP状态则不往下执行
if ((runStateAtLeast(ctl.get(), STOP) ||
// 2、判断当前线程是否设置了中断标记位 如果设置了那么判断线程池状态是否是STOP状态
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
// 3、判断并重置(重置为false)当前worker线程的中断标记,如果未设置中断状态,则返回true
!wt.isInterrupted())
// 总结:线程池是STOP状态且当前工作线程未设置中断标记则需要设置一下 worker.Thread的中断状态为true
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
//这里的代码有删减
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 注意,这个勾子函数抛出的异常需要自己捕获处理,否则会被外面的try忽略
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
小结:completedAbruptly = true(异常退出) 的情况只会由用户引起,分别在 beforeExecute、afterExecute 两个勾子函数抛出异常时
@可能会遇到的面试题@
1、面试题 :线程池中如何获取线程中的异常
方法一:实现线程池中的afterExecute勾子函数
注意:这个勾子函数抛出的异常需要自己捕获处理,否则会被外面的try忽略
方法二:在提交给线程池的任务中task中去捕获和处理异常
方法 三:通过Future.get()拿结果时捕获异常 (这里的实现没有理解 ??)
通过submit提交的任务,在Future.get()时可以捕获到异常
2、附加题:如何捕获线程池中的异常?
自定义ThreadFactory工厂
public class RewriteUncatchtExceptionHandler implements Thread.UncaughtExceptionHandler{
public void uncaughtException(Thread t, Throwable e) {
System.out.println("我捕获到了线程池的异常");
}
}
public class MyThreadFactory implements ThreadFactory{
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler(new RewriteUncatchtExceptionHandler());
System.out.println("Thread[" + t.getName() + "] created.");
return t;
}
}
/**
* 虽然从写ThreadFactory以后,可以捕获到异常,但是只能是execute,而submit还是不行 how to choose one
*/
public static void catchedExecutor() {
ExecutorService executorService = Executors.newCachedThreadPool(new MyThreadFactory());
executorService.execute(new Task());
executorService.shutdownNow();
}
how to choose submit() or execute() * There is a difference concerning exception/error handling.A task queued with execute() that generates some Throwable will cause the UncaughtExceptionHandler * for the Thread running the task to be invoked. The default UncaughtExceptionHandler, which typically prints the Throwable stack trace to System.err, will be * invoked if no custom handler has been installed.On the other hand, a Throwable generated by a task queued with submit() will bind the Throwable to the Future * that was produced from the call to submit(). Calling get() on that Future will throw an ExecutionException with the original Throwable as its cause (accessible * by calling getCause() on the ExecutionException).
上述的异常处理方式,只适用于execute,对于submit的方法提交的任务是无效的。那么我们如何进行选择呢?
意思就是说二者最大的区别就是异常处理上,在execute的时候,如果你没有实现一个handler,那么他就使用默认的handler来处理异常,你要是实现了一个handler
他就会使用的实例化的handler,但是对于submit来说,异常是绑定到Future上了,但是调用future.get()的时候,这些异常才会给你抛出(方法三中提到的实现) 来,意味着你自己定义的handler其实是无效的
八、processWorkerExit 函数源码分析
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//异常中断时,要减少工作线程数
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//清理工作者集合中的worker,累加完成任务数
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}