# 基本使用

locks 包下有一个类是 AbstractQueuedSynchronizer ,其简写就是 AQSAQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器。

AQS 是一个提供给用户自定义同步器的简单框架。其内部严格使用先进先出的阻塞队列,并且构造出来的同步器依赖于一个 int 类型的 state 来判断是否有线程占用锁。 AQS 提供独占模式( exclusive )和共享模式( shared ),用户自定义的同步器一般支持一种模式,当然有些同步器两种也支持。

从本质上说, AQS 实现同步的底层逻辑原理和 synchronizd 是一样的,线程拿不到锁就会进入阻塞队列,需要等待某个条件时就会进入等待队列直到被唤醒。拿到锁的线程此时是线程安全的,直到其主动释放锁。当线程释放锁后,会将同步队列中的队首节点唤醒,使其重新竞争锁。当线程从等待队列中被唤醒时,会加入到阻塞队列中,等待重新竞争锁。

AQS 通过让用户重写指定的方法来实现自定义的同步器:

// 前两个是独占模式需要重写的方法    
	protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
	
	// 后两个是共享模式需要重写的方法
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
	protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
	
	// 判断是否同步只与当前线程保持一致
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

继承 AQS 的类应该是非公有的静态内部类,实现的同步器依赖 int state 值判断当前锁是否被占用,这个 state 值代表的含义由用户自己来决定,我们这里用 0 表示没有线程占用锁,1 表示有 1 个线程占用锁,x+1 表示某个线程第 x 次重入这把锁。此次自定义同步器仅支持独占模式。

外部类期望是实现了 Lock 接口,符合规范:

public class MyReentrantLock implements Serializable, Lock {
    private final Sync sync;
    public MyReentrantLock() {
        this.sync = new Sync();
    }
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            int c = getState();
            if (c == 0) { // 当前状态表示没有线程获取锁
                if (compareAndSetState(0, arg)) { // 获取锁的过程必须是原子的
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            } else if (Thread.currentThread() == getExclusiveOwnerThread()) {
                setState(c + arg); // 锁重入
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            // 如果拿到锁的线程和当前线程不同
            if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();
            int c = getState() - arg;
            setState(c);
            if (c == 0) { // 释放锁,反之只是一次简单的重入退出
                setExclusiveOwnerThread(null);
                return true;
            }
            return false;
        }
        Condition newCondition() {
            return new ConditionObject();
        }
    }
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1,unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(1);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

在《Java 并发编程的艺术》第 5 章,代码清单 5-2 给出的例子,其实就是来源于 AQS 类中第 199 行注释中给出的例子(此处仅限于 jdk1.8,其他的我没看过,不清楚),它在重写 tryRelease() 方法中,判断是否抛出 IllegalMonitorStateException 异常是 getState() == 0 ,这里其实是有问题的,他无法判断拿到锁的是否是当前线程,释放锁的操作是否是在拿到锁之后。

# 实现分析

# 内部属性

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {   
    // part1
    
    // 版本号
    private static final long serialVersionUID = 7373984972572414691L;    
    // 头节点
    private transient volatile Node head;    
    // 尾结点
    private transient volatile Node tail;    
    // 状态
    private volatile int state;    
    // 自旋时间
    static final long spinForTimeoutThreshold = 1000L;
    
    
    // part2
    // Unsafe 类实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    //state 内存偏移地址
    private static final long stateOffset;
    //head 内存偏移地址
    private static final long headOffset;
    //state 内存偏移地址
    private static final long tailOffset;
    //tail 内存偏移地址
    private static final long waitStatusOffset;
    //next 内存偏移地址
    private static final long nextOffset;
    // 静态初始化块
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
        } catch (Exception ex) { throw new Error(ex); }
    }
}

part1 部分,内部要维护一个 FIFO 的队列,该队列其实是一个双向链表,所以 AQS 内部存在 head,tail 两个 Node 节点。 NodeAQS 的内部类, AQS 将一个线程封装为 Node 放入队列中。

然后就是 stateAQS 提供了三个方法供用户使用, getState(),setState(int),compareAndSetState(int)

part2 部分,因为 AQS 中有很多 CAS 操作,所以必须要提前拿到各个字段在其类中的地址偏移量

# Node 内部类

同步队列和等待队列都是使用 Node 作为节点,所以 Node 的主要属性为:

// 结点状态
    volatile int waitStatus;        
    // 前驱结点
    volatile Node prev;    
    // 后继结点
    volatile Node next;        
    // 结点所对应的线程
    volatile Thread thread;        
    // 下一个等待者
    Node nextWaiter;

其他重要的静态属性:

// 共享模式
    static final Node SHARED = new Node();
    // 静态变量,在独占模式中,t 线程拿到锁资源,EXCLUSIVE = t
    static final Node EXCLUSIVE = null;      
    // 结点状态
    // 值为 0,表示当前节点在 sync 队列中,等待着获取锁
	// 同步队列中阻塞的线程等待超时或者被中断,需要从同步队列中取消等待
    static final int CANCELLED =  1;
	// 当前节点的后继节点包含的线程需要被唤醒,也就是 unpark
	static final int SIGNAL    = -1;
	
	// 当前节点在等待 condition,也就是在 condition 队列 (等待队列) 中
    static final int CONDITION = -2;
	// 当前场景下后续的 acquireShared 能够得以执行
    static final int PROPAGATE = -3;

获取上一个节点的方法 predecessor() 和构造函数:

// 获取前驱结点,若前驱结点为空,抛出异常
    final Node predecessor() throws NullPointerException {
        // 保存前驱结点
        Node p = prev; 
        if (p == null) // 前驱结点为空,抛出异常
            throw new NullPointerException();
        else // 前驱结点不为空,返回
            return p;
    }
    
    // 无参构造方法
    Node() {}
    
    // 构造方法
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    // 构造方法
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }

Node 中封装 Thread 属性的主要目的是,当获取锁的线程要释放锁,唤醒阻塞队列中的节点时,就需要用到被阻塞的线程。因为 AQS 本质上使用的是 LockSupport.park()/unpark(Thread t) 来实现阻塞和唤醒的,所以当需要唤醒队列中的节点时,只需要拿到节点中保存的线程,然后调用 LockSupport.unpark(t) 即可。

# 同步队列

先讲一下双向链表的入队机制,一开始, AQShead,tail 都是 null ,当加入第一个节点时( addWaiter 方法),会检查 tail==null ,如果是 null ,就说明队列为空,就会初始化队列,让 head,tail 指向 new Node ,此时头节点和尾节点指向同一个,没有实际意义的节点。然后再把要加入的节点加入到 head 后面,接着 tail 指向新加入的节点( enq 方法实现)。

