FutureTask
为 Future
提供了基础实现(言外之意就是也提供了一些功能性函数供我们创建自定义 task
类使用),如获取任务执行结果 ( get
) 和取消任务 ( cancel
) 等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消 (除非使用 runAndReset
执行计算)。 FutureTask
常用来封装 Callable
和 Runnable
,也可以作为一个任务提交到线程池中执行。 FutureTask
的线程安全由 CAS
来保证。
# 异步模型
在 Java
的并发编程中,大体上会分为两种异步编程模型,一类是以异步的形式来并行运行其他的任务,不需要返回任务的结果数据。一类是以异步的形式运行其他任务,需要返回结果。
# 无返回结果异步模型
无返回结果的异步任务可以直接丢进线程或线程池中运行。想要获取运行结果的话,可以调用回调方法。具体实现是定义一个回调接口,并在接口中定义接收任务结果数据的方法。任务运行后调用接口方法,执行回调接口实现类中的逻辑处理结果数据
- 定义回调接口
// 定义任务结果数据的封装类 | |
public interface TaskCallable<T> { | |
T callable(T t); | |
} |
- 定义任务结果数据的封装类
public class TaskResult implements Serializable { | |
// 任务状态 | |
private Integer taskStatus; | |
// 任务消息 | |
private String taskMessage; | |
// 任务结果数据 | |
private String taskResult; | |
@Override | |
public String toString() { | |
return "TaskResult{" + | |
"taskStatus=" + taskStatus + | |
", taskMessage='" + taskMessage + '\'' + | |
", taskResult='" + taskResult + '\'' + | |
'}'; | |
} | |
} |
- 创建回调接口的实现类
public class TaskHandler implements TaskCallable<TaskResult> { | |
@Override | |
public TaskResult callable(TaskResult taskResult) { | |
//TODO 拿到结果数据后进一步处理 | |
System.out.println(taskResult.toString()); | |
return taskResult; | |
} | |
} |
- 创建任务的执行类
public class TaskExecutor implements Runnable{ | |
private TaskCallable<TaskResult> taskCallable; | |
private String taskParameter; | |
public TaskExecutor(TaskCallable<TaskResult> taskCallable, String taskParameter){ | |
this.taskCallable = taskCallable; | |
this.taskParameter = taskParameter; | |
} | |
@Override | |
public void run() { | |
//TODO 一系列业务逻辑,将结果数据封装成 TaskResult 对象并返回 | |
TaskResult result = new TaskResult(); | |
result.setTaskStatus(1); | |
result.setTaskMessage(this.taskParameter); | |
result.setTaskResult("异步回调成功"); | |
taskCallable.callable(result); | |
} | |
} |
- 运行一下
public class TaskCallableTest { | |
public static void main(String[] args){ | |
TaskCallable<TaskResult> taskCallable = new TaskHandler(); | |
TaskExecutor taskExecutor = new TaskExecutor(taskCallable, "测试回调任务"); | |
new Thread(taskExecutor).start(); | |
} | |
} | |
// 结果:TaskResult {taskStatus=1, taskMessage=' 测试回调任务 ', taskResult=' 异步回调成功 '} |
这种实现,主要就是可以传入不同的 TaskCallable
,如果回调是函数式接口,甚至不需要实现类,直接在 TaskExecutor
传入 Lambda
表达式即可。
# 有返回结果模型
我们自己定义接口并实现回调方法比较麻烦,JDK 提供了可以直接返回异步结果的处理方案。
- 使用
Future
接口获取异步结果,常与线程池一起使用。
public void f() { | |
ExecutorService executorService = Executors.newSingleThreadExecutor(); | |
//submit 提交任务 | |
Future<String> future = executorService.submit(new Callable<String>() { | |
@Override | |
public String call() throws Exception { | |
Thread.sleep(1000); | |
return "测试Future获取异步结果"; | |
} | |
}); | |
System.out.println("获取结果ing..."); | |
System.out.println(future.get()); | |
System.out.println("获取结果end..."); | |
executorService.shutdown();// 线程池关闭 | |
} |
从执行结果看, future.get()
会阻塞当前线程,也就是只有拿到了结果,才会打印 end...
。
- 使用
FutureTask
获取异步结果,FutureTask
类还实现了Runnable
,所以既可以结合Thread
类使用也可以结合线程池使用。
// 配合 Thread 使用 | |
public void f() { | |
//submit 提交任务 | |
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() { | |
@Override | |
public String call() throws Exception { | |
Thread.sleep(1000); | |
return "测试FutureTask获取异步结果"; | |
} | |
}); | |
new Thread(futureTask).start(); | |
System.out.println("获取结果ing..."); | |
System.out.println(futureTask.get()); | |
System.out.println("获取结果end..."); | |
} | |
// 配合线程池使用,和 Future 没什么区别 | |
public void p() { | |
ExecutorService executorService = Executors.newSingleThreadExecutor(); | |
//submit 提交任务 | |
FutureTask<String> futureTask = new FutureTask<>(() -> { | |
Thread.sleep(1000); | |
return "测试FutureTask获取异步结果"; | |
}); | |
executorService.execute(futureTask); | |
System.out.println("获取结果ing..."); | |
System.out.println(futureTask.get()); | |
System.out.println("获取结果end..."); | |
executorService.shutdown();// 线程池关闭 | |
} |
在调用 get
的时候都是阻塞调用。
# 相关类解析
分析一下上面代码使用的类以及类结构。
# Callable
泛型接口,对比 Runnable
不会返回数据,也不会抛出异常
// 函数式接口 | |
@FunctionalInterface | |
public interface Callable<V> { | |
V call() throws Exception; | |
} |
复习一下,在 JDK 1.8
中只声明有一个方法的接口为函数式接口,函数式接口可以使用 @FunctionalInterface
注解修饰,也可以不使用 @FunctionalInterface
注解修饰。只要一个接口中只有一个方法(可以有默认方法),那么,这个接口就是函数式接口。
# Future
泛型接口,代表异步计算的结果,在两种异步模型中可以看出 Future
的使用
public interface Future<V> { | |
// 取消异步任务的执行 | |
boolean cancel(boolean mayInterruptIfRunning); | |
// 判断任务是否被取消 | |
boolean isCancelled(); | |
// 判断任务是否已经完成,如果任务被取消或者抛出异常,也是 true | |
boolean isDone(); | |
V get() throws InterruptedException, ExecutionException; | |
// 带超时时间的 get () 版本,如果阻塞等待过程中超时则会抛出 TimeoutException 异常 | |
V get(long timeout, TimeUnit unit) | |
throws InterruptedException, ExecutionException, TimeoutException; | |
} |
cancel
函数:如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false
。如果任务还没有被执行,则会返回 true 并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning
(当前传递的boolean
类型参数)为true
,则会立即中断执行任务的线程并返回true
,若mayInterruptIfRunning
为false
,则会返回true
且不会中断任务执行线程。
# RunnableFuture
RunnableFuture
继承了 Runnable
接口和 Future
接口,而 FutureTask
实现了 RunnableFuture
接口。所以它既可以作为 Runnable
被线程执行,又可以作为 Future
得到 Callable
的返回值。 RunnableFuture
接口想要其实现类实现的功能就是相当于一个拥有 run
方法的 future
接口。
public interface RunnableFuture<V> extends Runnable, Future<V> { | |
void run(); | |
} |
# FutureTask
# 关键属性
// 内部持有的 callable 任务,运行完毕后置空 | |
private Callable<V> callable; | |
// 从 get () 中返回的结果或抛出的异常 | |
private Object outcome; // non-volatile, protected by state reads/writes | |
// 运行 callable 的线程 | |
private volatile Thread runner; | |
// 使用 Treiber 栈保存等待线程 | |
private volatile WaitNode waiters; |
FutureTask
还有几个属性表示状态
// 任务状态 | |
private volatile int state; | |
private static final int NEW = 0; | |
private static final int COMPLETING = 1; | |
private static final int NORMAL = 2; | |
private static final int EXCEPTIONAL = 3; | |
private static final int CANCELLED = 4; | |
private static final int INTERRUPTING = 5; | |
private static final int INTERRUPTED = 6; |
FutureTask
的状态是非常重要的,这里详细讲一下:
state
是volatile
修饰,确保了其他线程对其修改的可见性。如果任务状态不是
NEW
,就说明任务已经完成了。任务如果处于取消或者中断,也被认定为完成了。COMPLETING
和INTERRUPTING
是中间态,只会存在短暂时间。
任务的初始状态都是 NEW
,由构造函数保证。
任务的终止状态有 4 种:
NORMAL
:任务正常执行完毕EXCEPTIONAL
:任务执行过程种发生异常CANCELLED
:任务被取消INTERRUPTED
:任务被中断
任务的中间状态有 2 种:
COMPLETING
:正在设置任务结果INTERRUPTING
:正在中断运行任务的线程
尽管状态比较多,但是转换路径是固定的。任务的中间状态是一个瞬态,它非常的短暂。而且任务的中间态并不代表任务正在执行,而是任务已经执行完了,正在设置最终的返回结果。
# 内部队列
FutureTask
中有一个属性是 private volatile WaitNode waiters
。这个类是 FutureTask
的静态内部类,先来看一下这个类的源码
static final class WaitNode { | |
volatile Thread thread; | |
volatile WaitNode next; | |
WaitNode() { thread = Thread.currentThread(); } | |
} |
队列的实现是一个单向链表,它表示所有等待任务执行完毕的线程的集合。这个队列存放的不是执行任务的线程,而是获取结果的线程。我们假设执行任务的线程是线程 t
,获取任务结果的线程 s
。如果 s
在任务完成前就调用 get
方法获取结果,就会在一个等待队列中挂起,直到任务执行完毕被唤醒。
值得一提的是, FutureTask
中的这个单向链表是当做栈来使用的,确切来说是当做 Treiber
栈来使用的,不了解 Treiber
栈是个啥的可以简单的把它当做是一个线程安全的栈,它使用 CAS
来完成入栈出栈操作 (想进一步了解的话可以看这篇文章)。同一时刻可能有多个线程都在获取任务的执行结果,如果任务还在执行过程中,则这些线程就要被包装成 WaitNode
扔到 Treiber
栈的栈顶,即完成入栈操作,这样就有可能出现多个线程同时入栈的情况,因此需要使用 CAS 操作保证入栈的线程安全,对于出栈的情况也是同理。
# 构造函数
public FutureTask(Callable<V> callable) { | |
if (callable == null) | |
throw new NullPointerException(); | |
this.callable = callable; | |
this.state = NEW; // ensure visibility of callable | |
} | |
public FutureTask(Runnable runnable, V result) { | |
this.callable = Executors.callable(runnable, result); | |
this.state = NEW; // ensure visibility of callable | |
} |
最终所做的事情其实就是两个:初始化 callable
和状态 state
,如果不需要返回值 result
可以传 null
。我们顺便看一下 Executors.callable()
方法,
// 把 Runnable 转换成 Callable | |
public static <T> Callable<T> callable(Runnable task, T result) { | |
if (task == null) | |
throw new NullPointerException(); | |
return new RunnableAdapter<T>(task, result); | |
} |
这里采用的是适配器模式,调用 RunnableAdapter<T>(task, result)
方法来适配,关于适配器模式,参考文章。
static final class RunnableAdapter<T> implements Callable<T> { | |
final Runnable task; | |
final T result; | |
RunnableAdapter(Runnable task, T result) { | |
this.task = task; | |
this.result = result; | |
} | |
public T call() { | |
task.run(); | |
return result; | |
} | |
} |
# 核心方法 - run ()
在 new
了一个 FutureTask
对象之后,接下来就是在另一个线程中执行这个 Task
, 无论是通过直接 new
一个 Thread
还是通过线程池,执行的都是 run()
方法,接下来就看看 run()
方法的实现。
public void run() { | |
// 如果不是新建任务或者 CAS 替换失败,函数执行结束 | |
// 此处 CAS 是将 runnner 设置为当前线程,在此之前 runner 都是 null | |
if (state != NEW || | |
!UNSAFE.compareAndSwapObject(this, runnerOffset, | |
null, Thread.currentThread())) | |
return; | |
try { | |
Callable<V> c = callable; | |
if (c != null && state == NEW) { | |
V result; | |
boolean ran; | |
try { | |
result = c.call(); // 执行任务 | |
ran = true; | |
} catch (Throwable ex) { | |
result = null; | |
ran = false; | |
setException(ex); | |
} | |
if (ran) | |
set(result);// 设置执行结果 | |
} | |
} finally { | |
runner = null; | |
int s = state; | |
// 在执行过程中被中断了,需要处理中断逻辑 | |
if (s >= INTERRUPTING) | |
handlePossibleCancellationInterrupt(s);// 处理中断逻辑 | |
} | |
} |
在 run
方法出现的其他方法也需要说一下,当 call()
函数执行成功,会通过 set
函数设置结果
protected void set(V v) { | |
// CAS 更改状态 NEW --> COMPLETING | |
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { | |
// 设置 FutureTask 属性 outcome 为 v,其实就是设置返回结果 | |
outcome = v; | |
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state | |
finishCompletion();// 执行完毕,唤醒等待线程 | |
} | |
} |
唤醒等待线程的 finishCompletion
源码
private void finishCompletion() { | |
// assert state > COMPLETING; | |
for (WaitNode q; (q = waiters) != null;) { | |
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 移除等待线程 | |
for (;;) {// 自旋遍历等待线程 | |
Thread t = q.thread; | |
if (t != null) { | |
q.thread = null; | |
LockSupport.unpark(t);// 唤醒等待线程 | |
} | |
WaitNode next = q.next; | |
if (next == null) | |
break; | |
q.next = null; // unlink to help gc | |
q = next; | |
} | |
break; | |
} | |
} | |
// 任务完成后调用函数,自定义扩展 | |
done(); | |
callable = null; // to reduce footprint | |
} |
如果在 run
期间发生中断,就会进入中断的处理逻辑
private void handlePossibleCancellationInterrupt(int s) { | |
// 在中断者中断线程之前可能会延迟,所以我们只需要让出 CPU 时间片自旋等待 | |
if (s == INTERRUPTING) | |
while (state == INTERRUPTING) | |
Thread.yield(); // wait out pending interrupt | |
} |
# 核心方法 - get ()
// 获取执行结果 | |
public V get() throws InterruptedException, ExecutionException { | |
int s = state; | |
if (s <= COMPLETING) // 处于未完成状态 | |
s = awaitDone(false, 0L); // 该方法之后会讲到,0L 表示无限等待 | |
return report(s);// 获取执行结果或抛出执行期间的异常 | |
} | |
// 返回执行结果或抛出异常 | |
private V report(int s) throws ExecutionException { | |
Object x = outcome; // 获得结果 | |
if (s == NORMAL) | |
return (V)x; | |
if (s >= CANCELLED) | |
throw new CancellationException(); | |
throw new ExecutionException((Throwable)x); | |
} |
# 核心方法 - cancel ()
public boolean cancel(boolean mayInterruptIfRunning) { | |
// 如果当前 Future 状态为 NEW,根据参数修改 Future 状态为 INTERRUPTING 或 CANCELLED | |
if (!(state == NEW && | |
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, | |
mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) | |
return false; | |
try { | |
if (mayInterruptIfRunning) {// 可以在运行时中断 | |
try { | |
Thread t = runner; | |
if (t != null) | |
t.interrupt(); | |
} finally { // final state | |
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); | |
} | |
} | |
} finally { | |
finishCompletion();// 移除并唤醒所有等待线程 | |
} | |
return true; | |
} |
# 核心方法 - awaitDone ()
在具体分析它的源码之前,有一点我们先特别说明一下, FutureTask
中会涉及到两类线程,一类是执行任务的线程,它只有一个, FutureTask
的 run
方法就由该线程来执行;一类是获取任务执行结果的线程,它可以有多个,这些线程可以并发执行,每一个线程都是独立的,都可以调用 get
方法来获取任务的执行结果。如果任务还没有执行完,则这些线程就需要进入 Treiber
栈中挂起,直到任务执行结束,或者等待的线程自身被中断。
private int awaitDone(boolean timed, long nanos) throws InterruptedException { | |
final long deadline = timed ? System.nanoTime() + nanos : 0L; | |
WaitNode q = null; | |
boolean queued = false; | |
for (;;) {// 自旋 | |
if (Thread.interrupted()) {// 获取并清除中断状态 | |
removeWaiter(q);// 移除等待 WaitNode | |
throw new InterruptedException(); | |
} | |
int s = state; | |
if (s > COMPLETING) { | |
if (q != null) | |
q.thread = null;// 置空等待节点的线程 | |
return s; | |
} | |
else if (s == COMPLETING) // cannot time out yet | |
Thread.yield(); | |
else if (q == null) | |
q = new WaitNode(); | |
else if (!queued) | |
//CAS 修改 waiter | |
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, | |
q.next = waiters, q); | |
else if (timed) { | |
nanos = deadline - System.nanoTime(); | |
if (nanos <= 0L) { | |
removeWaiter(q);// 超时,移除等待节点 | |
return state; | |
} | |
LockSupport.parkNanos(this, nanos);// 阻塞当前线程 | |
} | |
else | |
LockSupport.park(this);// 阻塞当前线程 | |
} | |
} |
该方法的大框架就是自旋,我们先检测当前线程是否被中断了,这是因为 get
方法是阻塞式的,如果等待的任务还没有执行完,则调用 get
方法的线程会被扔到 Treiber
栈中挂起等待,直到任务执行完毕。但是,如果任务迟迟没有执行完毕,则我们也有可能直接中断在 Treiber
栈中的线程,以停止等待。
removeWaiter
的作用是将参数中的 node
从等待队列(即 Treiber
栈)中移除。如果此时线程还没有进入 Treiber
栈,则 q=null
,那么 removeWaiter(q)
啥也不干。在这之后,我们就直接抛出了 InterruptedException
异常。
之后
- 如果任务已经进入终止态(
s > COMPLETING
),我们就直接返回任务的状态; - 否则,如果任务正在设置执行结果(
s == COMPLETING
),我们就让出当前线程的 CPU 资源继续等待 - 否则,就说明任务还没有执行,或者任务正在执行过程中,那么这时,如果 q 现在还为 null, 说明当前线程还没有进入等待队列,于是我们新建了一个
WaitNode
,WaitNode
的构造函数我们之前已经看过了,就是生成了一个记录了当前线程的节点; - 如果
q
不为null
,说明代表当前线程的WaitNode
已经被创建出来了,则接下来如果queued=false
,表示当前线程还没有入队,所以我们执行了:
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); |
它的等价代码就是
q.next = waiters; // 当前节点的 next 指向目前的栈顶元素 | |
// 如果栈顶节点在这个过程中没有变,即没有发生并发入栈的情况 | |
if(waiters的值还是上面q.next所使用的waiters值){ | |
waiters = q; // 修改栈顶的指针,指向刚刚入栈的节点 | |
} |
最后就是一些超时结束等待和自行阻塞的函数。
# 参考
https://pdai.tech/md/java/thread/java-thread-x-juc-executor-FutureTask.html
https://www.cnblogs.com/linghu-java/p/8991824.html
https://www.jianshu.com/p/d61d7ffa6abc
https://blog.csdn.net/xingzhong128/article/details/80553789
冰河《深入理解高并发编程》(第 1 版)
讲得很详细,强烈推荐:https://segmentfault.com/a/1190000016572591#item-4