java 调度线程池ScheduledThreadPoolExecutor的使用和源码学习

  • Post author:
  • Post category:java







前言


ScheduledThreadPoolExecutor可以用来执行一些定时调度任务,可以按照指定的周期执行任务





一、创建线程池


ThreadPoolTaskScheduler内部包装了一个ScheduledThreadPoolExecutor

@Configuration
public class ScheduleConfiguration {

    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        // 核心线程数
        int processors = Runtime.getRuntime().availableProcessors();
        scheduler.setPoolSize(processors);
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }
}



二、测试schedule等方法



1.测试代码

通过cron表达式设置每隔30s执行一次

@RunWith(SpringRunner.class)
@SpringBootTest(classes = BizApp.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ScheduleThreadPoolTest {


    @Autowired
    private TaskScheduler taskScheduler;

    //30秒执行一次
    private String cron = "0/30 * * * * ?";

    @Test
    public void testSchedule(){
        Runnable runnable = () -> System.out.println("执行一次定时任务");
        CronTrigger trigger = new CronTrigger(cron);
        ScheduledFuture<?> future = taskScheduler.schedule(runnable, trigger);
    }



2.原理分析

(1)首先调用taskScheduler.schedule(runnable, trigger);,里面创建了ReschedulingRunnable,然后调用schedule()方法

public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
		ScheduledExecutorService executor = getScheduledExecutor();
		try {
			ErrorHandler errorHandler = this.errorHandler;
			if (errorHandler == null) {
				errorHandler = TaskUtils.getDefaultErrorHandler(true);
			}
			//将任务封装到ReschedulingRunnable,并执行schedule方法
			return new ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler).schedule();
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
		}
	}

(2)ReschedulingRunnable的schedule方法如下,调用.executor的schedule方法,其中executor就是ScheduledThreadPoolExecutor

public ScheduledFuture<?> schedule() {
		synchronized (this.triggerContextMonitor) {
			this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
			if (this.scheduledExecutionTime == null) {
				return null;
			}
			long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
			//通过线程池执行调度
			this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
			return this;
		}
	}

(3)然后进入ScheduledThreadPoolExecutor的schedule方法

 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

(4)再进入delayedExecute(t) 方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
        	//加入队列
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

其中getQueue获取到调度线程池的延时队列DelayedWorkQueue,将任务加入队列,加入队列的逻辑:

 public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                	//扩容
                    grow();
                size = i + 1;
                //根据调度延时时间找到合适的位置
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                	//如果当前加入到队列第一个位置,将leader设置为null
                    leader = null;
                    //唤醒等待的线程(可能所有核心线程都在等待,需要唤醒一个去执行任务)
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

另外 ensurePrestart()是有任务时,保证有线程在执行

 void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        //小于核心线程,新增一个线程
        if (wc < corePoolSize)
            addWorker(null, true);
        //当前工作线程为0,新增一个线程
        else if (wc == 0)
            addWorker(null, false);
    }

(5)工作线程起来之后,从循环队列里获取任务,执行任务

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
       		 //获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //校验线程池状态
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                	//执行前的回调
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    	//执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    	//执行后的回调
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

(6)获取任务task = getTask()

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 
                    workQueue.take();//阻塞获取任务
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

workQueue 是ScheduledThreadPoolExecutor的内部类DelayedWorkQueue

其中的take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                    	//1、队列为空时,等待
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                        	//2、不需要等待了,返回任务
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                        	//3、不是leader线程,等待
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                            	//4、延时执行,等待
                                available.awaitNanos(delay);
                            } finally {
                            	//5、将leader设置为null
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                	//6、唤醒其它线程
                    available.signal();
                lock.unlock();
            }
        }

该方法是获取任务的核心方法:

1、if (first == null),当队列为空时,所有线程进入等待;当有任务加入到队列里面时,会有线程从此处唤醒;

2、队列不为空时,获取延时执行时间,若延时时间小于等于0,表示不需要等待了(包括已经等待过了;或者任务执行较慢,目前时间已经超过了预定的执行时间),此时直接返回第一个任务;

