线程池学习笔记(一)ThreadPoolExecutor

  • Post author:
  • Post category:其他

一、 ThreadPoolExecutor线程池的状态

 RUNNING:  //运行状态 
           Accept new tasks and process queued tasks
 SHUTDOWN: //不接收新任务,但仍然可以运行任务队列中的任务
           Don't accept new tasks, but process queued tasks
 STOP:     //不接收新任务,也不运行任务队列中的任务,中断运行中的工作线程(worker.state>=0)
           Don't accept new tasks, don't process queued tasks
           and interrupt in-progress tasks
 TIDYING:  //所有任务都中止,工作线程数为0时,会将状态设置为TIDYING  详见 tryTerminate 方法
           All tasks have terminated, workerCount is zero,
           the thread transitioning to state TIDYING
           will run the terminated() hook method
 TERMINATED: // 调用terminated()方法后状态变为终止  terminated() has completed

二、如何优雅的关闭线程池

1、代码实例 使用shutdown 和 shutdownNow

 package com.springboot.girl.juc.pool;
 ​
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 ​
 /**
  * todo 如果我们把awaitTime和shutdownNow方法全部屏蔽掉的只留下shutdown方法的话会怎样呢?
  * 会变成表示main方法结束的「end」显示出来之后,会打印出很多的task2的start和end。
  * 这就是虽然课程结束了,但是学校仍然不能放学的不正常状态。最恶劣的情况会导致JAVA进程一直残留在OS中。
  * 所以我们一定不要忘记使用awaitTermination和shutdownNow
  * 1、shutdown 关闭提交通道,已提交的任务还是正常执行
  * 2、shutdownNow 关闭提交通道,已提交的任务不再执行
  * 3、awaitTermination 当前线程阻塞
  *  3.1 等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
     3.2 或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
     3.3 或者 线程被中断,抛出InterruptedException
  */
 public class TestPoolStatus {
     public static void main(String[] args) {
 ​
         ExecutorService pool = Executors.newFixedThreadPool(5);
         final long waitTime = 3 * 1000; //任务1的睡眠waitTime时间比shutdown的等待短,则任务1 可以正常执行完
         final long awaitTime = 5 * 1000;
 ​
         Runnable task1 = new Runnable(){
             public void run(){
                 try {
                     System.out.println("=====================task1 start=====================");
                     Thread.sleep(waitTime);
                     System.out.println("=====================task1 end==============");
                 } catch (InterruptedException e) {
                     System.out.println("task1 interrupted: " + e);
                 }
             }
         };
 ​
         Runnable task2 = new Runnable(){
             public void run(){
                 try {
                     System.out.println("  task2 start");
                     Thread.sleep(1000);
                     System.out.println("  task2 end");
                 } catch (InterruptedException e) {
                     System.out.println("task2 interrupted: " + e);
                 }
             }
         };
         // 让学生解答某个很难的问题
         pool.execute(task1);
 ​
         // 生学生解答很多问题
         for(int i=0; i<100; ++i){
             pool.execute(task2);
         }
 ​
         try {
             // 向学生传达“问题解答完毕后请举手示意!”
             pool.shutdown();
 ​
             // 向学生传达“XX分之内解答不完的问题全部带回去作为课后作业!”后老师等待学生答题
             // (所有的任务都结束的时候,返回TRUE)
             if(!pool.awaitTermination(awaitTime, TimeUnit.MILLISECONDS)){
                 // 超时的时候向线程池中所有的线程发出中断(interrupted)。
                 pool.shutdownNow();
             }
         } catch (InterruptedException e) {
             // awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。
             System.out.println("awaitTermination interrupted: " + e);
             pool.shutdownNow();
         }
 ​
         System.out.println("end");
     }
 }
 ​

三、shutdown和shutdownNow的用法和区别

1、shutdown 关闭提交通道,已提交的任务还是正常执行

只是关闭了提交通道,用submit()是无效的;而内部该怎么跑还是怎么跑,跑完再停

