AQS原理解析

AQS原理解析文章目录 1 AQS 介绍 2 AQS 原理 3 源码分析 AQS 模版方法模式加锁 独占锁 释放锁 独占锁 加锁 共享锁 释放锁 共享锁 1 AQS 介绍 AQS 的全程为 AbstractQueu 这个类在 java util concurrent locks 包下 publicabstra

1. AQS介绍

AQS 的全程为 (AbstractQueuedSynchronizer)这个类在 java.util.concurrent.locks 包下

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable 

AQS 是一个提供用于实现阻塞锁和同步器框架,依靠 先入先出 (FIFO) 等待队列,该队列就是 CLH 同步队列,遵循 FIFO 原则 ,比如我们提到的 ReentrantLock ReentranSemaphoretLock ,其他的诸如 ReentrantReadWriteLock SynchronousQueue FutureTask(jdk1.7) , 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。

2. AQS原理

AQS 核心思想,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及唤醒锁时分配机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到的锁加入到队列中。

CLH (Craig,Landin,and Hagersten)   队列是一个虚拟双向队列 (虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。原生的 CLH 队列适用于自旋锁, 但 Doug Lea 把其改成阻塞锁,AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配

在这里插入图片描述



AQS 使用来一个 int 成员变量来标识同步状态,通过内置但 FIFO 队列来完成获取资源线程但排队工作。

 private volatile int state; protected final int getState() { 
         return state; } protected final void setState(int newState) { 
         state = newState; } 

AQS 定义两种资源共享方式

  • Exclusive (独占)只有一个线程能执行,如 ReentrantLock
  • Share(共享)多个线程可以同时执行,如 Semaphore / CountDownLatCh

3. 源码分析

3.1 AQS 模版方法模式

同步器的设计是基于模版方法模式的,如果需要自定义同步器一般的方式

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法,对于 共享资源 state 的获取和释放
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模版方法,这些模版方法会调用使用者重写的方法

模版方法模式传送门 : 模版方法模式

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:

sHeldExclusively()   //该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire(int)   //独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int)   //独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int)   //共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int)   //共享方式。尝试释放资源,成功则返回true,失败则返回false。
























  以 ReentranLock 为例,state 初始化为 0 ,表示未锁状态,A线程 lock() 时,会调用 tryAcquire() 独占并将 state +1 。此后,其他线程再 tryAcquire() 时就会失败,直到A线程 unlock() 到 state = 0 (即释放锁) 为止,其他线程才有机会获取该锁。当然释放锁之前,A线程自己是可以重复获取此锁的 (state会累加),者就是可重入的概念,但要注意,获取多少次就要释放多少次,这样才能保证 state 能回到 0 的状态 。

  一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现 tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

3.2 加锁 (独占锁 )

以 ReentranLock 为例 ReentranLock 锁 lock 方法源码 (非公平锁的实现 )

static final class NonfairSync extends Sync { 
               private static final long serialVersionUID = L; / * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { 
               // CAS 抢锁 修改 state值 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { 
               return nonfairTryAcquire(acquires); } } 

AQS内部 acquire() 方法

 public final void acquire(int arg) { 
               if (!tryAcquire(arg) // tryAcquire 由子类重写 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 
  • tryAcquire 方法由子类重写 ,tryAcquire方法先尝试获取独占锁,获取成功则返回 (非公平锁的体现)。
  • addWaiter 方法是 tryAcquire 返回 false 以后 ,也就是获取锁失败以后 ,就会把当前线程封装成Node,并添加到队列到尾部,并返回 Node 节点。
  • acquireQueued 负责把 addWaiter 返回的 Node 节点添加到队列结尾,并会执行获取锁操作以及判断是否把当前线程挂起。
  • selfInterrupt 是 AQS 中的 Thread.currentThread().interrupt() 方法调用,它的主要作用是执行完 acquire 之前自己执行中断操作

ReentranLock 子类重写 tryAcquire() 方法

 protected final boolean tryAcquire(int acquires) { 
               return nonfairTryAcquire(acquires); } 
final boolean nonfairTryAcquire(int acquires) { 
               final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { 
               if (compareAndSetState(0, acquires)) { 
               setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { 
               int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 
  • 如果 c == 0 说明没有线程正在竞争该锁,通过 CAS 方式获取锁,并返回 true
  • 如果 current == getExclusiveOwnerThread() 也就是当前线程持有锁,只是简单但 ++acquires,并修改 status值,进行锁重入,但因为没有竞争,所以通过 setStatus 修改。

AQS内部 addWaiter() 方法

 private Node addWaiter(Node mode) { 
               Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { 
               node.prev = pred; if (compareAndSetTail(pred, node)) { 
               pred.next = node; return node; } } enq(node); return node; } 
  • addWaiter 方法负责把当前无法获得锁的线程包装为一个 Node 添加到队尾 (因为先进先出,所以添加到队尾)
  • 如果当前队尾已经存在(tail != null), 则使用 CAS 把当前线程更新为 tail
  • CAS 不一定一定成功 ,因为在并发场景下,可能会出现操作失败, 则需要调用 enq 方法,该方法会自旋操作,把节点入队列

AQS内部 enq() 方法

private Node enq(final Node node) { 
               for (;;) { 
               Node t = tail; if (t == null) { 
               // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { 
               node.prev = t; if (compareAndSetTail(t, node)) { 
               t.next = node; return t; } } } } 
  • 自旋 for 循环 + CAS 入队列。
  • 当队列为空时,则会新创建一个节点,把尾节点指向头节点,然后继续循环。
  • 第二次循环时,则会把当前线程当节点添加到队尾。

AQS内部 acquireQueued () 方法

final boolean acquireQueued(final Node node, int arg) { 
               boolean failed = true; try { 
               boolean interrupted = false; for (;;) { 
               final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { 
               setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { 
               if (failed) cancelAcquire(node); } } 
  • 使用无限循环,如果 p == head && tryAcquire(arg) 条件不满足循环将永远无法结束,不会出现死循环,在于 parkAndCheckInterrupt() 方法会把当前线程挂起,从而阻塞住线程的调用栈。
private final boolean parkAndCheckInterrupt() { 
               LockSupport.park(this); return Thread.interrupted(); } 
  • LockSupport.park 最终把线程交给系统 (Linux) 内核进行阻塞。也不是马上把请求不到锁的线程进行阻塞,而是检查该线程的状态,具体的检查在 shouldParkAfterFailedAcquire 中

AQS内部状态 waitStatus

static final int CANCELLED = 1   // 取消排队,放弃获取锁

static final int SIGNAL = -1   // 表示后继结点在等待当前结点唤醒

static final int CONDITION = -2   // 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

static final int PROPAGATE = -3   共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。


















AQS内部 shouldParkAfterFailedAcquire() 方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 
               int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { 
               /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { 
               node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { 
               /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 
  • 如果前继节点状态为 SIGNAL ,表明当前节点需要 unpark , 则成功返回, 此时 acquireQueued 的parkAndCheckInterrupt 方法会将线程阻塞
  • 如果前继节点的状态为 CANCELLED(ws>0), 说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false ,acquireQueued 方法的无限循环将递归调用该方法,直至 前继节点状态为 SIGNAL 返回 true,导致线程阻塞
  • 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完后通知自己一下。

3.3 释放锁 (独占锁 )

public void unlock() { 
                sync.release(1); } 

AQS内部 release()方法

public final boolean release(int arg) { 
                // tryRelease 判断释放释放资源 子类重写 if (tryRelease(arg)) { 
                Node h = head; // h.waitStatus != 0 表示候机节点在等待唤醒 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 
  • release() 方法,在独占模式下线程释放资源的顶层接口,他会释放指定量的资源

ReentranLock 子类重写 tryRelease() 方法

protected final boolean tryRelease(int releases) { 
                 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { 
                 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } 
  • Thread.currentThread() != getExclusiveOwnerThread() 判断是否为当前线程在调用,不是抛出 IllegalMonitorStateException异常
  • 如果 c ==0 释放该锁,同时将当前锁持有线程设置为 null
  • setState© 设置 state 值

AQS内部 unparkSuccessor()

private void unparkSuccessor(Node node) { 
                 /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { 
                 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } 
  • 如果 ws < 0 则利用 CAS 将当前节点清零
  • 如果 s == null || s.waitStatus > 0 获取后继线程
  • 如果 s != null 使用 LockSupport.unpark(s.thread) 唤醒后继线程

3.4 加锁 (共享锁)

以 Semaphore 为例 Semaphore 共享锁使用     传送门 : Semaphore信号量

 Semaphore semaphore = new Semaphore(2, false); public Semaphore(int permits) { 
                   sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { 
                   sync = fair ? new FairSync(permits) : new NonfairSync(permits); } 

默认情况下只需要传入 permits 许可证数量即可,也就是一次允许放行几个线程,构造会通过 true / false 的方式实现公平锁,非公平锁

初始许可证数量 :

NonfairSync(int permits) { 
                    super(permits); } FairSync(int permits) { 
                    super(permits); } Sync(int permits) { 
                    setState(permits); } protected final void setState(int newState) { 
                    state = newState; } 

在构造函数初始化的时候,无论是公平锁还是非公平锁,都会设置 AQS 中 state 数量值。这个值也就是为了下文中可以获取的信号量扣减和增加的值。

acquire() 方法

Semaphore semaphore = new Semaphore(2,false); semaphore.acquire(); public void acquire() throws InterruptedException { 
                    // 调用 AQS内部的方法 sync.acquireSharedInterruptibly(1); } 

AQS内部 acquireSharedInterruptibly() 方法

public final void acquireSharedInterruptibly(int arg) throws InterruptedException { 
                    // 判断线程是否中止 if (Thread.interrupted()) throw new InterruptedException(); // tryAcquireShared 方法实现如下 if (tryAcquireShared(arg) < 0) // doAcquireSharedInterruptibly 方法实现如下 doAcquireSharedInterruptibly(arg); } 

Semaphore 子类重写 tryAcquireShared 方法 (公平锁 )

static final class FairSync extends Sync { 
                    private static final long serialVersionUID = L; FairSync(int permits) { 
                    super(permits); } // 重写 AQS tryAcquireShared() 方法 protected int tryAcquireShared(int acquires) { 
                    for (;;) { 
                    if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } 
  • tryAcquireShared ,共享方式尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • hasQueuedPredecessors 公平锁的主要实现方法,它的目的就是判断有线程排在自己前面没,以及把线程添加到队列中逻辑实现。
  • for 自旋的过程,通过 CAS 来设置 state 偏移量对应值,这样就可以避免多线程下竞争获取信号量的冲突。
  • getState(),在构造函数中已经初始化 state 值,在这里获取信号量时就是使用 CAS 不断的扣减。

AQS内部 doAcquireSharedInterruptibly 方法实现

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { 
                    // SHARED 起到标记作用,用于判断是否共享 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { 
                    for (;;) { 
                    final Node p = node.predecessor(); if (p == head) { 
                    // tryAcquireShared 用来获取同步状态  int r = tryAcquireShared(arg); if (r >= 0) { 
                    setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { 
                    if (failed) cancelAcquire(node); } } 
  • 首先 doAcquireSharedInterruptibly 方法来自于 AQS 内部方法。
  • for 自旋 如果 p == head 在进行 tryAcquireShared 判断线程是否获取到锁。
  • 如果 r > 0   setHeadAndPropagate(node, r),同步成功后将当前线程设置为头节点,同时 helpGC ,p.next = null,断链操作。
  • shouldParkAfterFailedAcquire(p, node),调整同步队列中 node 结点的状态,并判断是否应该被挂起 。
  • parkAndCheckInterrupt(),判断是否需要被中断,如果中断直接抛出异常,当前结点请求也就结束。
  • cancelAcquire(node),取消该节点的线程请求。

3.5 释放锁 (共享锁)

public void release(int permits) { 
                     if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } 
  • 调用 release 释放资源

AQS内部 releaseShared 方法

public final boolean releaseShared(int arg) { 
                     if (tryReleaseShared(arg)) { 
                     doReleaseShared(); return true; } return false; } 
  • tryReleaseShared 尝试获取资源
protected final boolean tryReleaseShared(int releases) { 
                     for (;;) { 
                     int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } 
  • getState() 获取当前资源状态
  • 如果 next < current 为 是负数的话 , 则返回错误 否则通过 CAS 操作设置 state 变量

AQS内部 doReleaseShared 方法

private void doReleaseShared() { 
                     for (;;) { 
                     Node h = head; if (h != null && h != tail) { 
                     int ws = h.waitStatus; if (ws == Node.SIGNAL) { 
                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 
  • 如果 h != null && h != tail 头节点不为空并且头节点不是尾节点,则队列中有节点
  • h.waitStatus 判断当前 waitStatus 如果当前节点为 SIGNAL 将h节点的 waitStatus 设置成0失败 , 则 CAS 自旋设置直到成功
  • unparkSuccessor(h) 开始 unpark 前驱节点
  • 如果 ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE) 如果 waitStatus 为0的话 , 则 CAS 设置PROPAGATE 继续下次循环
  • 如果 h == head 头节点相同则break , 否则一直循环

个人博客地址:http://blog.yanxiaolong.cn | 『纵有疾风起,人生不言弃』


































版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/230120.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月16日 下午3:05
下一篇 2026年3月16日 下午3:05


相关推荐

  • Camera mipi协议

    Camera mipi协议mipi协议介绍mipi,是MobileIndustryProcessorInterface缩写。mipi移动行业处理器接口。是mipi联盟发起的为移动应用处理器制定的开放标准。分别定义了一系列的手机内部接口标准,比如摄像头接口CSI、显示接口DSI,其中CSI(CameraSerialInterface)是由MIPI联盟下Camera工作组指定的接口标准。CSI-2是MIPICSI第二版,主要由应用层、协议层、物理层组成,最大支持4通道数据传输、单线传输速度高达1Gb/s。mipiCSI

    2022年6月4日
    48
  • lisp语言画阿基米德线_120种UG表达式曲线画法(阿基米德螺旋线、数学方程式)…

    lisp语言画阿基米德线_120种UG表达式曲线画法(阿基米德螺旋线、数学方程式)…在 UG 中利用 规律曲线 根据方程 绘制各种方程曲线 1 极坐标 或柱坐标 r z 与直角坐标系 x y z 的转换关系 x r cos y r sin z z2 球坐标系 r 与直角坐标系 x y z 的转换关系 x rsin cos y rsin sin z rcos 在 UG 表达式中输入的 theta phi r rho 注 所有 UG 表达式中 必须先在名称栏输

    2026年3月17日
    1
  • 【java】详解native方法的使用

    【java】详解native方法的使用目录结构 关于 native 关键字使用 native 关键字使用步骤案例编写 java 文件编译 java 文件获得 h 文件编写 hello cpp 文件部署 hello dll 文件运行 HelloWorld class 文件参考文章 1 关于 native 关键字想必读者已经了解过 native 关键字了 这里笔者就大致囊括一下 被 native 关键字修饰的方法叫做本地方法 本地方法和其它方法不一样 本地方法意味着和平台有关

    2026年3月16日
    3
  • C语言的字符串分割

    说起来很有意思,自认为对C语言理解得还是比较深刻的。但居然到今天才知道有个strtok函数,试用了一下突然感慨以前做了多少重复劳动。每次需要解析配置文件,每次需要分割字符串,居然都是自己去分割字符串,

    2021年12月24日
    41
  • 7-9 判断素数 (20分) 本题的目标很简单,就是判断一个给定的正整数是否素数。[通俗易懂]

    7-9 判断素数 (20分) 本题的目标很简单,就是判断一个给定的正整数是否素数。[通俗易懂]7-9 判断素数 (20分) 本题的目标很简单,就是判断一个给定的正整数是否素数。输入格式: 输入在第一行给出一个正整数N(≤ 10),随后N行,每行给出一个小于2 ​31 ​​ 的需要判断的正整数。输出格式: 对每个需要判断的正整数,如果它是素数,则在一行中输出Yes,否则输出No。输入样例: 2 11 111 输出样例: Yes No#include &l…

    2022年8月18日
    6
  • 命令行卸载java_卸载java「建议收藏」

    命令行卸载java_卸载java「建议收藏」有小伙伴经常会遇到Java没有卸载干净的情况,造成重新安装JDK能正常安装,接着安装JRE的时候总是报1603错误。虽然说JRE安装报错了没安装上,但是eclipse、IntelliJIDEA和AndroidStudio都能正常打开和使用,然而在命令行里却无法使用。小编今天和大家分享一下怎样彻底的卸载java,有需要的小伙伴不妨接着往下看。方法一:直接卸载,步骤比较繁琐,但是也能彻底卸载干净。1…

    2022年5月19日
    58

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号