前言
一、ScheduledThreadPoolExecutor
这是一个可以在指定
一定延迟时间后
或者
定时进行
任务调度执行的
线程池
。
可以看到他也是继承于
ThreadPoolExecutor
线程池并且实现了
ScheduledExecutorService
接口。
1、快速入门-常用方法使用案例
public class ScheduledThreadPoolExecutorExample {
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5);
Task task = new Task("任务");
System.out.println("Created : " + task.getName());
executor.schedule(task, 2, TimeUnit.SECONDS);// 只执行一次
// executor.scheduleWithFixedDelay(task, 0, 2, TimeUnit.SECONDS); //任务+延迟
// executor.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);//任延迟取最大值 稳定定时器
}
}
class Task implements Runnable {
private String name;
public Task(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void run() {
System.out.println("Executing : " + name + ", Current Seconds : " + new Date().getSeconds());
try {
// 模拟处理业务需要5秒钟
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
schedule:延迟多长时间之后只执行一次;
schedule输出结果:
Created : 任务
Executing : 任务, Current Seconds : 43
scheduleWithFixedDelay:延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行(这里是执行5秒+延迟2秒=7秒一次)
scheduleWithFixedDelay输出结果:
Created : 任务
Executing : 任务, Current Seconds : 57
Executing : 任务, Current Seconds : 4
Executing : 任务, Current Seconds : 11
Executing : 任务, Current Seconds : 18
Executing : 任务, Current Seconds : 25
Executing : 任务, Current Seconds : 32
scheduleAtFixedRate:延迟指定时间后执行一次,之后按照固定的时长周期执行(这里是5秒执行一次);
scheduleAtFixedRate输出结果:
Created : 任务
Executing : 任务, Current Seconds : 19
Executing : 任务, Current Seconds : 24
Executing : 任务, Current Seconds : 29
二、源码类图分析
2.1、ScheduledThreadPoolExecutor-构造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
这里构造方法其实还是调用ThreadPoolExecutor的构造方法,
不同点
的是这里的队列是用的
DelayedWorkQueue(延迟阻塞队列)
。
2.2、DelayedWorkQueue-延迟阻塞队列
DelayedWorkQueue
是ScheduledThreadPoolExecutor的一个静态内部类,实现了阻塞队列。
可以看到他是由数组实现,默认数组容量是16,并且支持扩容,每次扩容是50%。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
// 数组
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
2.3、ScheduledFutureTask-具有返回结果值的任务
ScheduledFutureTask是ScheduledThreadPoolExecutor的内部类。
实现于FutureTask 继承于RunnableScheduledFuture。
每个要执行的线程任务都会被转换为ScheduledFutureTask。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 任务的序号
private final long sequenceNumber;
// 延迟时间
private long time;
// 重复任务标识,正数为重复任务,为0或者负数为非重复任务
private final long period;
// 构造方法
// 对上面3个核心属性进行初始化赋值,并且调用了父类FutureTask的构造方法
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
2.4、父类FutureTask的构造方法、属性
public FutureTask(Runnable runnable, V result) {
// callable是一个带有返回值的线程
this.callable = Executors.callable(runnable, result);
// state状态-NEW为初始状态
this.state = NEW; // ensure visibility of callable
}
FutureTask内部有一个变量state用来表示任务的黄天,一开始状态为NEW;
所有状态如下:
private static final int NEW =0;// 初始状态
private static final int COMPLETING = 1; //执行中状态
private static final int NORMAL = 2 ; // 正常运行结束状态
private static final int EXCEPTIONAL = 3;// 运行中异常
private static final int CANCELLED = 4;// 任务被取消
private static final int INTERRUPTING= 5;// 任务正在被中断
private static final int INTERRUPTED = 6;//任务已经被中断
任务状态转换
初始化》执行中》正常执行结束
初始化》执行中》执行异常
初始化》任务取消
初始化》任务被中断中》被中断
三、核心方法源码分析
围绕上面3个核心方法的源码进行分析拆解。
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
3.1、schedule方法-只执行一次任务
根据延迟时间和执行时间只执行一次任务。
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
// 1、参数校验
if (command == null || unit == null)
throw new NullPointerException();
// 2、任务转换
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 3、核心!添加任务延迟队列
delayedExecute(t);
return t;
}
代码2这里只是把
command
类型转换成了
ScheduledFutureTask。
初始化时,period设置为0,代表只执行一次,不会重复执行;
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
父类构造方法把runnable转换成了callable;
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
代码3执行delayedExecute方法,源码如下:
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 1、如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉
if (isShutdown())
reject(task);
else {
// 2、与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
super.getQueue().add(task);
// 3、如果当前状态无法执行任务,则取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//4、和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker;
//增加Worker,确保提交的任务能够被执行
ensurePrestart();
}
主要说下代码3,再次检查线程池状态是否被关闭,关闭则从延迟队列里删除刚才的任务,如果队列没有,可能已经在执行了,则调用task.cancel进行取消,
如果代码3的情况不满足,则增加Woker,确保至少一个线程在处理任务,
ensurePrestart代码如下
void ensurePrestart() {
// 获取活跃线程数
int wc = workerCountOf(ctl.get());
// 小于核心线程则增加工人线程
if (wc < corePoolSize)
addWorker(null, true);
// 如果初始化为0,则添加工人线程
else if (wc == 0)
addWorker(null, false);
}
至此,schedule的方法主要做了几件事情
1、初始化队列、初始化线程池
2、任务先入队,然后在添加工人Woker去启动执行
3、线程池状态关闭情况下,则取消任务
3.2、schedule的run方法-任务执行
public void run() {
// 1、是否周期性,就是判断period是否为0。
boolean periodic = isPeriodic();
// 2、检查任务是否可以被执行,不能被执行,则取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 3、如果非周期性任务直接调用run运行即可(调用schedule时)。
else if (!periodic)
ScheduledFutureTask.super.run();
// 4、如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置time=time+period
setNextRunTime();
// 5、需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。
reExecutePeriodic(outerTask);
}
代码1、判断是一次性任务还是可重复任务,代码如下
public boolean isPeriodic() {
return period != 0;
}
可以看到内部是通过period来判断的,schedule方法初始化时设置的为0,所以他是一次性的,这里返回false。
代码2、判断当前任务是否应该被取消,canRunInCurrentRunState代码如下。
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
这里传递的periodic的值为false,所以isRunningOrShutdown的参数为executeExistingDelayedTasksAfterShutdown。
executeExistingDelayedTasksAfterShutdown的参数默认为true。
isRunningOrShutdown代码如下
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
表示有其他线程调用了SHUTDOWN命令关闭了线程池后,当前任务还是要执行,否则为false,则当前任务要取消。
代码3、由于periodic为false,所以执行ScheduledFutureTask.super.run();这里调用了
父类的Future的run方法,代码如下:
public void run() {
// 1、如果状态不是初始化,或者是初始化但是CAS设置当前任务持有者为当前线程失败,则直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 2、判断了状态是否被修改
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 3、执行任务并且阻塞线程直到拿到返回值
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 4、执行异常则CAS更新state为运行异常
setException(ex);
}
// 5、cas更新状态为运行结束
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
这里说一下
代码5,执行成功则执行set方法修改状态,set代码如下。
protected void set(V v) {
// 当前状态为new,则cas更新为执行中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
// 状态更新执行完成(这里没用CAS是因为只可能有一个线程会进来),所以他比CAS性能更好
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
代码4、既然有执行成功,就有执行失败的时候,通常当有4个线程多次提交到线程池,就会出现多个线程同时CAS从NEW更新到执行中,因为任务共享同一个状态值stat,执行失败,则执行setException代码如下
protected void setException(Throwable t) {
// 当前状态为new,则cas更新为执行中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
// 设置当前任务状态任务非正常结束
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
至此、schedule的run方法主要做了几件事情
1、检查任务是不是周期性任务
2、检查线程池状态,有问题就取消
3、schedule明显不是周期性任务,所以只执行一次,更新线程状态结束
3.3、scheduleWithFixedDelay-延迟+定时
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
// 1、参数校验
if (command == null || unit == null)
throw new NullPointerException();
// 2、延迟时间不可小于=0,否则抛异常
if (delay <= 0)
throw new IllegalArgumentException();
// 3、任务类型转换,这里和Schedule不同的是period=-delay
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 4、添加任务到队列
delayedExecute(t);
return t;
}
参数说明:
command:提交的任务
initialDelay:表示延迟多少时间后开始执行command
delay:表示执行任务后,需要延迟多少时间
unit:是initialDelay和delay的时间单位。
任务会一直重复执行,直到任务运行中抛出了异常,被取消了,或者关闭了线程池。
这个方法和schedule方法不同的地方就在于Schedule传的值是0,这里值为-period,period<0说明是可重复执行的任务。
delayedExecute方法和Schedule调用的是同一个,这里不再复述。
线程池会从队列里获取任务,然后调用ScheduledFutureTask的run方法,由于这里period<0,所以isPeriodic返回true,所以执行runAndReset代码如下
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
// 进入这个if调用runAndReset
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
3.4、runAndReset方法-重复执行任务
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 主要这里和3.2的方法不一样,他是void,但是这里不会更新状态,只会判断状态返回结果
return ran && s == NEW;
}
该方法和FutureTask的run方法类似,只是任务正常执行完毕后,不会修改任务的状态,这样做是为了可以重复执行。
这里返回了状态判断结果,正常执行完毕会返回true,否则返回false。
如果返回true,则执行setNextRunTime方法设置该任务下一次执行时间。
3.6、setNextRunTime-设置该任务下一次执行时间
private void setNextRunTime() {
long p = period;
// fixed-rate任务
if (p > 0)
time += p;
else // fixed-delay任务
time = triggerTime(-p);
}
这里p<0,所以是fixed-delay任务。然后设置time为当前时间加上-p的时间,也就是
延迟-p 时间后再次执行
。
3.7、triggerTime(long delay)-获取触发时间
如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。
否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。
long triggerTime(long delay) {
/*
* 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。
* 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。
*/
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
3.8、overflowFree(long delay)-队列溢出处理
/**
* 主要就是有这么一种情况:
* 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。
* 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。
*
* 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。
*
* 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。
* 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。
* 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。
*/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
在设置好下一次执行时间后,调用reExecutePeriodic方法重复执行,代码如下
3.9、reExecutePeriodic-重新执行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 1、校验线程池运行状态,正常则添加任务到队列
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
// 2、如果状态已关闭或停止,则从队列里移除任务,如果移除成功,则取消任务
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 3、添加工人线程
ensurePrestart();
}
}
每次重新执行,如果线程池状态正常,则往队列里添加任务,然后添加工人线程去处理。
至此、scheduleWithFixedDelay总结下
1、当添加一个任务到延迟队列后,等待initialDelay时间,任务就会过期,过期的任务就会被从队列汇总移除,并执行。
2、执行完毕后,会重新设置任务的延迟时间,然后在把任务放入延迟队列,循环往复。
3、如果一个任务在执行中异常了,那么这个任务就结束了,但是不影响其他线程任务的执行。
执行规则是,延迟时间+任务执行时间,为下一次循环执行的时间。
4.0、scheduleAtFixedRate-任务延迟取最大值-稳定定时器
以固定频率调用指定任务的线程池。
scheduleAtFixedRate和scheduleAtFixedDelay类似,这里主要讨论不同点,相同点可以回头在看下上面。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 任务转换,这里和fixed-delay不同,这里传的是正数period
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
由于period是正数,所以在执行setNextRunTime的时候,他是直接time+= p。而不是time = triggerTime(-p);
runAndReset执行完后,执行setNextRunTime这里走的就是p>0逻辑;
private void setNextRunTime() {
long p = period;
// fixed-rate任务
if (p > 0)
time += p;
else // fixed-delay任务
time = triggerTime(-p);
}
scheduleAtFixedRate总结:
相对于fixed-delay任务来说,fixed-rate是以最大的延迟时间来重复执行。
比如任务处理要3秒,延迟5秒,那就是每5秒一次执行;
比如任务执行要5秒,延迟3秒,到达延迟时间后,则不会并发执行新的任务,而是等到第5秒后,下次要执行的任务才会延迟执行。
总结
常用的延迟任务是rate和delay2种,任务类型使用period区分。
提交任务,先提交到延迟队列,然后时间到了,从队列移除,核心线程获取任务并且执行。