2、shutdownNow 关闭提交通道,已提交的任务不再执行

能立即停止线程池,正在跑的和正在等待的任务都停下了

3、awaitTermination 当前线程阻塞

3.1 等所有已提交的任务(包括正在跑的和队列中等待的)执行完 3.2 或者 等超时时间到了(timeout 和 TimeUnit设定的时间) 3.3 或者 线程被中断,抛出InterruptedException

四、submit和execute的区别?

1、可以接受的任务类型不同

execute只能接受Runnable类型的任务 submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null

2、submit()有返回值,而execute()没有

例如,有个task,希望该task执行完后告诉我它的执行结果,是成功还是失败,然后继续下面的操作,这时需要用submit

3、submit()可以进行Exception处理

例如,如果task里会抛出checked或者unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那么就需要用到submit,通过对Future.get()进行抛出异常的捕获,然后对其进行处理。

4、来源不同

submit 来源ExecutorService接口

execute来源Executor接口

五、execute源码解析

# 作用:

1、初始化核心线程,同时将任务交给初始化的核心线程;

2、核心线程数满了之后,将任务缓存到workQueue中(判断是否能接收任务,与线程池的状态有关)

3、任务队列饱和的情况下,新增临时工作线程,将任务丢给临时线程处理

 public void execute(Runnable command) {
     if (command == null)
         throw new NullPointerException();
     // 第一步 当线程数量小于核心线程数时,初始化创建核心worker,并将任务交给核心worker处理
     int c = ctl.get();
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true))
             return;
         c = ctl.get();
     }
     // 第二步 线程池状态是运行中,任务队列未满时,将任务添加到队列 中
     //检查线程池状态,如果不是运行中,则删除队列中的任务,
     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();
         if (! isRunning(recheck) && remove(command))
             reject(command);
         else if (workerCountOf(recheck) == 0)
             addWorker(null, false);
     }
     // 第三步 核心线程数已满且任务队列已经饱和的情况下,新增临时线程数,将任务丢给临时worker
     else if (!addWorker(command, false))
         reject(command);
 }

六、addWorker源码解析

#作用 (线程池状态判断)

1、修改工作线程数量

2、创建并启动worker

1、先改线程数量(修改ctl的数量 )

1.1 、SHUTDOWN状态不接收新任务,只完成线程池中的已有任务

注:shutdownNow状态(STOP)不接收新任务,也不处理线程池中的任务 返回false

 if (rs >= SHUTDOWN 
   &&
     ! (
         //线程池关闭状态
     rs == SHUTDOWN 
     &&
         //初始任务为空
     firstTask == null 
     &&
         //队列不为空的情况   
         //以上条件可以解读为:
         //只要关闭状态(true),来了个新任务(false) ,则不能创建worker
         //只要是关闭状态(true),新任务为空(true),但是队列不为空吗(true),则可以增加worker
              
     ! workQueue.isEmpty()
     )
 )
   return false;

