# 类结构
线程池也可以执行定时任务,以前如果要执行一个定时任务,需要用到 Timer
和 TimerTask
,但是它只是创建一个线程执行定时任务,无法多线程调度。
JDK1.5 之后可以使用 ScheduledThreadPoolExecutor
:
public class ScheduledThreadPoolExecutor | |
extends ThreadPoolExecutor | |
implements ScheduledExecutorService { | |
} |
该类所有构造方法都要求线程池最大容量为 Integer.MAX_VALUE
,并且均采用 DelayedWorkQueue
(内部类)作为等待队列。
之前的文章讲了 ThreadPoolExecutor
和 ScheduledExecutorService
,前者定义和实现了线程池方面的功能,后者定义了执行定时任务的方法。
简单使用一下,该方法可以提交一个延时任务:
public static void main(String[] args) { | |
// 直接设定核心线程数为 1 | |
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); | |
// 定义一个 3 秒后执行的任务 | |
executor.schedule(() -> System.out.println("才川晴香"), 3, TimeUnit.SECONDS); | |
// 定义一个 3 秒后开始,之后每个 1 秒执行一次的任务 | |
executor.scheduleAtFixedRate(() -> System.out.pritln("佐藤时雨"),3, 1, TimeUnit.SECONDS); | |
executor.shutdown(); | |
} |
同样,我们之前讲的 Executors
也可以直接创建一个这样的线程池:
// 如果要用 ScheduledThreadPoolExecutor 接收,外部需要强转一下 | |
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { | |
return new ScheduledThreadPoolExecutor(corePoolSize); | |
} |
# 源码解析
又是大家喜闻乐见的解析源码的环节
ScheduledThreadPoolExecutor
有两个内部类:
ScheduledFutureTask
:继承了FutureTask
,实现了RunnableScheduledFuture
接口,是一个可以延迟执行的异步运算任务。DelayedWorkQueue
:是ScheduledThreadPoolExecutor
专门为了存储周期或延迟任务定义的延迟队列,继承了AbstractQueue
,实现了BlockingQueue
。内部只允许存储RunnableScheduledFuture
对象。
# ScheduleFutureTask
相关类结构, ScheduledThreadPoolExecutor
的延迟队列存放的是 RunnableScheduledFuture
,也就是它实现的接口。
<img src="https://s3.uuu.ovh/imgs/2022/11/25/28ad3a759a16343f.jpg" style="zoom: 67%;" />
ScheduledThreadPoolExecutor
执行定时任务时,会将任务封装成 ScheduleFutureTask
对象。
# 属性
// 为相同延时任务提供的顺序编号 | |
// 定时任务中,一个任务是周期性执行的,但是它们的 sequenceNumber 的值相同,则被视为是同一个任务。 | |
private final long sequenceNumber; | |
// 任务可以执行的时间,纳秒级 | |
private long time; | |
// 重复任务的执行周期时间,纳秒级,0 表示不重复任务 | |
private final long period; | |
// 重新入队的任务 | |
RunnableScheduledFuture<V> outerTask = this; | |
// 延迟队列的索引,以支持更快的取消操作 | |
int heapIndex; |
# 核心方法
为了对延迟队列中的 ScheduleFutureTask
排序, ScheduleFutureTask
还实现了 Comparable
接口,排序规则为:
- 先比较延迟队列中每个任务下次执行时间,下次执行时间距离短的会排在前面。
- 再比较
sequenceNumber
值,小的会在前面。
构造方法:
ScheduledFutureTask(Runnable r, V result, long ns) { | |
super(r, result); // FutureTask 构造方法,为 Callable 和 state 赋值 | |
this.time = ns; | |
this.period = 0; | |
this.sequenceNumber = sequencer.getAndIncrement();// 递增 | |
} | |
ScheduledFutureTask(Runnable r, V result, long ns, long period) { | |
super(r, result); | |
this.time = ns; | |
this.period = period; | |
this.sequenceNumber = sequencer.getAndIncrement(); | |
} | |
ScheduledFutureTask(Callable<V> callable, long ns) { | |
super(callable); | |
this.time = ns; | |
this.period = 0; | |
this.sequenceNumber = sequencer.getAndIncrement(); | |
} |
getDelay()
方法:获得该任务下次执行时间对当前时间的间距,纳秒级。
// 获取下次执行任务的时间距离当前时间的纳秒数 | |
public long getDelay(TimeUnit unit) { | |
return unit.convert(time - now(), NANOSECONDS); | |
} |
run
方法:先拿到任务类别,判断是否可以执行,再分开执行。
注意,这个 run 是 ScheduledFutureTask 的 run 方法,外部 ScheduledThreadPoolExecutor 执行一个任务时,就是从外部调用这个 run 方法。
public void run() { | |
boolean periodic = isPeriodic();// 是否为周期任务 | |
if (!canRunInCurrentRunState(periodic))// 当前状态是否可以执行 | |
cancel(false); | |
else if (!periodic) | |
// 不是周期任务,直接执行 | |
ScheduledFutureTask.super.run(); | |
else if (ScheduledFutureTask.super.runAndReset()) { // 运行后重新设置状态 NEW | |
setNextRunTime();// 设置下一次运行时间 | |
// 这是外部类的方法 | |
reExecutePeriodic(outerTask);// 重排序一个周期任务,周期任务重新入队等待下一次执行 | |
} | |
} |
注意:
- 执行任务都是调用的父类
FututreTask
的run
和runAndReset
,后者会重置当前任务的执行状态。 reExecutePeriodic
方法是ScheduledThreadPoolExecutor
类的方法。会将延迟队列中任务重排序,上文说过,延迟队列只存放RunnableScheduledFuture<T>
,所以需要将outerTask
放进去,周期任务需要调用scheduleAtFixedRate
和scheduleWithFixedDelay
。会对该属性进行赋值操作。
void reExecutePeriodic(RunnableScheduledFuture<?> task) { | |
// 线程池当前状态下能够执行任务 | |
if (canRunInCurrentRunState(true)) { | |
// 将任务放入队列 | |
super.getQueue().add(task); | |
// 线程池当前状态下不能执行任务,并且成功移除任务 | |
if (!canRunInCurrentRunState(true) && remove(task)) | |
// 取消任务 | |
task.cancel(false); | |
else | |
// 调用 ThreadPoolExecutor 类的 ensurePrestart () 方法 | |
// 启动一个新的线程等待任务 | |
ensurePrestart(); | |
} | |
} |
# DelayedWorkQueue
线程池执行任务时需要从任务队列中拿任务,而普通的任务队列,如果里面有任务就直接拿出来了,但是延时队列不一样,它里面的任务,如果没有到时间也是拿不出来的。
该队列其实是一个优先队列,利用堆的特性获取最先运行的任务,即对顶的任务:
- 如果堆顶的任务到时间了,就出队。
- 如果堆顶的任务还没到时间,就看它还有多久到时间,利用条件锁等待这段时间,时间到了后再出队;
一个小细节,先看源码:
// 将任务放入队列 | |
super.getQueue().add(task); |
如果你仔细读了源码,肯定看到过这句代码,向上点进去, ThreadPoolExecutor
的 getQueue
源码为:
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
诶?我们不是应该将任务放到 DelayedWorkQueue
里面吗,为什么现在是放到父类里的 BlockingQueue
了?
其实看 ScheduleThreadPoolExecutor
的构造方法就知道:
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { | |
// 已经为将 BlockingQueue 定义为 DelayedWorkQueue 了 | |
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, | |
new DelayedWorkQueue(), threadFactory); | |
} |
# 核心方法
现在讲的方法是
ScheduledThreadPoolExecutor
的方法。
schedule()
方法:主要执行一次性(延迟)任务,先将 Callable/Runnable
封装为 RunnableScheduledFuture
,再通过 delayedExecute
执行。
public <V> ScheduledFuture<V> schedule(Callable<V> callable, | |
long delay, | |
TimeUnit unit) { | |
if (callable == null || unit == null) | |
throw new NullPointerException(); | |
// 构造 ScheduledFutureTask 任务 | |
RunnableScheduledFuture<V> t = decorateTask(callable, | |
new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); | |
delayedExecute(t);// 任务执行主方法 | |
return t; | |
} | |
private void delayedExecute(RunnableScheduledFuture<?> task) { | |
if (isShutdown()) | |
reject(task);// 池已关闭,执行拒绝策略 | |
else { | |
super.getQueue().add(task);// 任务入队 | |
if (isShutdown() && | |
!canRunInCurrentRunState(task.isPeriodic()) && | |
remove(task))// 移除任务 | |
task.cancel(false); | |
else | |
ensurePrestart();// 启动一个新的线程等待任务 | |
} | |
} |
scheduleAtFixedRate
方法:之后每隔 period 执行一次,不等待第一次执行完成就开始计时
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, | |
long initialDelay, | |
long period, | |
TimeUnit unit) { | |
if (command == null || unit == null) | |
throw new NullPointerException(); | |
if (period <= 0) | |
throw new IllegalArgumentException(); | |
// 构建 RunnableScheduledFuture 任务类型 | |
ScheduledFutureTask<Void> sft = | |
new ScheduledFutureTask<Void>(command, | |
null, | |
triggerTime(initialDelay, unit),// 计算任务的延迟时间 | |
unit.toNanos(period));// 计算任务的执行周期 | |
RunnableScheduledFuture<Void> t = decorateTask(command, sft);// 执行用户自定义逻辑 | |
sft.outerTask = t;// 赋值给 outerTask,准备重新入队等待下一次执行 | |
delayedExecute(t);// 执行任务 | |
return t; | |
} |
scheduleWithFixedDelay
方法:在第一次执行完之后延迟 delay 后开始下一次执行
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, | |
long initialDelay, | |
long delay, | |
TimeUnit unit) { | |
if (command == null || unit == null) | |
throw new NullPointerException(); | |
if (delay <= 0) | |
throw new IllegalArgumentException(); | |
// 构建 RunnableScheduledFuture 任务类型 | |
ScheduledFutureTask<Void> sft = | |
new ScheduledFutureTask<Void>(command, | |
null, | |
triggerTime(initialDelay, unit),// 计算任务的延迟时间 | |
unit.toNanos(-delay));// 计算任务的执行周期 | |
// 这个其实返回的就是 sft,你不信自己看源码,command 都没有用到 | |
RunnableScheduledFuture<Void> t = decorateTask(command, sft); | |
sft.outerTask = t;// 赋值给 outerTask,准备重新入队等待下一次执行 | |
delayedExecute(t);// 执行任务 | |
return t; | |
} |
scheduleAtFixedRate
传的是正值,而 scheduleWithFixedDelay
传的则是负值,这个值就是 ScheduledFutureTask
的 period
属性。
# 总结
最后浅浅地总结一下整个流程,当我们传入一个周期任务时:
ScheduledThreadPoolExecutor
会将Callable/Runnable
封装成ScheduleFutureTask
,然后进一步拿到RunnableScheduledFuture
。- 延迟执行,如果不用拒绝的话,就加入延迟队列,启动一个新的线程执行延迟任务。
- 执行的时候,调用的
RunnableScheduledFuture
的 run 方法,本质上其实是调用FutureTask
的run
。
# 参考
https://www.yuque.com/qingkongxiaguang/javase/bnoif5#e6e1afbf
https://blog.csdn.net/l1028386804/article/details/104585295
https://www.cnblogs.com/tong-yuan/p/11801757.html
https://pdai.tech/md/java/thread/java-thread-x-juc-executor-ScheduledThreadPoolExecutor.htm