1.AQS简单介绍
Sync是ReentrantLock的一个内部类,它继承了AbstractQueuedSynchronizer,即AQS,在CountDownLatch、FutureTask、Semaphore、ReentrantLock等源码中,我们都能看到它们的身影,足见其重要性。此处我们需要先了解下AQS才能更愉悦地阅读源码。
AQS中是基于FIFO队列的实现,那么它必然包含队列中元素的定义,在这里它是Node:
属 性 | 定 义 |
---|---|
Node SHARED = new Node() | 表示Node处于共享模式 |
Node EXCLUSIVE = null | 表示Node处于独占模式 |
int CANCELLED = 1 | 因为超时或者中断,Node被设置为取消状态,被取消的Node不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态,处于这种状态的Node会被踢出队列,被GC回收 |
int SIGNAL = -1 | 表示这个Node的继任Node被阻塞了,到时需要通知它 |
int CONDITION = -2 | 表示这个Node在条件队列中,因为等待某个条件而被阻塞 |
int PROPAGATE = -3 | 使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取可以无条件传播 |
int waitStatus | 0,新Node会处于这种状态 |
Node prev | 队列中某个Node的前驱Node |
Node next | 队列中某个Node的后继Node |
Thread thread | 这个Node持有的线程,表示等待锁的线程 |
Node nextWaiter | 表示下一个等待condition的Node |
AQS中包含的方法有
属性/方法 | 含 义 |
---|---|
Thread exclusiveOwnerThread | 这个是AQS父类AbstractOwnableSynchronizer的属性,表示独占模式同步器的当前拥有者 |
Node | 上面已经介绍过了,FIFO队列的基本单位 |
Node head | FIFO队列中的头Node |
Node tail | FIFO队列中的尾Node |
int state | 同步状态,0表示未锁 |
int getState() | 获取同步状态 |
setState(int newState) | 设置同步状态 |
boolean compareAndSetState(int expect, int update) | 利用CAS进行State的设置 |
long spinForTimeoutThreshold = 1000L | 线程自旋等待的时间 |
Node enq(final Node node) | 插入一个Node到FIFO队列中 |
Node addWaiter(Node mode) | 为当前线程和指定模式创建并扩充一个等待队列 |
void setHead(Node node) | 设置队列的头Node |
void unparkSuccessor(Node node) | 如果存在的话,唤起Node持有的线程 |
void doReleaseShared() | 共享模式下做释放锁的动作 |
void cancelAcquire(Node node) | 取消正在进行的Node获取锁的尝试 |
boolean shouldParkAfterFailedAcquire(Node pred, Node node) | 在尝试获取锁失败后是否应该禁用当前线程并等待 |
void selfInterrupt() | 中断当前线程本身 |
boolean parkAndCheckInterrupt() | 禁用当前线程进入等待状态并中断线程本身 |
boolean acquireQueued(final Node node, int arg) | 队列中的线程获取锁 |
tryAcquire(int arg) | 尝试获得锁(由AQS的子类实现它) |
tryRelease(int arg) | 尝试释放锁(由AQS的子类实现它) |
isHeldExclusively() | 是否独自持有锁 |
acquire(int arg) | 获取锁 |
release(int arg) | 释放锁 |
compareAndSetHead(Node update) | 利用CAS设置头Node |
compareAndSetTail(Node expect, Node update) | 利用CAS设置尾Node |
compareAndSetWaitStatus(Node node, int expect, int update) | 利用CAS设置某个Node中的等待状态 |
另外在源码中多处使用了CAS,有关CAS的内容,可查看:
2.非公平锁的获取过程
假设有两个线程:线程1和线程2尝试获取同一个锁(非公平锁),过程如下
- 线程1调用lock方法
final void lock() { if (compareAndSetState(0, 1)) //使用CAS将同步状态设置为1 setExclusiveOwnerThread(Thread.currentThread());//成功则设置线程1为当前锁的独占线程 else acquire(1); //设置失败时,尝试获取锁 }//上述代码正常情况下执行完毕后,线程1成为了独占线程。
- 线程2此时也尝试获取锁,调用lock方法,此时CAS设置时会失败,进入acquire方法。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //重新获取锁失败且线程发生了中断,自行中断}
- 这里面,会首先调用tryAcquire方法尝试再次获取锁,因为我们演示的是非公平锁,因此调用的方法是nonfairTryAcquire。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //current指向当前线程2 int c = getState(); //若线程1未释放锁,则c>0,若线程1已经释放锁,则c=0 if (c == 0) { //线程1已经释放了锁 if (compareAndSetState(0, acquires)) { //使用CAS将state设置为1 setExclusiveOwnerThread(current); //并设置线程2为独占线程 return true; //返回true,获取锁成功 } } //判断该线程是否是重入,即之前已经获取到了锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //每重入一次,将state+1。 if (nextc < 0) // overflow //state+1<0,说明原state为负数,抛出异常 throw new Error("Maximum lock count exceeded"); setState(nextc); //设置state为新值 return true; //返回true,获取重入锁成功。 } return false; //返回flase,获取锁失败 }
- 此时线程2使用tryAcquire方法获取锁,如果也是失败,那么,会调用addWaiter(Node.EXCLUSIVE)方法
private Node addWaiter(Node mode) { //此处mode为独占模式 Node node = new Node(Thread.currentThread(), mode);//将当前线程(此处为线程2)绑定到新节点node上,并设置为独占模式 // Try the fast path of enq; backup to full enq on failure Node pred = tail; //获取原队列的尾节点pred if (pred != null) { //若原尾节点pred非空,则说明已经存在一个队列 node.prev = pred; //设置新节点node的前置为pred if (compareAndSetTail(pred, node)) {//使用CAS设置新的尾节点为node pred.next = node; //设置pred的后置为node,建立双向链接 return node; //返回node } } enq(node); //进入此处说明原队列不存在,需要初始化队列 return node;}
private Node enq(final Node node) { //此处传入的参数node是绑定了线程2的节点 for (;;) { Node t = tail; //获取原队列的尾节点t if (t == null) { // Must initialize //若尾节点为空,说明队列尚未形成 if (compareAndSetHead(new Node())) //设置一个空的,未绑定任何线程的节点为新队列的头节点 tail = head; //新队列只有一个节点,既是头也是尾 } else { //若t非空,说明队列已经形成 node.prev = t; //将node的前置设为t if (compareAndSetTail(t, node)) { //CAS设置新的尾节点为node t.next = node; //设置t的后继为node,建立双向链接 return t; //返回t } } }}
- 现在看外层方法acquireQueued,此时传入的参数node是线程2所在节点,该方法的作用是在等待队列中,当有其他线程释放了资源,那么队列中在等待的线程就可以开始行动
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //是否获取到资源 try { boolean interrupted = false; //等待过程中是否被中断 //自旋,维护等待队列中线程的执行。 for (;;) { final Node p = node.predecessor(); //获取node的前置p if (p == head && tryAcquire(arg)) { //若前置p为头结点并且重新获取锁成功 setHead(node); //设置新的头节点为node p.next = null; // help GC //取消p和链表的链接 failed = false; //获取资源未失败 return interrupted; //等待过程未被中断 } if (shouldParkAfterFailedAcquire(p, node) && //若前置节点是Node.SIGNAL状态 parkAndCheckInterrupt()) //将节点设置为Waitting状态 interrupted = true; //此时线程中断状态为true } } finally { if (failed) //如果获取资源成功那么取消获取过程 cancelAcquire(node); }}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //获取前置节点的等待状态 if (ws == Node.SIGNAL) //Node.SIGNAL表示继任者线程需要被唤醒,那么就可以直接返回; /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //若ws>0,说明前驱被取消,那么执行循环往前一直查找,知道找到未被取消的,将node排在它的后面。 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //进入else,说明ws=0或者Node.PROPAGATE /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //使用CAS设置前置的节点状态为Node.SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
该部分代码可以用现实中排队办理业务的情况来说明:
假设你排队去办理业务,队伍很长,因此除了当前正在办理业务的人,其他所有排队的人都在低头玩手机,且每个排队的人有以下三种状态:①.正常排队,且办完业务后会通知后面的人别玩手机了可以开始办理业务了。②.发现队伍过长,不排队了,走了。③.正常排队,办完业务后不通知后面的人,直接走。 此时你进入该队伍的尾部开始排队。1.第一步,判断排队在你前面的人是否会通知你,如果会通知,那么我们就可以不用关心其他问题,在队列中待着玩手机即可。2.第二步,如果发现排在你前面的人不排队了,要走了,那么此时我们就得往前走一位,并开始不断询问前面的人是不是也准备不排队了,直到我们排在了一个确定不会走的人后面。3.第三步,排在你前面的人不是准备走的,但是他也不会通知你,那么你就要告诉他,一定得在办完业务后通知你。
当我们确定我们已经在队列中待好后(前置会通知我们),那么我们就可以开始休息。parkAndCheckInterrupt方法让我们的线程进入等待的状态,即休息状态。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //调用park()使线程进入waiting状态 return Thread.interrupted(); //如果被唤醒,查看自己是不是被中断的。 }
3.锁的释放过程
public void unlock() { sync.release(1);}
public final boolean release(int arg) { if (tryRelease(arg)) { //若tryRelease后无人占用锁 Node h = head; //获取队列的头结点h if (h != null && h.waitStatus != 0) //若h非空,且h的waitStatus不为0 unparkSuccessor(h); //唤醒后继 return true; } return false;}
protected final boolean tryRelease(int releases) { int c = getState() - releases; //当前state-1,得到c if (Thread.currentThread() != getExclusiveOwnerThread()) //执行releas的不是获取锁的独占线程,抛出异常 throw new IllegalMonitorStateException(); boolean free = false; //free用来标记锁是否可获取状态 if (c == 0) { //若state=0 free = true; //那么当前锁是可获取的 setExclusiveOwnerThread(null); //设置当前锁的独占线程为null } setState(c); //设置当前state为c return free; //返回锁是否是可获取状态 }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; //获取当前线程对应节点的waitStatus if (ws < 0) //将当前线程对应节点waitStatus置为0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; //获取当前线程对应节点的后继节点s if (s == null || s.waitStatus > 0) { //若s为空或s的状态是canceled s = null; //将s设置为null。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //此处从尾到头进行遍历,找到队列最前列的节点且状态不是Canceled,将其设置为s。但此处为何从尾部开始遍历尚未弄清楚。 s = t; } if (s != null) //若上述遍历找到的s非空 LockSupport.unpark(s.thread); //调用lockSupport.unpark唤醒s对应的线程 }
release方法的逻辑仍然可以用一个办理完业务的人的后续动作来进行说明:
1.若A办理业务后无其他业务需要办理,那么表示当前业务窗口是free的。2.A将自己的等待状态置为0,相当于退出队列。然后检查自己后面的人是否是空或者取消排队的状态。若为真,将后置设为空。3.从队列的尾部遍历到头部,直到找到队列最前头的那个,且它的等待状态不是取消状态,那么将其唤醒,告知他可以开始办理业务了。
4.关于源码的一点疑问
本文中部分源码本人暂时也尚未能理解,希望各位大佬不吝赐教,主要有以下一些问题:
1.在unparkSuccessor方法中,找到队列下一个节点并将其唤醒时,为什么从尾到头遍历if (s == null || s.waitStatus > 0) { //若s为空或s的状态是canceled s = null; //将s设置为null。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //倒序遍历? s = t; }
2.在acquireQueued方法中,自旋结束后的finally代码块的作用。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //是否获取到资源 try { boolean interrupted = false; //等待过程中是否被中断 //自旋,维护等待队列中线程的执行。 for (;;) { final Node p = node.predecessor(); //获取node的前置p if (p == head && tryAcquire(arg)) { //若前置p为头结点并且重新获取锁成功 setHead(node); //设置新的头节点为node p.next = null; // help GC //取消p和链表的链接 failed = false; //获取资源未失败 return interrupted; //等待过程未被中断 } if (shouldParkAfterFailedAcquire(p, node) && //若前置节点是Node.SIGNAL状态 parkAndCheckInterrupt()) //将节点设置为Waitting状态 interrupted = true; //此时线程中断状态为true } } finally { if (failed) //如果自旋结束,那么说明failed = false已经执行了,那么这个canclAcquire方法什么情况下会执行? cancelAcquire(node); }}