# 类结构

线程池也可以执行定时任务,以前如果要执行一个定时任务,需要用到 TimerTimerTask ,但是它只是创建一个线程执行定时任务,无法多线程调度。

JDK1.5 之后可以使用 ScheduledThreadPoolExecutor

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
}

该类所有构造方法都要求线程池最大容量为 Integer.MAX_VALUE ,并且均采用 DelayedWorkQueue (内部类)作为等待队列。

之前的文章讲了 ThreadPoolExecutorScheduledExecutorService ,前者定义和实现了线程池方面的功能,后者定义了执行定时任务的方法。

简单使用一下,该方法可以提交一个延时任务:

public static void main(String[] args) {
  	// 直接设定核心线程数为 1
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    
    // 定义一个 3 秒后执行的任务
    executor.schedule(() -> System.out.println("才川晴香"), 3, TimeUnit.SECONDS);
    // 定义一个 3 秒后开始,之后每个 1 秒执行一次的任务
    executor.scheduleAtFixedRate(() -> System.out.pritln("佐藤时雨"),3, 1, TimeUnit.SECONDS);
    
    executor.shutdown();
}

同样,我们之前讲的 Executors 也可以直接创建一个这样的线程池:

// 如果要用 ScheduledThreadPoolExecutor 接收,外部需要强转一下
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

# 源码解析

又是大家喜闻乐见的解析源码的环节

ScheduledThreadPoolExecutor 有两个内部类:

  • ScheduledFutureTask :继承了 FutureTask ,实现了 RunnableScheduledFuture 接口,是一个可以延迟执行的异步运算任务。
  • DelayedWorkQueue :是 ScheduledThreadPoolExecutor 专门为了存储周期或延迟任务定义的延迟队列,继承了 AbstractQueue ,实现了 BlockingQueue 。内部只允许存储 RunnableScheduledFuture 对象。

# ScheduleFutureTask

相关类结构, ScheduledThreadPoolExecutor 的延迟队列存放的是 RunnableScheduledFuture ,也就是它实现的接口。

<img src="https://s3.uuu.ovh/imgs/2022/11/25/28ad3a759a16343f.jpg" style="zoom: 67%;" />

ScheduledThreadPoolExecutor 执行定时任务时,会将任务封装成 ScheduleFutureTask 对象。

# 属性

// 为相同延时任务提供的顺序编号
// 定时任务中,一个任务是周期性执行的,但是它们的 sequenceNumber 的值相同,则被视为是同一个任务。
private final long sequenceNumber;
// 任务可以执行的时间,纳秒级
private long time;
// 重复任务的执行周期时间,纳秒级,0 表示不重复任务
private final long period;
// 重新入队的任务
RunnableScheduledFuture<V> outerTask = this;
// 延迟队列的索引,以支持更快的取消操作
int heapIndex;

# 核心方法

为了对延迟队列中的 ScheduleFutureTask 排序, ScheduleFutureTask 还实现了 Comparable 接口,排序规则为:

  1. 先比较延迟队列中每个任务下次执行时间,下次执行时间距离短的会排在前面。
  2. 再比较 sequenceNumber 值,小的会在前面。

构造方法:

ScheduledFutureTask(Runnable r, V result, long ns) {
	super(r, result); // FutureTask 构造方法,为 Callable 和 state 赋值
	this.time = ns;
	this.period = 0;
	this.sequenceNumber = sequencer.getAndIncrement();// 递增
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
	super(r, result);
	this.time = ns;
	this.period = period;
	this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
	super(callable);
	this.time = ns;
	this.period = 0;
	this.sequenceNumber = sequencer.getAndIncrement();
}

getDelay() 方法:获得该任务下次执行时间对当前时间的间距,纳秒级。

// 获取下次执行任务的时间距离当前时间的纳秒数
public long getDelay(TimeUnit unit) {
	return unit.convert(time - now(), NANOSECONDS);
}

run 方法:先拿到任务类别,判断是否可以执行,再分开执行。

注意,这个 run 是 ScheduledFutureTask 的 run 方法,外部 ScheduledThreadPoolExecutor 执行一个任务时,就是从外部调用这个 run 方法

public void run() {
    boolean periodic = isPeriodic();// 是否为周期任务
    if (!canRunInCurrentRunState(periodic))// 当前状态是否可以执行
        cancel(false);
    else if (!periodic)
        // 不是周期任务,直接执行
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) { // 运行后重新设置状态 NEW
        setNextRunTime();// 设置下一次运行时间
        
        // 这是外部类的方法
        reExecutePeriodic(outerTask);// 重排序一个周期任务,周期任务重新入队等待下一次执行
    }
}

