FutureTaskFuture 提供了基础实现(言外之意就是也提供了一些功能性函数供我们创建自定义 task 类使用),如获取任务执行结果 ( get ) 和取消任务 ( cancel ) 等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消 (除非使用 runAndReset 执行计算)。 FutureTask 常用来封装 CallableRunnable ,也可以作为一个任务提交到线程池中执行。 FutureTask 的线程安全由 CAS 来保证。

# 异步模型

Java 的并发编程中,大体上会分为两种异步编程模型,一类是以异步的形式来并行运行其他的任务,不需要返回任务的结果数据。一类是以异步的形式运行其他任务,需要返回结果。

# 无返回结果异步模型

无返回结果的异步任务可以直接丢进线程或线程池中运行。想要获取运行结果的话,可以调用回调方法。具体实现是定义一个回调接口,并在接口中定义接收任务结果数据的方法。任务运行后调用接口方法,执行回调接口实现类中的逻辑处理结果数据

  • 定义回调接口
// 定义任务结果数据的封装类
public interface TaskCallable<T> {
	T callable(T t);
}
  • 定义任务结果数据的封装类
public class TaskResult implements Serializable {
    // 任务状态
	private Integer taskStatus;
    
    // 任务消息
    private String taskMessage;
    // 任务结果数据
	private String taskResult;
    @Override
    public String toString() {
        return "TaskResult{" +
                "taskStatus=" + taskStatus +
                ", taskMessage='" + taskMessage + '\'' +
                ", taskResult='" + taskResult + '\'' +
                '}';
    }
}
  • 创建回调接口的实现类
public class TaskHandler implements TaskCallable<TaskResult> {
    @Override
    public TaskResult callable(TaskResult taskResult) {
    	//TODO 拿到结果数据后进一步处理
    	System.out.println(taskResult.toString());
    	return taskResult;
    }
}
  • 创建任务的执行类
public class TaskExecutor implements Runnable{
    private TaskCallable<TaskResult> taskCallable;
    private String taskParameter;
    
    public TaskExecutor(TaskCallable<TaskResult> taskCallable, String taskParameter){
    	this.taskCallable = taskCallable;
    	this.taskParameter = taskParameter;
    }
    
    @Override
    public void run() {
        //TODO 一系列业务逻辑,将结果数据封装成 TaskResult 对象并返回
        TaskResult result = new TaskResult();
        result.setTaskStatus(1);
        result.setTaskMessage(this.taskParameter);
        result.setTaskResult("异步回调成功");
        taskCallable.callable(result);
    }
}
  • 运行一下
public class TaskCallableTest {
    public static void main(String[] args){
        TaskCallable<TaskResult> taskCallable = new TaskHandler();
        TaskExecutor taskExecutor = new TaskExecutor(taskCallable, "测试回调任务");
        new Thread(taskExecutor).start();
    }
}
// 结果:TaskResult {taskStatus=1, taskMessage=' 测试回调任务 ', taskResult=' 异步回调成功 '}

这种实现,主要就是可以传入不同的 TaskCallable ,如果回调是函数式接口,甚至不需要实现类,直接在 TaskExecutor 传入 Lambda 表达式即可。

# 有返回结果模型

我们自己定义接口并实现回调方法比较麻烦,JDK 提供了可以直接返回异步结果的处理方案。

  • 使用 Future 接口获取异步结果,常与线程池一起使用。
public void f() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    //submit 提交任务
    Future<String> future = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "测试Future获取异步结果";
        }
    });
    System.out.println("获取结果ing...");
    System.out.println(future.get());
    System.out.println("获取结果end...");
    executorService.shutdown();// 线程池关闭
}

从执行结果看, future.get() 会阻塞当前线程,也就是只有拿到了结果,才会打印 end...

  • 使用 FutureTask 获取异步结果, FutureTask 类还实现了 Runnable ,所以既可以结合 Thread 类使用也可以结合线程池使用。
// 配合 Thread 使用
public void f() {
    //submit 提交任务
    FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "测试FutureTask获取异步结果";
        }
    });
    new Thread(futureTask).start();
    System.out.println("获取结果ing...");
    System.out.println(futureTask.get());
    System.out.println("获取结果end...");
}
// 配合线程池使用,和 Future 没什么区别
public void p() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    //submit 提交任务
    FutureTask<String> futureTask = new FutureTask<>(() -> {
        Thread.sleep(1000);
        return "测试FutureTask获取异步结果";
    });
    executorService.execute(futureTask);
    System.out.println("获取结果ing...");
    System.out.println(futureTask.get());
    System.out.println("获取结果end...");
    executorService.shutdown();// 线程池关闭
}

在调用 get 的时候都是阻塞调用。

# 相关类解析

分析一下上面代码使用的类以及类结构。

# Callable

泛型接口,对比 Runnable 不会返回数据,也不会抛出异常

// 函数式接口
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

复习一下,在 JDK 1.8 中只声明有一个方法的接口为函数式接口,函数式接口可以使用 @FunctionalInterface 注解修饰,也可以不使用 @FunctionalInterface 注解修饰。只要一个接口中只有一个方法(可以有默认方法),那么,这个接口就是函数式接口。

# Future

泛型接口,代表异步计算的结果,在两种异步模型中可以看出 Future 的使用

public interface Future<V> {
    // 取消异步任务的执行
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 判断任务是否被取消
    boolean isCancelled();
    
    // 判断任务是否已经完成,如果任务被取消或者抛出异常,也是 true
    boolean isDone();
    
    V get() throws InterruptedException, ExecutionException;
    
    // 带超时时间的 get () 版本,如果阻塞等待过程中超时则会抛出 TimeoutException 异常
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • cancel 函数:如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回 false 。如果任务还没有被执行,则会返回 true 并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若 mayInterruptIfRunning (当前传递的 boolean 类型参数)为 true ,则会立即中断执行任务的线程并返回 true ,若 mayInterruptIfRunningfalse ,则会返回 true 且不会中断任务执行线程。

# RunnableFuture

RunnableFuture 继承了 Runnable 接口和 Future 接口,而 FutureTask 实现了 RunnableFuture 接口。所以它既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。 RunnableFuture 接口想要其实现类实现的功能就是相当于一个拥有 run 方法的 future 接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

# FutureTask

# 关键属性

// 内部持有的 callable 任务,运行完毕后置空
private Callable<V> callable;
// 从 get () 中返回的结果或抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes
// 运行 callable 的线程
private volatile Thread runner;
// 使用 Treiber 栈保存等待线程
private volatile WaitNode waiters;

FutureTask 还有几个属性表示状态

// 任务状态
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

FutureTask 的状态是非常重要的,这里详细讲一下:

  • statevolatile 修饰,确保了其他线程对其修改的可见性。

  • 如果任务状态不是 NEW ,就说明任务已经完成了。任务如果处于取消或者中断,也被认定为完成了。

  • COMPLETINGINTERRUPTING 是中间态,只会存在短暂时间

任务的初始状态都是 NEW ,由构造函数保证。

任务的终止状态有 4 种:

  • NORMAL :任务正常执行完毕
  • EXCEPTIONAL :任务执行过程种发生异常
  • CANCELLED :任务被取消
  • INTERRUPTED :任务被中断

任务的中间状态有 2 种:

  • COMPLETING :正在设置任务结果
  • INTERRUPTING :正在中断运行任务的线程

尽管状态比较多,但是转换路径是固定的。任务的中间状态是一个瞬态,它非常的短暂。而且任务的中间态并不代表任务正在执行,而是任务已经执行完了,正在设置最终的返回结果

状态转换

# 内部队列

FutureTask 中有一个属性是 private volatile WaitNode waiters 。这个类是 FutureTask 的静态内部类,先来看一下这个类的源码

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

队列的实现是一个单向链表,它表示所有等待任务执行完毕的线程的集合。这个队列存放的不是执行任务的线程,而是获取结果的线程。我们假设执行任务的线程是线程 t ,获取任务结果的线程 s 。如果 s 在任务完成前就调用 get 方法获取结果,就会在一个等待队列中挂起,直到任务执行完毕被唤醒。

值得一提的是, FutureTask 中的这个单向链表是当做来使用的,确切来说是当做 Treiber 栈来使用的,不了解 Treiber 栈是个啥的可以简单的把它当做是一个线程安全的栈,它使用 CAS 来完成入栈出栈操作 (想进一步了解的话可以看这篇文章)。同一时刻可能有多个线程都在获取任务的执行结果,如果任务还在执行过程中,则这些线程就要被包装成 WaitNode 扔到 Treiber 栈的栈顶,即完成入栈操作,这样就有可能出现多个线程同时入栈的情况,因此需要使用 CAS 操作保证入栈的线程安全,对于出栈的情况也是同理。

队列

# 构造函数

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

最终所做的事情其实就是两个:初始化 callable 和状态 state ,如果不需要返回值 result 可以传 null 。我们顺便看一下 Executors.callable() 方法,

// 把 Runnable 转换成 Callable
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
       throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

这里采用的是适配器模式,调用 RunnableAdapter<T>(task, result) 方法来适配,关于适配器模式,参考文章

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

# 核心方法 - run ()

new 了一个 FutureTask 对象之后,接下来就是在另一个线程中执行这个 Task , 无论是通过直接 new 一个 Thread 还是通过线程池,执行的都是 run() 方法,接下来就看看 run() 方法的实现。

public void run() {
    // 如果不是新建任务或者 CAS 替换失败,函数执行结束
    // 此处 CAS 是将 runnner 设置为当前线程,在此之前 runner 都是 null
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call(); // 执行任务
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);// 设置执行结果
        }
    } finally {
        runner = null;
        int s = state;
        // 在执行过程中被中断了,需要处理中断逻辑
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);// 处理中断逻辑
    }
}

run 方法出现的其他方法也需要说一下,当 call() 函数执行成功,会通过 set 函数设置结果

protected void set(V v) {
    // CAS 更改状态 NEW --> COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 设置 FutureTask 属性 outcome 为 v,其实就是设置返回结果
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();// 执行完毕,唤醒等待线程
    }
}

唤醒等待线程的 finishCompletion 源码

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 移除等待线程
            for (;;) {// 自旋遍历等待线程
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);// 唤醒等待线程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 任务完成后调用函数,自定义扩展
    done();
    callable = null;        // to reduce footprint
}

如果在 run 期间发生中断,就会进入中断的处理逻辑

private void handlePossibleCancellationInterrupt(int s) {
    // 在中断者中断线程之前可能会延迟,所以我们只需要让出 CPU 时间片自旋等待
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

# 核心方法 - get ()

// 获取执行结果
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)	// 处于未完成状态
        s = awaitDone(false, 0L);	// 该方法之后会讲到,0L 表示无限等待
    return report(s);// 获取执行结果或抛出执行期间的异常
}
// 返回执行结果或抛出异常
private V report(int s) throws ExecutionException {
    Object x = outcome;	// 	获得结果
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

# 核心方法 - cancel ()

public boolean cancel(boolean mayInterruptIfRunning) {
    // 如果当前 Future 状态为 NEW,根据参数修改 Future 状态为 INTERRUPTING 或 CANCELLED
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    
        if (mayInterruptIfRunning) {// 可以在运行时中断
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();// 移除并唤醒所有等待线程
    }
    return true;
}

# 核心方法 - awaitDone ()

在具体分析它的源码之前,有一点我们先特别说明一下, FutureTask 中会涉及到两类线程,一类是执行任务的线程,它只有一个, FutureTaskrun 方法就由该线程来执行;一类是获取任务执行结果的线程,它可以有多个,这些线程可以并发执行,每一个线程都是独立的,都可以调用 get 方法来获取任务的执行结果。如果任务还没有执行完,则这些线程就需要进入 Treiber 栈中挂起,直到任务执行结束,或者等待的线程自身被中断。

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {// 自旋
        if (Thread.interrupted()) {// 获取并清除中断状态
            removeWaiter(q);// 移除等待 WaitNode
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;// 置空等待节点的线程
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            //CAS 修改 waiter
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);// 超时,移除等待节点
                return state;
            }
            LockSupport.parkNanos(this, nanos);// 阻塞当前线程
        }
        else
            LockSupport.park(this);// 阻塞当前线程
    }
}

该方法的大框架就是自旋,我们先检测当前线程是否被中断了,这是因为 get 方法是阻塞式的,如果等待的任务还没有执行完,则调用 get 方法的线程会被扔到 Treiber 栈中挂起等待,直到任务执行完毕。但是,如果任务迟迟没有执行完毕,则我们也有可能直接中断在 Treiber 栈中的线程,以停止等待。

removeWaiter 的作用是将参数中的 node 从等待队列(即 Treiber 栈)中移除。如果此时线程还没有进入 Treiber 栈,则 q=null ,那么 removeWaiter(q) 啥也不干。在这之后,我们就直接抛出了 InterruptedException 异常。

之后

  • 如果任务已经进入终止态( s > COMPLETING ),我们就直接返回任务的状态;
  • 否则,如果任务正在设置执行结果( s == COMPLETING ),我们就让出当前线程的 CPU 资源继续等待
  • 否则,就说明任务还没有执行,或者任务正在执行过程中,那么这时,如果 q 现在还为 null, 说明当前线程还没有进入等待队列,于是我们新建了一个 WaitNode , WaitNode 的构造函数我们之前已经看过了,就是生成了一个记录了当前线程的节点;
  • 如果 q 不为 null ,说明代表当前线程的 WaitNode 已经被创建出来了,则接下来如果 queued=false ,表示当前线程还没有入队,所以我们执行了:
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);

它的等价代码就是

q.next = waiters; // 当前节点的 next 指向目前的栈顶元素
// 如果栈顶节点在这个过程中没有变,即没有发生并发入栈的情况
if(waiters的值还是上面q.next所使用的waiters值){ 
    waiters = q; // 修改栈顶的指针,指向刚刚入栈的节点
}

最后就是一些超时结束等待和自行阻塞的函数。

# 参考

https://pdai.tech/md/java/thread/java-thread-x-juc-executor-FutureTask.html

https://www.cnblogs.com/linghu-java/p/8991824.html

https://www.jianshu.com/p/d61d7ffa6abc

https://blog.csdn.net/xingzhong128/article/details/80553789

冰河《深入理解高并发编程》(第 1 版)

讲得很详细,强烈推荐:https://segmentfault.com/a/1190000016572591#item-4