一篇文章让你彻底搞懂定时线程池ScheduledThreadPoolExecutor(深度剖析)

  • Post author:
  • Post category:其他







前言




一、ScheduledThreadPoolExecutor

这是一个可以在指定

一定延迟时间后

或者

定时进行

任务调度执行的

线程池



可以看到他也是继承于

ThreadPoolExecutor

线程池并且实现了

ScheduledExecutorService

接口。

在这里插入图片描述




1、快速入门-常用方法使用案例

public class ScheduledThreadPoolExecutorExample {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5);
        Task task = new Task("任务");
        System.out.println("Created : " + task.getName());
         executor.schedule(task, 2, TimeUnit.SECONDS);// 只执行一次
        // executor.scheduleWithFixedDelay(task, 0, 2, TimeUnit.SECONDS); //任务+延迟
//        executor.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);//任延迟取最大值 稳定定时器

    }
}

class Task implements Runnable {
    private String name;

    public Task(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void run() {
        System.out.println("Executing : " + name + ", Current Seconds : " + new Date().getSeconds());
        try {
        	// 模拟处理业务需要5秒钟
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


schedule:延迟多长时间之后只执行一次;



schedule输出结果:

Created : 任务
Executing : 任务, Current Seconds : 43


scheduleWithFixedDelay:延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行(这里是执行5秒+延迟2秒=7秒一次)

scheduleWithFixedDelay输出结果:

Created : 任务
Executing : 任务, Current Seconds : 57
Executing : 任务, Current Seconds : 4
Executing : 任务, Current Seconds : 11
Executing : 任务, Current Seconds : 18
Executing : 任务, Current Seconds : 25
Executing : 任务, Current Seconds : 32


scheduleAtFixedRate:延迟指定时间后执行一次,之后按照固定的时长周期执行(这里是5秒执行一次);

scheduleAtFixedRate输出结果:

Created : 任务
Executing : 任务, Current Seconds : 19
Executing : 任务, Current Seconds : 24
Executing : 任务, Current Seconds : 29



二、源码类图分析



2.1、ScheduledThreadPoolExecutor-构造方法

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

这里构造方法其实还是调用ThreadPoolExecutor的构造方法,

不同点

的是这里的队列是用的

DelayedWorkQueue(延迟阻塞队列)




2.2、DelayedWorkQueue-延迟阻塞队列


DelayedWorkQueue

是ScheduledThreadPoolExecutor的一个静态内部类,实现了阻塞队列。

可以看到他是由数组实现,默认数组容量是16,并且支持扩容,每次扩容是50%。

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

private static final int INITIAL_CAPACITY = 16;
// 数组
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];



2.3、ScheduledFutureTask-具有返回结果值的任务

ScheduledFutureTask是ScheduledThreadPoolExecutor的内部类。

实现于FutureTask 继承于RunnableScheduledFuture。

每个要执行的线程任务都会被转换为ScheduledFutureTask。

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

		// 任务的序号
		private final long sequenceNumber;
		// 延迟时间
        private long time;
        // 重复任务标识,正数为重复任务,为0或者负数为非重复任务
        private final long period;

// 构造方法
// 对上面3个核心属性进行初始化赋值,并且调用了父类FutureTask的构造方法
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }



2.4、父类FutureTask的构造方法、属性

public FutureTask(Runnable runnable, V result) {
		// callable是一个带有返回值的线程
        this.callable = Executors.callable(runnable, result);
        // state状态-NEW为初始状态
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask内部有一个变量state用来表示任务的黄天,一开始状态为NEW;


所有状态如下:

private static final int NEW  		=0;//	初始状态
private static final int COMPLETING = 1; //执行中状态
private static final int NORMAL = 2 ; // 正常运行结束状态
private static final int EXCEPTIONAL = 3;// 运行中异常
private static final int CANCELLED = 4;// 任务被取消
private static final int INTERRUPTING= 5;// 任务正在被中断
private static final int INTERRUPTED = 6;//任务已经被中断


任务状态转换

初始化》执行中》正常执行结束
初始化》执行中》执行异常
初始化》任务取消
初始化》任务被中断中》被中断



三、核心方法源码分析

围绕上面3个核心方法的源码进行分析拆解。

  public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}



3.1、schedule方法-只执行一次任务

根据延迟时间和执行时间只执行一次任务。

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        //  1、参数校验
        if (command == null || unit == null)
            throw new NullPointerException();
        // 2、任务转换
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        // 3、核心!添加任务延迟队列
        delayedExecute(t);
        return t;
    }

代码2这里只是把

command

类型转换成了

ScheduledFutureTask。


初始化时,period设置为0,代表只执行一次,不会重复执行;

ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

父类构造方法把runnable转换成了callable;

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }


代码3执行delayedExecute方法,源码如下:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 1、如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉
    if (isShutdown())
        reject(task);
    else {
        // 2、与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
        super.getQueue().add(task);
        // 3、如果当前状态无法执行任务,则取消
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
        //4、和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker;
        //增加Worker,确保提交的任务能够被执行
            ensurePrestart();
    }

主要说下代码3,再次检查线程池状态是否被关闭,关闭则从延迟队列里删除刚才的任务,如果队列没有,可能已经在执行了,则调用task.cancel进行取消,

如果代码3的情况不满足,则增加Woker,确保至少一个线程在处理任务,


ensurePrestart代码如下

void ensurePrestart() {
		// 获取活跃线程数
        int wc = workerCountOf(ctl.get());
        // 小于核心线程则增加工人线程
        if (wc < corePoolSize)
            addWorker(null, true);
         // 如果初始化为0,则添加工人线程
        else if (wc == 0)
            addWorker(null, false);
    }


至此,schedule的方法主要做了几件事情


1、初始化队列、初始化线程池

2、任务先入队,然后在添加工人Woker去启动执行

3、线程池状态关闭情况下,则取消任务




3.2、schedule的run方法-任务执行

public void run() {
    // 1、是否周期性,就是判断period是否为0。
    boolean periodic = isPeriodic();
    // 2、检查任务是否可以被执行,不能被执行,则取消
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 3、如果非周期性任务直接调用run运行即可(调用schedule时)。
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 4、如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。
    else if (ScheduledFutureTask.super.runAndReset()) {
	    // 设置time=time+period
        setNextRunTime();
        // 5、需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。
        reExecutePeriodic(outerTask);
    }

代码1、判断是一次性任务还是可重复任务,代码如下

public boolean isPeriodic() {
            return period != 0;
        }

可以看到内部是通过period来判断的,schedule方法初始化时设置的为0,所以他是一次性的,这里返回false。

代码2、判断当前任务是否应该被取消,canRunInCurrentRunState代码如下。

boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

这里传递的periodic的值为false,所以isRunningOrShutdown的参数为executeExistingDelayedTasksAfterShutdown。

executeExistingDelayedTasksAfterShutdown的参数默认为true。

isRunningOrShutdown代码如下

final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

表示有其他线程调用了SHUTDOWN命令关闭了线程池后,当前任务还是要执行,否则为false,则当前任务要取消。

代码3、由于periodic为false,所以执行ScheduledFutureTask.super.run();这里调用了

父类的Future的run方法,代码如下:

public void run() {
		// 1、如果状态不是初始化,或者是初始化但是CAS设置当前任务持有者为当前线程失败,则直接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
        	// 2、判断了状态是否被修改
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                	// 3、执行任务并且阻塞线程直到拿到返回值
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 4、执行异常则CAS更新state为运行异常
                    setException(ex);
                }
                // 5、cas更新状态为运行结束
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

这里说一下

代码5,执行成功则执行set方法修改状态,set代码如下。

protected void set(V v) {
		// 当前状态为new,则cas更新为执行中
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            // 状态更新执行完成(这里没用CAS是因为只可能有一个线程会进来),所以他比CAS性能更好
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

代码4、既然有执行成功,就有执行失败的时候,通常当有4个线程多次提交到线程池,就会出现多个线程同时CAS从NEW更新到执行中,因为任务共享同一个状态值stat,执行失败,则执行setException代码如下

protected void setException(Throwable t) {
		// 当前状态为new,则cas更新为执行中
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            // 设置当前任务状态任务非正常结束
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }


至此、schedule的run方法主要做了几件事情


1、检查任务是不是周期性任务

2、检查线程池状态,有问题就取消

3、schedule明显不是周期性任务,所以只执行一次,更新线程状态结束




3.3、scheduleWithFixedDelay-延迟+定时

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        // 1、参数校验
        if (command == null || unit == null)
            throw new NullPointerException();
        // 2、延迟时间不可小于=0,否则抛异常
        if (delay <= 0)
            throw new IllegalArgumentException();
        // 3、任务类型转换,这里和Schedule不同的是period=-delay
        ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        // 4、添加任务到队列
        delayedExecute(t);
        return t;
    }

参数说明:

command:提交的任务
initialDelay:表示延迟多少时间后开始执行command
delay:表示执行任务后,需要延迟多少时间
unit:是initialDelay和delay的时间单位。

任务会一直重复执行,直到任务运行中抛出了异常,被取消了,或者关闭了线程池。

这个方法和schedule方法不同的地方就在于Schedule传的值是0,这里值为-period,period<0说明是可重复执行的任务。

delayedExecute方法和Schedule调用的是同一个,这里不再复述。


线程池会从队列里获取任务,然后调用ScheduledFutureTask的run方法,由于这里period<0,所以isPeriodic返回true,所以执行runAndReset代码如下

public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            // 进入这个if调用runAndReset
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }



3.4、runAndReset方法-重复执行任务

protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // 主要这里和3.2的方法不一样,他是void,但是这里不会更新状态,只会判断状态返回结果
        return ran && s == NEW;
    }

该方法和FutureTask的run方法类似,只是任务正常执行完毕后,不会修改任务的状态,这样做是为了可以重复执行。

这里返回了状态判断结果,正常执行完毕会返回true,否则返回false。

如果返回true,则执行setNextRunTime方法设置该任务下一次执行时间。




3.6、setNextRunTime-设置该任务下一次执行时间

private void setNextRunTime() {
            long p = period;
            // fixed-rate任务
            if (p > 0)
                time += p;
            else // fixed-delay任务
                time = triggerTime(-p);
        }

这里p<0,所以是fixed-delay任务。然后设置time为当前时间加上-p的时间,也就是

延迟-p 时间后再次执行




3.7、triggerTime(long delay)-获取触发时间

如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。

否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。

long triggerTime(long delay) {
    /*
     * 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。
     * 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。
     */
    return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}



3.8、overflowFree(long delay)-队列溢出处理

/**
 * 主要就是有这么一种情况:
 * 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。
 * 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。
 *
 * 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。
 *
 * 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。
 * 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。
 * 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。
 */
private long overflowFree(long delay) {
        Delayed head = (Delayed) super.getQueue().peek();
        if (head != null) {
            long headDelay = head.getDelay(NANOSECONDS);
            if (headDelay < 0 && (delay - headDelay < 0))
                delay = Long.MAX_VALUE + headDelay;
        }
        return delay;
    }

在设置好下一次执行时间后,调用reExecutePeriodic方法重复执行,代码如下




3.9、reExecutePeriodic-重新执行

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
		// 1、校验线程池运行状态,正常则添加任务到队列
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            // 2、如果状态已关闭或停止,则从队列里移除任务,如果移除成功,则取消任务
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
            	// 3、添加工人线程
                ensurePrestart();
        }
    }

每次重新执行,如果线程池状态正常,则往队列里添加任务,然后添加工人线程去处理。

至此、scheduleWithFixedDelay总结下

1、当添加一个任务到延迟队列后,等待initialDelay时间,任务就会过期,过期的任务就会被从队列汇总移除,并执行。

2、执行完毕后,会重新设置任务的延迟时间,然后在把任务放入延迟队列,循环往复。

3、如果一个任务在执行中异常了,那么这个任务就结束了,但是不影响其他线程任务的执行。

执行规则是,延迟时间+任务执行时间,为下一次循环执行的时间。




4.0、scheduleAtFixedRate-任务延迟取最大值-稳定定时器

以固定频率调用指定任务的线程池。

scheduleAtFixedRate和scheduleAtFixedDelay类似,这里主要讨论不同点,相同点可以回头在看下上面。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        // 任务转换,这里和fixed-delay不同,这里传的是正数period
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

由于period是正数,所以在执行setNextRunTime的时候,他是直接time+= p。而不是time = triggerTime(-p);

runAndReset执行完后,执行setNextRunTime这里走的就是p>0逻辑;

private void setNextRunTime() {
            long p = period;
            // fixed-rate任务
            if (p > 0)
                time += p;
            else // fixed-delay任务
                time = triggerTime(-p);
        }

scheduleAtFixedRate总结:

相对于fixed-delay任务来说,fixed-rate是以最大的延迟时间来重复执行。

比如任务处理要3秒,延迟5秒,那就是每5秒一次执行;

比如任务执行要5秒,延迟3秒,到达延迟时间后,则不会并发执行新的任务,而是等到第5秒后,下次要执行的任务才会延迟执行。




总结

常用的延迟任务是rate和delay2种,任务类型使用period区分。

提交任务,先提交到延迟队列,然后时间到了,从队列移除,核心线程获取任务并且执行。

在这里插入图片描述



版权声明:本文为qq_33522097原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。