注意:

  1. 执行任务都是调用的父类 FututreTaskrunrunAndReset ,后者会重置当前任务的执行状态。
  2. reExecutePeriodic 方法是 ScheduledThreadPoolExecutor 类的方法。会将延迟队列中任务重排序,上文说过,延迟队列只存放 RunnableScheduledFuture<T> ,所以需要将 outerTask 放进去,周期任务需要调用 scheduleAtFixedRatescheduleWithFixedDelay 。会对该属性进行赋值操作。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
	// 线程池当前状态下能够执行任务
	if (canRunInCurrentRunState(true)) {
		// 将任务放入队列
		super.getQueue().add(task);
		// 线程池当前状态下不能执行任务,并且成功移除任务
		if (!canRunInCurrentRunState(true) && remove(task))
			// 取消任务
			task.cancel(false);
		else
			// 调用 ThreadPoolExecutor 类的 ensurePrestart () 方法
            // 启动一个新的线程等待任务
			ensurePrestart();
	}
}

# DelayedWorkQueue

线程池执行任务时需要从任务队列中拿任务,而普通的任务队列,如果里面有任务就直接拿出来了,但是延时队列不一样,它里面的任务,如果没有到时间也是拿不出来的。

该队列其实是一个优先队列,利用堆的特性获取最先运行的任务,即对顶的任务:

  1. 如果堆顶的任务到时间了,就出队。
  2. 如果堆顶的任务还没到时间,就看它还有多久到时间,利用条件锁等待这段时间,时间到了后再出队;

一个小细节,先看源码:

// 将任务放入队列
super.getQueue().add(task);

如果你仔细读了源码,肯定看到过这句代码,向上点进去, ThreadPoolExecutorgetQueue 源码为:

public BlockingQueue<Runnable> getQueue() {
	return workQueue;
}

诶?我们不是应该将任务放到 DelayedWorkQueue 里面吗,为什么现在是放到父类里的 BlockingQueue 了?

其实看 ScheduleThreadPoolExecutor 的构造方法就知道:

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
    // 已经为将 BlockingQueue 定义为 DelayedWorkQueue 了
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

# 核心方法

现在讲的方法是 ScheduledThreadPoolExecutor 的方法。

schedule() 方法:主要执行一次性(延迟)任务,先将 Callable/Runnable 封装为 RunnableScheduledFuture ,再通过 delayedExecute 执行。

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    
    // 构造 ScheduledFutureTask 任务
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
    
    delayedExecute(t);// 任务执行主方法
    return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);// 池已关闭,执行拒绝策略
    else {
        super.getQueue().add(task);// 任务入队
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))// 移除任务
            task.cancel(false);
        else
            ensurePrestart();// 启动一个新的线程等待任务
    }
}

scheduleAtFixedRate 方法:之后每隔 period 执行一次,不等待第一次执行完成就开始计时

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 构建 RunnableScheduledFuture 任务类型
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),// 计算任务的延迟时间
                                      unit.toNanos(period));// 计算任务的执行周期
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);// 执行用户自定义逻辑
    sft.outerTask = t;// 赋值给 outerTask,准备重新入队等待下一次执行
    delayedExecute(t);// 执行任务
    return t;
}

scheduleWithFixedDelay 方法:在第一次执行完之后延迟 delay 后开始下一次执行

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // 构建 RunnableScheduledFuture 任务类型
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),// 计算任务的延迟时间
                                      unit.toNanos(-delay));// 计算任务的执行周期
    // 这个其实返回的就是 sft,你不信自己看源码,command 都没有用到
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;// 赋值给 outerTask,准备重新入队等待下一次执行
    delayedExecute(t);// 执行任务
    return t;
}

scheduleAtFixedRate 传的是正值,而 scheduleWithFixedDelay 传的则是负值,这个值就是 ScheduledFutureTaskperiod 属性。

# 总结

最后浅浅地总结一下整个流程,当我们传入一个周期任务时:

  1. ScheduledThreadPoolExecutor 会将 Callable/Runnable 封装成 ScheduleFutureTask ,然后进一步拿到 RunnableScheduledFuture
  2. 延迟执行,如果不用拒绝的话,就加入延迟队列,启动一个新的线程执行延迟任务。
  3. 执行的时候,调用的 RunnableScheduledFuture 的 run 方法,本质上其实是调用 FutureTaskrun

# 参考

https://www.yuque.com/qingkongxiaguang/javase/bnoif5#e6e1afbf

https://blog.csdn.net/l1028386804/article/details/104585295

https://www.cnblogs.com/tong-yuan/p/11801757.html

https://pdai.tech/md/java/thread/java-thread-x-juc-executor-ScheduledThreadPoolExecutor.htm