3、若延时时间大于0,说明需要等待delay,如果此时leader不为null,说明已经有leader了,让leader线程执行第4处等待就行了,当前线程在第3处等待;

4、如果leader为null,则将当前线程赋给leader,让leader线程执行第4处等待;

5、leader线程等待时间到达后,可能有多个线程作为leader在等待,需要判断leader是当前线程时,重新将leader置为null;然后该线程继续循环获取任务,(当然取走任务的线程不一定是等待delay的leader线程,可能在leader等待时间到达后,其它线程占用锁,leader并没有被唤醒,此时任务的delay已经小于0,任务可能会被其它占有锁的线程取走)

6、取走任务的线程,会在第6处判断leader时null,并且队列里还有任务,会尝试唤醒一个第1处或第3处等待的线程继续循环取任务。

(7)、执行任务 task.run();

task是ScheduledFutureTask对象,它的run方法如下

 		public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
            		//schedule 任务
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
            	//scheduleWithFixedDelay、scheduleAtFixedRate 任务
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

(8)、 ScheduledFutureTask.super.run(); 既是FutureTask 的 run方法

 public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                	//任务被封装在Callable里面了
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

(9)、再继续进入到ReschedulingRunnable的run方法

public void run() {
		//任务真实执行时间
		Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
		//执行任务
		super.run();
		//任务完成时间
		Date completionTime = new Date(this.triggerContext.getClock().millis());
		synchronized (this.triggerContextMonitor) {
			Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
			this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
			if (!obtainCurrentFuture().isCancelled()) {
				//调度下次任务
				schedule();
			}
		}
	}

(10)、调度下次任务 schedule();

public ScheduledFuture<?> schedule() {
		synchronized (this.triggerContextMonitor) {
			//根据上次完成时间计算下次调度时间
			this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
			if (this.scheduledExecutionTime == null) {
				return null;
			}
			long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
			//将新任务加入线程池ScheduledThreadPoolExecutor
			this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
			return this;
		}
	}

从第9、10步可以看出,在调度过程中,维护了3个时间:

scheduledExecutionTime:任务预定的执行时间,根据上次完成时间计算,所以当任务执行周期大于调度周期时,后续任务都会延后;

actualExecutionTime:任务实际的执行时间,执行任务前记录;

completionTime:任务实际的完成时间,执行任务后记录。

(11)、下面接着分析第7步中的 scheduleWithFixedDelay、scheduleAtFixedRate 任务

首先创建任务:

 @Test
    public void testScheduleAtFixedRate(){
        Runnable runnable = () -> System.out.println("执行一次定时任务");
        //30秒执行一次
        ScheduledFuture<?> future = taskScheduler.scheduleAtFixedRate(runnable,30000 );
    }

(12)、ScheduledFutureTask.super.runAndReset() 既是既是FutureTask 的 runAndReset方法

执行任务

protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                	//执行任务
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

(13)、 设置下次执行时间 setNextRunTime();

 private void setNextRunTime() {
            long p = period;
            if (p > 0)
            	//scheduleWithFixedDelay 将上次时间加上周期
                time += p;
            else
            	//scheduleAtFixedRate 将当前时间加上周期
                time = triggerTime(-p);
        }

(14)、重复执行周期任务 reExecutePeriodic(outerTask);

将任务加入队列 DelayedWorkQueue

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
        	//加入队列
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

可以看出:

scheduleAtFixedRate,是以固定周期执行任务,也是在上次任务执行完时创建新的周期任务;当有时任务执行较慢时,线程池会延后执行后续任务。

scheduleWithFixedDelay,是以固定延后频率执行任务。





总结


1、schedule :当任务执行周期大于调度周期时,后续任务都会延后;

2、scheduleAtFixedRate : 当任务执行周期大于调度周期时,后续任务都会延后;当后面任务有变得执行较快时,线程池会将少执行的任务再补充回来,以达到相对固定的频率执行任务;

3、scheduleWithFixedDelay: 在上次任务完成后,以当前时间加上周期作为下次执行的时间,即以上次任务完成为基准,延后固定周期执行任务。



版权声明:本文为RenshenLi原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。