前言
ScheduledThreadPoolExecutor可以用来执行一些定时调度任务,可以按照指定的周期执行任务
一、创建线程池
ThreadPoolTaskScheduler内部包装了一个ScheduledThreadPoolExecutor
@Configuration
public class ScheduleConfiguration {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
// 核心线程数
int processors = Runtime.getRuntime().availableProcessors();
scheduler.setPoolSize(processors);
scheduler.setRemoveOnCancelPolicy(true);
return scheduler;
}
}
二、测试schedule等方法
1.测试代码
通过cron表达式设置每隔30s执行一次
@RunWith(SpringRunner.class)
@SpringBootTest(classes = BizApp.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ScheduleThreadPoolTest {
@Autowired
private TaskScheduler taskScheduler;
//30秒执行一次
private String cron = "0/30 * * * * ?";
@Test
public void testSchedule(){
Runnable runnable = () -> System.out.println("执行一次定时任务");
CronTrigger trigger = new CronTrigger(cron);
ScheduledFuture<?> future = taskScheduler.schedule(runnable, trigger);
}
2.原理分析
(1)首先调用taskScheduler.schedule(runnable, trigger);,里面创建了ReschedulingRunnable,然后调用schedule()方法
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
ScheduledExecutorService executor = getScheduledExecutor();
try {
ErrorHandler errorHandler = this.errorHandler;
if (errorHandler == null) {
errorHandler = TaskUtils.getDefaultErrorHandler(true);
}
//将任务封装到ReschedulingRunnable,并执行schedule方法
return new ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler).schedule();
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
(2)ReschedulingRunnable的schedule方法如下,调用.executor的schedule方法,其中executor就是ScheduledThreadPoolExecutor
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
//通过线程池执行调度
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
(3)然后进入ScheduledThreadPoolExecutor的schedule方法
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
(4)再进入delayedExecute(t) 方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
//加入队列
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
其中getQueue获取到调度线程池的延时队列DelayedWorkQueue,将任务加入队列,加入队列的逻辑:
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
//扩容
grow();
size = i + 1;
//根据调度延时时间找到合适的位置
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
//如果当前加入到队列第一个位置,将leader设置为null
leader = null;
//唤醒等待的线程(可能所有核心线程都在等待,需要唤醒一个去执行任务)
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
另外 ensurePrestart()是有任务时,保证有线程在执行
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
//小于核心线程,新增一个线程
if (wc < corePoolSize)
addWorker(null, true);
//当前工作线程为0,新增一个线程
else if (wc == 0)
addWorker(null, false);
}
(5)工作线程起来之后,从循环队列里获取任务,执行任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//校验线程池状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行前的回调
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行后的回调
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
(6)获取任务task = getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
workQueue.take();//阻塞获取任务
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
workQueue 是ScheduledThreadPoolExecutor的内部类DelayedWorkQueue
其中的take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
//1、队列为空时,等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//2、不需要等待了,返回任务
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
//3、不是leader线程,等待
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//4、延时执行,等待
available.awaitNanos(delay);
} finally {
//5、将leader设置为null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
//6、唤醒其它线程
available.signal();
lock.unlock();
}
}
该方法是获取任务的核心方法:
1、if (first == null),当队列为空时,所有线程进入等待;当有任务加入到队列里面时,会有线程从此处唤醒;
2、队列不为空时,获取延时执行时间,若延时时间小于等于0,表示不需要等待了(包括已经等待过了;或者任务执行较慢,目前时间已经超过了预定的执行时间),此时直接返回第一个任务;
3、若延时时间大于0,说明需要等待delay,如果此时leader不为null,说明已经有leader了,让leader线程执行第4处等待就行了,当前线程在第3处等待;
4、如果leader为null,则将当前线程赋给leader,让leader线程执行第4处等待;
5、leader线程等待时间到达后,可能有多个线程作为leader在等待,需要判断leader是当前线程时,重新将leader置为null;然后该线程继续循环获取任务,(当然取走任务的线程不一定是等待delay的leader线程,可能在leader等待时间到达后,其它线程占用锁,leader并没有被唤醒,此时任务的delay已经小于0,任务可能会被其它占有锁的线程取走)
6、取走任务的线程,会在第6处判断leader时null,并且队列里还有任务,会尝试唤醒一个第1处或第3处等待的线程继续循环取任务。
(7)、执行任务 task.run();
task是ScheduledFutureTask对象,它的run方法如下
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//schedule 任务
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//scheduleWithFixedDelay、scheduleAtFixedRate 任务
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
(8)、 ScheduledFutureTask.super.run(); 既是FutureTask 的 run方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//任务被封装在Callable里面了
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
(9)、再继续进入到ReschedulingRunnable的run方法
public void run() {
//任务真实执行时间
Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
//执行任务
super.run();
//任务完成时间
Date completionTime = new Date(this.triggerContext.getClock().millis());
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!obtainCurrentFuture().isCancelled()) {
//调度下次任务
schedule();
}
}
}
(10)、调度下次任务 schedule();
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
//根据上次完成时间计算下次调度时间
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
//将新任务加入线程池ScheduledThreadPoolExecutor
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
从第9、10步可以看出,在调度过程中,维护了3个时间:
scheduledExecutionTime:任务预定的执行时间,根据上次完成时间计算,所以当任务执行周期大于调度周期时,后续任务都会延后;
actualExecutionTime:任务实际的执行时间,执行任务前记录;
completionTime:任务实际的完成时间,执行任务后记录。
(11)、下面接着分析第7步中的 scheduleWithFixedDelay、scheduleAtFixedRate 任务
首先创建任务:
@Test
public void testScheduleAtFixedRate(){
Runnable runnable = () -> System.out.println("执行一次定时任务");
//30秒执行一次
ScheduledFuture<?> future = taskScheduler.scheduleAtFixedRate(runnable,30000 );
}
(12)、ScheduledFutureTask.super.runAndReset() 既是既是FutureTask 的 runAndReset方法
执行任务
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
//执行任务
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
(13)、 设置下次执行时间 setNextRunTime();
private void setNextRunTime() {
long p = period;
if (p > 0)
//scheduleWithFixedDelay 将上次时间加上周期
time += p;
else
//scheduleAtFixedRate 将当前时间加上周期
time = triggerTime(-p);
}
(14)、重复执行周期任务 reExecutePeriodic(outerTask);
将任务加入队列 DelayedWorkQueue
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
//加入队列
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
可以看出:
scheduleAtFixedRate,是以固定周期执行任务,也是在上次任务执行完时创建新的周期任务;当有时任务执行较慢时,线程池会延后执行后续任务。
scheduleWithFixedDelay,是以固定延后频率执行任务。
总结
1、schedule :当任务执行周期大于调度周期时,后续任务都会延后;
2、scheduleAtFixedRate : 当任务执行周期大于调度周期时,后续任务都会延后;当后面任务有变得执行较快时,线程池会将少执行的任务再补充回来,以达到相对固定的频率执行任务;
3、scheduleWithFixedDelay: 在上次任务完成后,以当前时间加上周期作为下次执行的时间,即以上次任务完成为基准,延后固定周期执行任务。