一、 ThreadPoolExecutor线程池的状态
RUNNING: //运行状态 Accept new tasks and process queued tasks SHUTDOWN: //不接收新任务,但仍然可以运行任务队列中的任务 Don't accept new tasks, but process queued tasks STOP: //不接收新任务,也不运行任务队列中的任务,中断运行中的工作线程(worker.state>=0) Don't accept new tasks, don't process queued tasks and interrupt in-progress tasks TIDYING: //所有任务都中止,工作线程数为0时,会将状态设置为TIDYING 详见 tryTerminate 方法 All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method TERMINATED: // 调用terminated()方法后状态变为终止 terminated() has completed
二、如何优雅的关闭线程池
1、代码实例 使用shutdown 和 shutdownNow
package com.springboot.girl.juc.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * todo 如果我们把awaitTime和shutdownNow方法全部屏蔽掉的只留下shutdown方法的话会怎样呢? * 会变成表示main方法结束的「end」显示出来之后,会打印出很多的task2的start和end。 * 这就是虽然课程结束了,但是学校仍然不能放学的不正常状态。最恶劣的情况会导致JAVA进程一直残留在OS中。 * 所以我们一定不要忘记使用awaitTermination和shutdownNow * 1、shutdown 关闭提交通道,已提交的任务还是正常执行 * 2、shutdownNow 关闭提交通道,已提交的任务不再执行 * 3、awaitTermination 当前线程阻塞 * 3.1 等所有已提交的任务(包括正在跑的和队列中等待的)执行完; 3.2 或者 等超时时间到了(timeout 和 TimeUnit设定的时间); 3.3 或者 线程被中断,抛出InterruptedException */ public class TestPoolStatus { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(5); final long waitTime = 3 * 1000; //任务1的睡眠waitTime时间比shutdown的等待短,则任务1 可以正常执行完 final long awaitTime = 5 * 1000; Runnable task1 = new Runnable(){ public void run(){ try { System.out.println("=====================task1 start====================="); Thread.sleep(waitTime); System.out.println("=====================task1 end=============="); } catch (InterruptedException e) { System.out.println("task1 interrupted: " + e); } } }; Runnable task2 = new Runnable(){ public void run(){ try { System.out.println(" task2 start"); Thread.sleep(1000); System.out.println(" task2 end"); } catch (InterruptedException e) { System.out.println("task2 interrupted: " + e); } } }; // 让学生解答某个很难的问题 pool.execute(task1); // 生学生解答很多问题 for(int i=0; i<100; ++i){ pool.execute(task2); } try { // 向学生传达“问题解答完毕后请举手示意!” pool.shutdown(); // 向学生传达“XX分之内解答不完的问题全部带回去作为课后作业!”后老师等待学生答题 // (所有的任务都结束的时候,返回TRUE) if(!pool.awaitTermination(awaitTime, TimeUnit.MILLISECONDS)){ // 超时的时候向线程池中所有的线程发出中断(interrupted)。 pool.shutdownNow(); } } catch (InterruptedException e) { // awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。 System.out.println("awaitTermination interrupted: " + e); pool.shutdownNow(); } System.out.println("end"); } }
三、shutdown和shutdownNow的用法和区别
1、shutdown 关闭提交通道,已提交的任务还是正常执行
只是关闭了提交通道,用submit()是无效的;而内部该怎么跑还是怎么跑,跑完再停
2、shutdownNow 关闭提交通道,已提交的任务不再执行
能立即停止线程池,正在跑的和正在等待的任务都停下了
3、awaitTermination 当前线程阻塞
3.1 等所有已提交的任务(包括正在跑的和队列中等待的)执行完 3.2 或者 等超时时间到了(timeout 和 TimeUnit设定的时间) 3.3 或者 线程被中断,抛出InterruptedException
四、submit和execute的区别?
1、可以接受的任务类型不同
execute只能接受Runnable类型的任务 submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null
2、submit()有返回值,而execute()没有
例如,有个task,希望该task执行完后告诉我它的执行结果,是成功还是失败,然后继续下面的操作,这时需要用submit
3、submit()可以进行Exception处理
例如,如果task里会抛出checked或者unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那么就需要用到submit,通过对Future.get()进行抛出异常的捕获,然后对其进行处理。
4、来源不同
submit 来源ExecutorService接口
execute来源Executor接口
五、execute源码解析
# 作用:
1、初始化核心线程,同时将任务交给初始化的核心线程;
2、核心线程数满了之后,将任务缓存到workQueue中(判断是否能接收任务,与线程池的状态有关)
3、任务队列饱和的情况下,新增临时工作线程,将任务丢给临时线程处理
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 第一步 当线程数量小于核心线程数时,初始化创建核心worker,并将任务交给核心worker处理 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 第二步 线程池状态是运行中,任务队列未满时,将任务添加到队列 中 //检查线程池状态,如果不是运行中,则删除队列中的任务, if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 第三步 核心线程数已满且任务队列已经饱和的情况下,新增临时线程数,将任务丢给临时worker else if (!addWorker(command, false)) reject(command); }
六、addWorker源码解析
#作用 (线程池状态判断)
1、修改工作线程数量
2、创建并启动worker
1、先改线程数量(修改ctl的数量 )
1.1 、SHUTDOWN状态不接收新任务,只完成线程池中的已有任务
注:shutdownNow状态(STOP)不接收新任务,也不处理线程池中的任务 返回false
if (rs >= SHUTDOWN && ! ( //线程池关闭状态 rs == SHUTDOWN && //初始任务为空 firstTask == null && //队列不为空的情况 //以上条件可以解读为: //只要关闭状态(true),来了个新任务(false) ,则不能创建worker //只要是关闭状态(true),新任务为空(true),但是队列不为空吗(true),则可以增加worker ! workQueue.isEmpty() ) ) return false;
1.2 自旋来修改数量
for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop }
1.3 、补充知识:语句块在自旋中的使用 retry | break retry | continue retry
在for(;;)语句中如果 不使用retry定义语句块,只用break 只能跳出内层循环 | continue 只能跳出内层本次循环,程序 会一直执行下去不会停止,不信您可以把retry删除试试
int num = 0; boolean flag = false; retry: for(;;) { System.out.println("=====local begin:"+num); for(;;) { num ++; if(num == 110) { break retry; //跳出到retry定义处的外层循环 } if(num == 100) { flag = true; } if(flag) { flag = false; System.out.println("=====local1:" + num); continue retry; //只是跳出 内层循环 相当于break // break ; // 跳出 内层循环 // continue ; // 仅跳出本次循环 } } } System.out.println("=====local end:"+num);
2、创建工作者源码分析
#作用 :创建并启动worker (线程池状态判断)
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLoc mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == nul if (t.isAlive()) // precheck that t throw new IllegalThreadStateExc workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;
通过加锁来添加worker
Worker(Runnable firstTask) { setState(-1); // 线程还没运行,防止中断,工作者初始状态为-1 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
七、 核心代码runWorker
作用:
1、不停的执行初始任务和任务队列中的任务
2、getTask()方法说明
当线程池状态为SHUTDOWN且任务队列为空时,返回null
当线程池状态>=STOP时,返回nuill
当线程池状态< SHUTDOWN,则需要考虑是否减少临时线程数和核心线程数
核心线程数是否减少:参数允许核心线程数超时,且当前获取poll任务已经超时的情况
临时线程数是否减少:线程数wc > max线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }
3、在过程中判断程池状态,当状态至少是STOP状态时,要设置工作者线程中断标记为TRUE
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 将工作者状态设置为0 允许被中断 boolean completedAbruptly = true; try { //第一次执行task里放的是firstTask,第二次执行task=null,要从队列中取出任务进行执行 while (task != null || (task = getTask()) != null) { //拿到任务在执行期间,worker不能被中断 w.lock(); // 1、判断线程池状态是否是STOP状态(shutdownNow) ,是STOP状态则不往下执行 if ((runStateAtLeast(ctl.get(), STOP) || // 2、判断当前线程是否设置了中断标记位 如果设置了那么判断线程池状态是否是STOP状态 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && // 3、判断并重置(重置为false)当前worker线程的中断标记,如果未设置中断状态,则返回true !wt.isInterrupted()) // 总结:线程池是STOP状态且当前工作线程未设置中断标记则需要设置一下 worker.Thread的中断状态为true wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); //这里的代码有删减 } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 注意,这个勾子函数抛出的异常需要自己捕获处理,否则会被外面的try忽略 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
小结:completedAbruptly = true(异常退出) 的情况只会由用户引起,分别在 beforeExecute、afterExecute 两个勾子函数抛出异常时
@可能会遇到的面试题@
1、面试题 :线程池中如何获取线程中的异常
方法一:实现线程池中的afterExecute勾子函数
注意:这个勾子函数抛出的异常需要自己捕获处理,否则会被外面的try忽略
方法二:在提交给线程池的任务中task中去捕获和处理异常
方法 三:通过Future.get()拿结果时捕获异常 (这里的实现没有理解 ??)
通过submit提交的任务,在Future.get()时可以捕获到异常
2、附加题:如何捕获线程池中的异常?
自定义ThreadFactory工厂
public class RewriteUncatchtExceptionHandler implements Thread.UncaughtExceptionHandler{ public void uncaughtException(Thread t, Throwable e) { System.out.println("我捕获到了线程池的异常"); } }
public class MyThreadFactory implements ThreadFactory{ public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setUncaughtExceptionHandler(new RewriteUncatchtExceptionHandler()); System.out.println("Thread[" + t.getName() + "] created."); return t; } } /** * 虽然从写ThreadFactory以后,可以捕获到异常,但是只能是execute,而submit还是不行 how to choose one */ public static void catchedExecutor() { ExecutorService executorService = Executors.newCachedThreadPool(new MyThreadFactory()); executorService.execute(new Task()); executorService.shutdownNow(); }
how to choose submit() or execute() * There is a difference concerning exception/error handling.A task queued with execute() that generates some Throwable will cause the UncaughtExceptionHandler * for the Thread running the task to be invoked. The default UncaughtExceptionHandler, which typically prints the Throwable stack trace to System.err, will be * invoked if no custom handler has been installed.On the other hand, a Throwable generated by a task queued with submit() will bind the Throwable to the Future * that was produced from the call to submit(). Calling get() on that Future will throw an ExecutionException with the original Throwable as its cause (accessible * by calling getCause() on the ExecutionException).
上述的异常处理方式,只适用于execute,对于submit的方法提交的任务是无效的。那么我们如何进行选择呢?
意思就是说二者最大的区别就是异常处理上,在execute的时候,如果你没有实现一个handler,那么他就使用默认的handler来处理异常,你要是实现了一个handler
他就会使用的实例化的handler,但是对于submit来说,异常是绑定到Future上了,但是调用future.get()的时候,这些异常才会给你抛出(方法三中提到的实现) 来,意味着你自己定义的handler其实是无效的
八、processWorkerExit 函数源码分析
private void processWorkerExit(Worker w, boolean completedAbruptly) { //异常中断时,要减少工作线程数 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //清理工作者集合中的worker,累加完成任务数 try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }