1. 前言
说起 JUC,我们常常会想起其中的线程池(ExecutorService)。然而,我们今天来看看另一个核心模块 AQS。
AQS 是 AbstractQueuedSynchronizer 的简称,在 JUC 中作为各种同步器的基石。举个例子,常见的 ReentrantLock 就是由它实现的。
2. 如何实现一个锁?
我们知道,java 有一个关键字 synchronized 来给一段代码加锁,可是这是 JVM 层面的事情。那么问题来了,如何在 java 代码层面来实现模拟一个锁?
即实现这样一个接口:
1 | package java.util.concurrent.locks; |
自旋锁
一个简单的想法是:让所有线程去竞争一个变量owner,确保只有一个线程成功,并设置自己为owner,其他线程陷入死循环等待。这便是所谓的自旋锁。
一个简单的代码实现:
1 | import java.util.concurrent.atomic.AtomicReference; |
扯这个自旋锁,主要是为了引出 AQS 背后的算法 CLH锁
。
关于CLH锁
更多细节可以参考篇文章:
3. AQS 的实现
CLH锁的思想,简单的说就是:一群人去ATM取钱,头一个人拿到锁,在里面用银行卡取钱,其余的人在后面排队等待;前一个人取完钱出来,唤醒下一个人进去取钱。
关键部分翻译成代码就是:
- 排队 -> 队列
- 等待/唤醒 -> wait()/notify() 或者别的什么 api
3.1 同步队列
AQS 使用节点为 Node 的双向链表作为同步队列。拿到锁的线程可以继续执行代码,没拿到的线程就进入这个队列排队。
1 | public abstract class AbstractQueuedSynchronizer ... { |
这个队列大体上长这样:图片来源
条件队列是为了支持 Lock.newCondition() 这个功能,暂时不care,先跳过。
3.2 独占模式的 api
AQS 支持独占锁(Exclusive)和共享锁(Share)两种模式:
- 独占锁:只能被一个线程获取到 (ReentrantLock);
- 共享锁:可以被多个线程同时获取 (CountDownLatch、ReadWriteLock 的读锁)。
这边我们只看独占模式,它对外提供一套 api:
- acquire(int n):获取n个资源(锁)
- release(int n):释放n个资源(锁)
简单看一眼怎么用的 (ReentrantLock 的例子):
1 | public class ReentrantLock implements Lock, java.io.Serializable { |
可以看到,AQS 封装了排队、阻塞、唤醒之类的操作,使得实现一个锁变的如此简洁。
3.2.1 acquire(int)
获取资源
1 | public final void acquire(int arg) { |
这个函数很短,其中 tryAcquire(int) 为模板方法,留给子类实现。类似 Activity.onCreate()。
根据 tryAcquire(arg) 的结果,分两种情况:
- 返回 true: 该线程拿到锁,由于短路,直接跳出 if,该线程可以往下执行自己的业务代码。
- 返回 false: 该线程没有拿到锁,会继续走 acquireQueued(),执行排队等待逻辑。
3.2.1.1 addWaiter(Node)
这一步把当前线程(Thread.currentThread())作为一个Node节点,加入同步队列的尾部,并标记为独占模式。
当然,加入队列这个动作,要保证线程安全。
1 | private Node addWaiter(Node mode) { |
可以看到,这边有一个死循环 + CAS的神奇操作,这是非阻塞算法的经典操作,可自行查阅相关资料。简单的说,非阻塞算法就是在多线程的情况下,不加锁同时保证某个变量(本例中为双向链表)的线程安全,而且通常比 synchronized 的效率要高。
3.2.1.2 acquireQueued(Node,int)
这个函数主要做两件事:
- 查看prev的waitStatus,看是不是需要阻塞,需要的话阻塞该线程
- 排在队首的家伙调用了release(),会唤醒老二。老二尝试去获得锁,成功的话自己变成队首,跳出循环。
结合这张图来看,每次出队完需要确保 head 始终指向占用资源的线程:
1 | final boolean acquireQueued(final Node node, int arg) { |
这边的 interrupted 主要是保证这样一个功能。线程在排队的时候不响应中断,直到出来以后,如果等待的过程中被中断过,作为弥补,立即相应中断(即调用selfInterrupt())。
shouldParkAfterFailedAcquire()
查看prev的waitStatus,看是不是需要阻塞。可以预见的是,经过几次死循环,全部都会变成SIGNAL状态。之后全部陷入阻塞。
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
值得一提的是,阻塞和唤醒没有使用常说的 wait()/notify(),而是使用了 LockSupport.park()/unpark()。这应该是出于效率上的考虑。
3.2.2 release(int)
释放资源
1 | public final boolean release(int arg) { |
释放的逻辑比较简单。注意一点,对于 next 节点 unpark(),相当于在把 next 节点从 acquireQueued() 中的死循环中解放出来。
回到 ATM 的例子,相当于,他取完钱,轮到后一个人取钱了。这样逻辑全部都串起来了。
4. 总结
这样,顺着独占锁这条线,AQS 的独占模式就分析完了。其他还有用于实现闭锁的共享模式,用于实现 Condition 的条件队列就不展开了。
5. 参考
Java并发编程实战(chapter_4)(AQS源码分析)
《JAVA并发编程实践》