1.2 自旋来修改数量

  for (;;) {
   int wc = workerCountOf(c);
   if (wc >= CAPACITY ||
       wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
        
    if (compareAndIncrementWorkerCount(c))
       break retry;
       
     c = ctl.get();  // Re-read ctl
     if (runStateOf(c) != rs)
         continue retry;
                 // else CAS failed due to workerCount change; retry inner loop
      }

1.3 、补充知识:语句块在自旋中的使用 retry | break retry | continue retry

在for(;;)语句中如果 不使用retry定义语句块,只用break 只能跳出内层循环 | continue 只能跳出内层本次循环,程序 会一直执行下去不会停止,不信您可以把retry删除试试

 int num = 0;
 boolean flag = false;
 retry:
 for(;;) {
     System.out.println("=====local begin:"+num);
     for(;;) {
         num ++;
         if(num == 110) {
             break retry; //跳出到retry定义处的外层循环
         }
         if(num == 100) {
             flag = true;
         }
         if(flag) {
             flag = false;
             System.out.println("=====local1:" + num);
             continue retry;  //只是跳出 内层循环  相当于break
              //  break ; // 跳出 内层循环
             // continue ; // 仅跳出本次循环
         }
     }
 }
 System.out.println("=====local end:"+num);

2、创建工作者源码分析

#作用 :创建并启动worker (线程池状态判断)

 boolean workerStarted = false;
 boolean workerAdded = false;
 Worker w = null;
 try {
     w = new Worker(firstTask);
     final Thread t = w.thread;
     if (t != null) {
         final ReentrantLock mainLock = this.mainLoc
         mainLock.lock();
         try {
             // Recheck while holding lock.
             // Back out on ThreadFactory failure or
             // shut down before lock acquired.
             int rs = runStateOf(ctl.get());
             if (rs < SHUTDOWN ||
                 (rs == SHUTDOWN && firstTask == nul
                 if (t.isAlive()) // precheck that t
                     throw new IllegalThreadStateExc
                 workers.add(w);
                 int s = workers.size();
                 if (s > largestPoolSize)
                     largestPoolSize = s;
                 workerAdded = true;
             }
         } finally {
             mainLock.unlock();
         }
         if (workerAdded) {
             t.start();
             workerStarted = true;
         }
     }
 } finally {
     if (! workerStarted)
         addWorkerFailed(w);
 }
 return workerStarted;

通过加锁来添加worker

 Worker(Runnable firstTask) {
     setState(-1); // 线程还没运行,防止中断,工作者初始状态为-1
     this.firstTask = firstTask;
     this.thread = getThreadFactory().newThread(this);
 }
 /** Delegates main run loop to outer runWorker  */
 public void run() {
     runWorker(this);
 }

七、 核心代码runWorker

作用:

1、不停的执行初始任务和任务队列中的任务

2、getTask()方法说明

当线程池状态为SHUTDOWN且任务队列为空时,返回null

当线程池状态>=STOP时,返回nuill

当线程池状态< SHUTDOWN,则需要考虑是否减少临时线程数和核心线程数

核心线程数是否减少:参数允许核心线程数超时,且当前获取poll任务已经超时的情况

临时线程数是否减少:线程数wc > max线程数

 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 if ((wc > maximumPoolSize || (timed && timedOut))
     && (wc > 1 || workQueue.isEmpty())) {
     if (compareAndDecrementWorkerCount(c))
         return null;
     continue;
 }
 try {
     Runnable r = timed ?
         workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
         workQueue.take();
     if (r != null)
         return r;
     timedOut = true;
 } catch (InterruptedException retry) {
     timedOut = false;
 }

3、在过程中判断程池状态,当状态至少是STOP状态时,要设置工作者线程中断标记为TRUE

 final void runWorker(Worker w) {
         Thread wt = Thread.currentThread();
         Runnable task = w.firstTask;
         w.firstTask = null;
         w.unlock(); // 将工作者状态设置为0  允许被中断
         boolean completedAbruptly = true;
         try {
             //第一次执行task里放的是firstTask,第二次执行task=null,要从队列中取出任务进行执行
             while (task != null || (task = getTask()) != null) {
                 //拿到任务在执行期间,worker不能被中断
                 w.lock();        
                 // 1、判断线程池状态是否是STOP状态(shutdownNow) ,是STOP状态则不往下执行
                 if ((runStateAtLeast(ctl.get(), STOP) ||
                  // 2、判断当前线程是否设置了中断标记位  如果设置了那么判断线程池状态是否是STOP状态
                      (Thread.interrupted() &&
                       runStateAtLeast(ctl.get(), STOP))) &&
              // 3、判断并重置(重置为false)当前worker线程的中断标记,如果未设置中断状态,则返回true
                     !wt.isInterrupted())
              // 总结:线程池是STOP状态且当前工作线程未设置中断标记则需要设置一下 worker.Thread的中断状态为true
                     wt.interrupt();
                 
                 try {
                     beforeExecute(wt, task);
                     Throwable thrown = null;
                     try {
                         task.run();
                         //这里的代码有删减
                     } catch (Throwable x) {
                         thrown = x; throw new Error(x);
                     } finally {
                         // 注意,这个勾子函数抛出的异常需要自己捕获处理,否则会被外面的try忽略
                         afterExecute(task, thrown);
                     }
                 } finally {
                     task = null;
                     w.completedTasks++;
                     w.unlock();
                 }
             }
             completedAbruptly = false;
         } finally {
             processWorkerExit(w, completedAbruptly);
         }
     }

小结:completedAbruptly = true(异常退出) 的情况只会由用户引起,分别在 beforeExecute、afterExecute 两个勾子函数抛出异常时

@可能会遇到的面试题@

1、面试题 :线程池中如何获取线程中的异常

方法一:实现线程池中的afterExecute勾子函数

注意:这个勾子函数抛出的异常需要自己捕获处理,否则会被外面的try忽略

方法二:在提交给线程池的任务中task中去捕获和处理异常

方法 三:通过Future.get()拿结果时捕获异常 (这里的实现没有理解 ??)

通过submit提交的任务,在Future.get()时可以捕获到异常

2、附加题:如何捕获线程池中的异常?

自定义ThreadFactory工厂

 public class RewriteUncatchtExceptionHandler implements Thread.UncaughtExceptionHandler{
     public void uncaughtException(Thread t, Throwable e) {
         System.out.println("我捕获到了线程池的异常");
     }
 }

 public class MyThreadFactory implements ThreadFactory{
     public Thread newThread(Runnable r) {
         Thread t = new Thread(r);
         t.setUncaughtExceptionHandler(new RewriteUncatchtExceptionHandler());
         System.out.println("Thread[" + t.getName() + "] created.");
         return t;
     }
 }
 ​
 /**
    * 虽然从写ThreadFactory以后,可以捕获到异常,但是只能是execute,而submit还是不行  how to choose one
    */
   public static void catchedExecutor() {
       ExecutorService executorService = Executors.newCachedThreadPool(new MyThreadFactory());
       executorService.execute(new Task());
       executorService.shutdownNow();
   }
 how to choose submit() or execute()
  * There is a difference concerning exception/error handling.A task queued with execute() that generates some Throwable will cause the UncaughtExceptionHandler
  * for the Thread running the task to be invoked. The default UncaughtExceptionHandler, which typically prints the Throwable stack trace to System.err, will be
  * invoked if no custom handler has been installed.On the other hand, a Throwable generated by a task queued with submit() will bind the Throwable to the Future
  * that was produced from the call to submit(). Calling get() on that Future will throw an ExecutionException with the original Throwable as its cause (accessible
  * by calling getCause() on the ExecutionException).

上述的异常处理方式,只适用于execute,对于submit的方法提交的任务是无效的。那么我们如何进行选择呢?

意思就是说二者最大的区别就是异常处理上,在execute的时候,如果你没有实现一个handler,那么他就使用默认的handler来处理异常,你要是实现了一个handler

他就会使用的实例化的handler,但是对于submit来说,异常是绑定到Future上了,但是调用future.get()的时候,这些异常才会给你抛出(方法三中提到的实现) 来,意味着你自己定义的handler其实是无效的

八、processWorkerExit 函数源码分析

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
     //异常中断时,要减少工作线程数
     if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
         decrementWorkerCount();
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     //清理工作者集合中的worker,累加完成任务数
     try {
         completedTaskCount += w.completedTasks;
         workers.remove(w);
     } finally {
         mainLock.unlock();
     }
     tryTerminate();
     int c = ctl.get();
     if (runStateLessThan(c, STOP)) {
         if (!completedAbruptly) {
             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
             if (min == 0 && ! workQueue.isEmpty())
                 min = 1;
             if (workerCountOf(c) >= min)
                 return; // replacement not needed
         }
         addWorker(null, false);
     }
 }

#ScheduledThreadPoolExecutor 面试题


版权声明:本文为hh1sdfsf56456原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。