// 暂时不用管 mode,addWaiter 是将当前线程加入同步队列
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) { // 尾节点不为空,尝试加入同步队列
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);// 队列没有初始化或者 CAS 失败,进入 enq 方法
    return node;
}
private Node enq(final Node node) {
    for (;;) {// 无限循环,直到节点通过 CAS 加入到队列中
        Node t = tail;
        if (t == null) { // 初始化队列
            if (compareAndSetHead(new Node()))
                tail = head;
        } else { // 新增尾节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

总结一下就是: addWaiter 会先尝试一下将节点加入到同步队列中,如果失败,就会进入 enq 函数,不断循环直到节点通过 CAS 加入队列中。同时 enq 还负责初始化队列。

# acquire(int)

AQS 中队列是双向链表,一个线程只有在通过 acquire(int arg) 获取资源失败后,才会被包装成 Node 加入队列中,该队列是 FIFO 的。所以我们先看一下 acquire(int arg) 源码

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire 会先尝试获取资源,也就是 tryAcquire ,而这个正是我们需要重写的方法,以上面的自定义同步器重写的 tryAcquire 为例

protected boolean tryAcquire(int arg) {
        int c = getState();
        if (c == 0) { // 当前状态表示没有线程获取锁
            if (compareAndSetState(0, arg)) { // 获取锁的过程必须是原子的
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
        } else if (Thread.currentThread() == getExclusiveOwnerThread()) {
            setState(c + arg); // 锁重入
            return true;
        }
        return false;
    }

尝试获取锁失败, tryAcquire 就会返回 false ,然后就会调用 addWaiter(Node.EXCLUSIVE), arg) 方法,我们之前看源码,发现 addWaiter 传入的参数其实并不是当前线程封装的 Node ,参数名是 mode ,我们看一下官方注解:

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {...}

acquire 中传入的是 Node.EXCLUSIVE ,就表示当前节点是独占模式的节点。为什么要这么区分呢?因为有些自定义同步器既可以独占模式,又可以共享模式,而 AQS 只会维护一个同步队列,那么两种模式下的节点都会在同一个队列中,所以需要区分。

如何将节点加入到队列中,最开始已经介绍过了。

当节点加入到队列中后, AQS 让该线程再做一次尝试获取锁资源。但是这种尝试是有条件的,因为 AQS 是严格的 FIFO ,所以只有当该线程在同步队列中前一个节点是头节点才允许再次尝试获取资源(头节点 head 指向一个没有任何意义的节点或者拿到锁的节点)。只有这样,才能是公平的

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//node 的前一个节点
                if (p == head && tryAcquire(arg)) {// 前一个节点是 head 并且获取资源成功
                    setHead(node);// 那么 node 就不应该在同步队列中了,设置 node 为 head
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //shouldParkAfterFailedAcquire 待会介绍,此处默认返回 true 即可
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这里需要注意,如果代码执行到 return interrupted; ,那么该线程一定拿到了锁,该函数返回的是 interrupted ,表示线程在运行过程中,是否有其他线程为该线程打上过中断标记

这里回忆一下, LockSupport.park() 是会被中断标记打断的,但是不会清空中断标记,如果是在 park 之前被打上中断标记,则不会进入阻塞。

为了更加清晰演示,这里展示一段代码

public class Main {
    static Thread t;
    public static void main(String[] args) throws InterruptedException {
        t = new Thread(() -> {
            t.interrupt();
            System.out.println("park调用前,此时中断标记为:" + t.isInterrupted());
            LockSupport.park();
            LockSupport.park();
            System.out.println("park调用后,此时中断标记为:" + t.isInterrupted());
        });
        t.start();
        TimeUnit.SECONDS.sleep(2); // 其实这句都没必要
    }
}
// 结果:线程 t 不会出现阻塞  
//park 调用前,此时中断标记为:true
//park 调用后,此时中断标记为:true

这就出现了很严重的问题,如果我希望使用 park 阻塞线程,但线程之前就被打上了中断标记,那无论如何都无法阻塞它,除非将中断标记清空。但是 ** park 不会清空中断标记 **,只能依靠 Thread.interrupted() , 所以 parkAndCheckInterrupt() 的源码如下:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        // 如果之前被打上中断标记,这里 park 就会失败,然后清空中断标记
        return Thread.interrupted();
    }

关键点在于,这个中断标记需要恢复吗?

如果外界某线程并不知道线程 t 在阻塞队列中,而调用了 t.interrupt() ,就会导致 t 从阻塞中恢复或者无法 park ,线程 t 都会从从 parkAndCheckInterrupt() 中的 return 继续执行。因为中断标记被清空了,所以返回 true ,在 acquireQueued() 函数中就会执行 interrupted = true 。直到这个线程的节点终于移动到队首(前一个节点是 head ),然后被释放锁后的线程调用 LockSupport.unpark() 唤醒,在第一个 if 中拿到锁资源然后 return interrupted 。最终 acquire() 函数会根据返回值决定是否为当前线程打上中断标记。

为什么要这么大费周章,主要就是因为 park 会响应中断标记,而外界调用 t.interrupt() 的线程并不知道这个线程处于阻塞状态,它只是希望这个线程被打上中断标记并在一定时机根据这个标记停止某些工作,所以 AQS 理应为这种情况考虑,恢复希望的中断标记。

acquireQueued() 设计的绝妙之处就在于只有第一个 if 返回了,整个循环才会结束,而第二个 if 只是用来记录中断和 park 线程的。

现在我们再来看一下 shouldParkAfterFailedAcquire 函数,这个函数的主要作用就是,根据前一个节点的状态判断当前节点是否该被 park ,先不看代码,从逻辑上来说,同步队列中的节点有三种状态: signalcancelled0 ,如果说 node 前一个节点被取消了阻塞,也就是 cancelled ,那么 node 就应该此时将其从队列中删除。如果前一个节点状态是 signal ,那么 node 可以被阻塞,因为现在还轮不到 node 来竞争锁,如果发现前一个节点是 0 ,那么就应该将其设置为 signal

该函数的设计也十分巧妙,内部涉及到的并发考量其实非常多,但是这里不展开非常详细的探讨。

// 传入的前一个节点,当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    
    //pred 节点的状态为 SIGNAL,SIGNAL 表示该节点的后继节点 (即将) 被阻塞 (通过 park)
    if (ws == Node.SIGNAL)
        return true;
    
    // 状态为 CANCELLED,需要从队列中删除
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {// 为 PROPAGATE -3 或者是 0 表示无状态,(为 CONDITION -2 时,表示此节点在 condition queue 中) 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 不能进行 park 操作
    return false;
}

节点会一直跳过被取消的节点,直到遇到没有取消的节点,然后退出,在外部函数循环一遍再次进入该函数,修改其为 signal ,然后再次退出,直到进入该函数,发现前驱节点状态为 signal ,可以被 park 才会返回 true 从而使线程 park

# release(int)

以独占模式释放对象,先看源码:

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 释放成功
        // 保存头节点
        Node h = head; 
        if (h != null && h.waitStatus != 0) // 头节点不为空并且头节点状态不为 0
            unparkSuccessor(h); // 释放头节点的后继结点
        return true;
    }
    return false;
}

tryRelease 需要我们手动重写,在上面自定义同步器的例子中, release 在锁重入时会返回 false 。如果 tryRelease 成功,那么如果头节点不为空并且头节点的状态不为 0,则释放头节点的后继节点。

// 释放后继结点
private void unparkSuccessor(Node node) {
    // 获取 node 结点的等待状态
    int ws = node.waitStatus;
	// 状态值小于 0,为 SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
    if (ws < 0) 
        // 比较并且设置结点等待状态,设置为 0,这一步失败了也无所谓
        compareAndSetWaitStatus(node, ws, 0);
    // 获取 node 节点的下一个结点
    Node s = node.next;
    // 下一个结点为空或者下一个节点的等待状态大于 0,即为 CANCELLED
    if (s == null || s.waitStatus > 0) { 
        //s 赋值为空
        s = null; 
        // 从尾结点开始从后往前开始遍历
        for (Node t = tail; t != null && t != node; t = t.prev)
            // 找到等待状态小于等于 0 的结点,找到最前的状态小于等于 0 的结点
            if (t.waitStatus <= 0) 
                // 保存结点
                s = t;
    }
    if (s != null) // 该结点不为为空,释放许可
        LockSupport.unpark(s.thread);
}

# 等待队列

AQS 中等待队列和同步队列中都是 Node 节点,前一篇文章已经讲了 ConditionAQS 内部定义了 ConditionObject 来实现 Condition 接口,所以该部分直接将 ConditionObject 的相关函数。

# 等待

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 当前线程加入等待队列
    Node node = addConditionWaiter();
    
    // 释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    
    // 如果该节点还不存在与同步等待队列中,就阻塞掉自己  
  	// 这里这样判断的原因是因为 signal 方法会将条件等待队列中的相应节点转移到同步队列中。
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);// 完成进入等待队列吗,锁资源释放
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //acquireQueued 方法只会在成功获取到同步资源之后才会返回,返回就表明成功获取到同步资源
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

获取了锁的线程才能调用 await ,而获得了锁的线程在同步队列中就是首节点。并不会将首节点直接加入到等待队列,而是会重新构建一个新的 Node 加入进去。

又因为阻塞线程使用的还是 park ,所以要在循环中判断是否被唤醒,也就是检查当前线程节点是不是在同步队列中,如果不在,就是被 interrupt() 唤醒的。

# 唤醒

主要是 signalsignalAll ,先看源码:

public final void signal() {
    if (!isHeldExclusively()) // 只有获取锁,才能调用 signal 线程
        throw new IllegalMonitorStateException();
    
    Node first = firstWaiter;// 拿到等待队列的头节点
    if (first != null)
        doSignal(first);// 唤醒头节点
}
private void doSignal(Node first) {
    do {        
        if ( (firstWaiter = first.nextWaiter) == null)// 等待队列头节点向下移动
        	lastWaiter = null;
        first.nextWaiter = null; // 需要唤醒的节点移出等待队列
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// 此时 node 已经移出等待队列了,加入同步队列
final boolean transferForSignal(Node node) {
    // 这里如果 node 的状态是 CANCELLED,就会失败,node 也就自然而然从队列中删除,等待 GC
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    
	// 修改成功,加入同步队列,返回 node 的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    
    //ws > 0 是 CANCELLED
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    
    return true;
}

# 模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样 (模板方法模式很经典的一个应用)。 AQS 底层使用模板方法,使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放) 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:

isHeldExclusively()// 该线程是否正在独占资源。只有用到 condition 才需要去实现它。
tryAcquire(int)// 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
tryRelease(int)// 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
tryAcquireShared(int)// 共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)// 共享方式。尝试释放资源,成功则返回 true,失败则返回 false。

这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS 类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。关于模板方法,可以参考这篇文章:行为型 - 模板方法

# 参考

本篇博客更多的都是我自己的思考和阅读源码的结果,下面的参考给了我指引作用。但是其实到现在,我对 AQS 还有很多的细节很模糊,比如线程释放锁后,想要唤醒一个线程,而即将唤醒时,该线程被设为 cancelled 会发生什么,之类的。

https://pdai.tech/md/java/thread/java-thread-x-lock-AbstractQueuedSynchronizer.html

https://blog.csdn.net/ziya_anan/article/details/122009215

https://www.cnblogs.com/txmfz/p/14755920.html

《Java 并发编程的艺术》第 5 章