1. Введение
AQS является важным классом в среде JUC. Он реализует эксклюзивные блокировки и общие блокировки. Многие внутренние классы реализованы через AQS, например CountDownLatch, ReentrantLock, ReentrantReadWriteLock и Semaphore. В этой главе анализируется исходный код AbstractQueuedSynchronizer. Исходный код анализируется на основе получения и освобождения эксклюзивных блокировок, получения и освобождения общих блокировок, а также ожидания и сигнала Condition. Сначала ознакомьтесь с этими моментами внимания AQS 1. Переменные условия поддерживаются только в состоянии монопольной блокировки 2. AQS поддерживает очереди синхронизации и очереди условий (количество очередей условий зависит от количества экземпляров условия), только потоки узла в состоянии монопольной блокировки. очередь синхронизации может получать блокировки 3. AQS — это абстрактный класс, предоставляющий общедоступные методы для монопольных и общих блокировок.Существует несколько шаблонных методов, которые необходимо реализовать в подклассах: ①tryAcquire②tryRelease③tryAcquireShared④tryReleaseShared⑤isHeldExclusively. Эти моменты будут проанализированы в следующем анализе кода.Существует также класс AbstractQueuedLongSynchronizer, который имеет почти те же функции и реализацию, что и AQS. Единственное отличие состоит в том, что тип состояния атрибута, представляющий количество раз, когда блокировка была получена в AQLS, имеет тип long, а переменная-член в AQS имеет тип int. АКСВерсия jdk, соответствующая исходному коду, — 1.8.
Во-вторых, классовые отношения.
//此类只提供保存和获取独占锁线程,子类可以使用适当的保留值来帮助控制和监控访问并提供诊断
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
//独占锁拥有者线程
private transient Thread exclusiveOwnerThread;
//设置独占锁的拥有者线程,访问权限protected,同包或者子类使用
protected final void setExclusiveOwnerThread(Thread thread) {
//将传入进来的线程赋值给独占锁拥有者线程
exclusiveOwnerThread = thread;
}
//获取独自锁的拥有者线程,访问权限protected,只能在同包或者子类使用
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
3. Свойства
//同步队列的头节点
private transient volatile Node head;
//同步队列的尾节点
private transient volatile Node tail;
//同步状态
private volatile int state;
//如果超时时间小于此阈值,不阻塞线程,让其自旋,在doAcquireNanos、doAcquireSharedNanos、awaitNanos、await(long time, TimeUnit unit)方法使用到
static final long spinForTimeoutThreshold = 1000L;
//获取UnSafe使用,如果对UnSafe使用不清楚的,可以看下我分享的UnSafe的使用
private static final Unsafe unsafe = Unsafe.getUnsafe();
//属性state的相对偏移量,相对AbstractQueuedSynchronizer实例的起始内存位置的相对偏移量,定义成静态的原因是,属性的相对实例的偏移量都是相等的
private static final long stateOffset;
//属性head的相对偏移量
private static final long headOffset;
//属性tail的相对偏移量
private static final long tailOffset;
//内部类Node实例的属性waitStatus的相对偏移量
private static final long waitStatusOffset;
//内部类Node实例的属性next的相对偏移量
private static final long nextOffset;
static {
try {
//使用UnSafe实例获取AbstractQueuedSynchronizer类的属性state的相对偏移量
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
//使用UnSafe实例获取AbstractQueuedSynchronizer类的属性head的相对偏移量
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
//使用UnSafe实例获取AbstractQueuedSynchronizer类的属性tail的相对偏移量
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
//使用UnSafe实例获取AbstractQueuedSynchronizer内部类Node的属性waitStatus的相对偏移量
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
//使用UnSafe实例获取AbstractQueuedSynchronizer内部类Node的属性next的相对偏移量
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
В-четвертых, внутренний класс
- Класс узла
//通过Node我们可以实现两个队列,一是通过prev和next属性实现CLH队列(线程同步队列,双向队列),二是nextWaiter属性实现Condition条件上的等待条件队列(单向队列),在Condition中会详细介绍。 static final class Node { //当前节点是获取共享锁的标记 static final Node SHARED = new Node(); //当前节点是获取独占锁的标记 static final Node EXCLUSIVE = null; //属性waitStatus的值,标志节点对应的线程被取消 static final int CANCELLED = 1; //属性waitStatus的值,标志当前节点的next节点的线程(即队列中当前节点的下一个节点)需要被阻塞 static final int SIGNAL = -1; //属性waitStatus的值,标志当前节点在Condition条件下等待阻塞,在Condition实例的await系列方法中使用,新建一个waitStatus的值为CONDITION的节点Node,将其加入到Condition中的条件队列中,在Condition实现类详细介绍 static final int CONDITION = -2; //属性waitStatus的值,标志着下一个acquireShared方法线程应该被允许,在获取共享锁 static final int PROPAGATE = -3; //标记着当前节点的状态,默认状态是0,小于0的状态值都是有特殊作用,大于0的状态值表示已取消 volatile int waitStatus; //使用prev和next实现同步队列,即双向链表,当前节点的前驱节点 volatile Node prev; //当前节点的下一节点 volatile Node next; //当前节点对应的线程 volatile Thread thread; //有两种作用:1、表示下一个在Condition条件上等待的节点,调用Condition中await和signal方法,当前节点的线程是拥有独占锁的线程2、表示同步队列中的节点是共享模式还是独占模式 Node nextWaiter; //判断当前节点是不是共享模式 final boolean isShared() { return nextWaiter == SHARED; } //获取当前节点的前驱节点,如果为null,则抛出空指针异常 final Node predecessor() throws NullPointerException { //当前节点的前驱节点 Node p = prev; //如果前驱节点为空 if (p == null) //抛出空指针异常 throw new NullPointerException(); else //返回当前节点的前驱节点 return p; } //在创建链表头head,或者创建节点共享锁标记属性SHARED值 Node() { } //在addWaiter方法中使用 Node(Thread thread, Node mode) { //当前节点的模式,是属于共享模式,还是独占模式 this.nextWaiter = mode; //将传入进来的线程赋值给节点属性thread this.thread = thread; } //在Condition条件中使用 Node(Thread thread, int waitStatus) { //将传入节点的状态值赋值给节点属性waitStatus this.waitStatus = waitStatus; //将传入进来的线程赋值给节点属性thread this.thread = thread; } }
- Класс ConditionObject
public interface Condition { //以下方法都会详细在ConditionObject实现类中介绍 void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); } public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; //当前Condition实例的条件队列的第一个节点 private transient Node firstWaiter; //当前Condition实例的条件队列的最后一个节点 private transient Node lastWaiter; //构造函数 public ConditionObject() { } //如果尾节点的状态值不是CONDITION值,清除条件队列中取消的节点,修改条件队列的第一个节点和最后一个节点,创建新节点,将其新节点做为条件队列的尾节点 private Node addConditionWaiter() { //获取条件队列的最后一个节点 Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //如果条件队列中的最后一个节点不为空,并且状态值不是CONDITION,即最后等待节点已取消 if (t != null && t.waitStatus != Node.CONDITION) { //移除条件队列中的已取消的节点,重新设置firstWaiter、lastWaiter节点,详细的可以看下面对unlinkCancelledWaiters方法的介绍 unlinkCancelledWaiters(); //重新获取条件队列的最后一个节点 t = lastWaiter; } //创建新节点,传入当前线程和节点的状态值Node.CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); //如果尾节点为空 if (t == null) //将其新节点做为条件队列的第一个节点 firstWaiter = node; else //将条件队列中的上一个尾节点 t.nextWaiter = node; //将其新建的节点做为条件队列的最后一个节点 lastWaiter = node; //返回新建的节点 return node; } //只将一个条件队列中不为空、状态值为CONDITION的第一个节点加入到同步队列中,传入条件队列的头结点,将其移动同步队列中,如果传入的节点已取消,会将其传入节点的下一个不为空、状态值为CONDITION的节点移到同步队列中 private void doSignal(Node first) { do { //传入的节点的下一个等待节点为空 if ( (firstWaiter = first.nextWaiter) == null) //将其条件队列的尾节点设置为空 lastWaiter = null; //加快gc回收,将其传入节点的下一节点置为空 first.nextWaiter = null; //可以看下面对transferForSignal方法的详细介绍,将其传入节点(状态值为CONDITION)移到同步队列中,如果当前加入同步队列节点的前置节点状态为已取消,或者将其前置节点的状态值设置为SIGNAL失败,唤醒加入的节点对应的线程 } while (!transferForSignal(first) && //找到条件队列中节点不为空的并且状态值为CONDITION的节点加入到同步队列中 (first = firstWaiter) != null); } //将其条件队列中的所有不为空、状态值为CONDITION的节点加入到同步队列中,从传入条件队列的头节点开始 private void doSignalAll(Node first) { //将其条件队列的头尾节点都置为空 lastWaiter = firstWaiter = null; //从头结点开始循环获取条件队列中的所有节点 do { //获取头结点的下一节点 Node next = first.nextWaiter; //加快gc,将其头节点的关联的下一节点置为空 first.nextWaiter = null; //可以看下面对transferForSignal方法的详细介绍,将其传入节点(状态值为CONDITION)移到同步队列中,如果当前加入同步队列节点的前置节点状态为已取消,或者将其前置节点的状态值设置为SIGNAL失败,唤醒加入的节点对应的线程 transferForSignal(first); //将下一节点设置为头结点 first = next; } while (first != null);//直到节点为空,即条件队列结束 } //将已取消的节点从条件队列中移除 private void unlinkCancelledWaiters() { //获取条件队列的头节点 Node t = firstWaiter; //临时变量,存放最新的不为空、状态值为Node.CONDITION的节点,目的是为了设置下一节点不为空、状态值为Node.CONDITION的节点 Node trail = null; //头结点不为空 while (t != null) { //获取头节点的下一节点 Node next = t.nextWaiter; //如果头节点状态值不是Node.CONDITION,此节点已取消 if (t.waitStatus != Node.CONDITION) { //加快gc,此已取消节点 t.nextWaiter = null; //如果最新的不为空、状态值为CONDITION的节点为空 if (trail == null) //将下一节点设置为头结点 firstWaiter = next; else //将最新的不为空、状态值为CONTIDION的节点和下一节点连起来 trail.nextWaiter = next; //如果下一节点为空,表明条件队列已经没有节点 if (next == null) //将最新的不为空、状态值为CONTIDION的节点赋值给尾节点 lastWaiter = trail; } else //将t赋值给trail trail = t; 将下一节点做为头节点 t = next; } } // public methods //将条件队列中的不为空、状态值为CONDITION头节点移动到同步队列中 public final void signal() { //调用signal方法的线程,不是拥有排他锁的线程就会抛出IllegalMonitorStateException异常,Condition的使用只支持在排他锁下使用 if (!isHeldExclusively()) //isHeldExclusively方法子类得重写,否则直接抛出UnsupportedOperationException throw new IllegalMonitorStateException(); //获取条件队列的头结点 Node first = firstWaiter; //如果条件队列不为空 if (first != null) //调用上面介绍的doSignal方法,详细的可以看上面的介绍,只将一个条件队列中不为空、状态值为CONDITION的第一个节点加入到同步队列中,传入条件队列的头结点,将其移动同步队列中,如果传入的节点已取消,会将其传入节点的下一个不为空、状态值为CONDITION的节点移到同步队列中 doSignal(first); } //将条件队列中的所有不为空,状态值为CONDITION的节点移动到同步队列中 public final void signalAll() { /调用signalAll方法的线程,不是拥有排他锁的线程就会IllegalMonitorStateException异常,Condition的使用只支持在排他锁下使用 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //获取条件队列中的头节点 Node first = firstWaiter; //如果头节点不为空,即条件队列中存在节点 if (first != null) //调用上面介绍的doSignalAll方法,doSignalAll方法的详细介绍可以看上面对此方法的介绍,将其条件队列中的所有不为空、状态值为CONDITION的节点加入到同步队列中,从传入条件队列的头节点开始 doSignalAll(first); } //新建个节点加入到同步队列中,并释放锁,让当前线程阻塞等待。如果线程等待期间发出中断请求,不会产生中断异常 public final void awaitUninterruptibly() { //为当前线程创建新的Node节点,并且将此节点加入到条件队列中,addConditionWaiter方法可以看上面对此方法的介绍 Node node = addConditionWaiter(); //释放当前线程占有的锁,并唤醒其他线程,fullRelease方法会在下面详细介绍到 int savedState = fullyRelease(node); //初始化中断标志位为false boolean interrupted = false; //循环判断当前新建的节点是否在同步队列中,直到此节点在同步队列中退出循环,否则就阻塞当前线程,isOnSyncQueue方法会在下面进行介绍 while (!isOnSyncQueue(node)) { //阻塞当前线程 LockSupport.park(this); //如果当前线程被中断 if (Thread.interrupted()) //将中断标志位设置为true interrupted = true; } //节点已经在同步队列中,获取同步锁,只有得到锁才能继续执行,否则线程继续阻塞等待,acquireQueued方法返回当前线程是否被中断,此方法会在下面详细进行介绍 if (acquireQueued(node, savedState) || interrupted) //acquireQueued方法返回true,即当前线程被中断过,或者interruped标志位为true调用当前线程的interrupt方法 selfInterrupt();//Thread.currentThread().interrupt(); } //重新中断的标志位 private static final int REINTERRUPT = 1; //抛出中断异常的标志位 private static final int THROW_IE = -1; //检查传入节点在等待时,传入节点对应的线程是否有发生被中断请求 //如果其他线程没有调用传入节点线程的interrupt方法,即中断请求,返回0 //如果此节点对应的线程有被中断请求,并且中断请求在signalled(即调用signal或signalAll)之前返回THROW_IE ,因为在调用await方法时,会新建一个节点(状态值为CONDITION)加入到条件队列中,比如调用await方法传入超时时间,阻塞线程在超时自动被唤醒,此时节点还没有被signal,但是有其他线程调用此节点对应的线程interrupt方法 //如果中断请求是在singal、signalAll方法之后,返回重新中断的标志位 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? //transferAfterCanceledWait方法,看下面的详细介绍 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } //如果传入进来的参数interruptMode是THROW_IE,就抛出InterruptedException异常,如果interruptMode是REINTERRUPT,则当前线程再次调用interrupt方法,再次发出中断请求,否则就什么都不做 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //如果interruptMode是THROW_IE if (interruptMode == THROW_IE) //抛出InterruptedException异常 throw new InterruptedException(); //如果interruptMode是REINTERRUPT,重新调用当前线程的interrupt方法,再次发出中断请求 else if (interruptMode == REINTERRUPT) selfInterrupt(); } //新建个节点加入到同步队列中,并释放锁,让当前线程阻塞等待。如果线程等待期间发出中断请求,则抛出中断异常 public final void await() throws InterruptedException { //如果当前线程已被中断 if (Thread.interrupted()) //抛出中断异常 throw new InterruptedException(); //为当前线程创建新的Node节点,并且将此节点加入到条件队列中,addConditionWaiter方法可以看上面对此方法的介绍 Node node = addConditionWaiter(); //释放当前线程占有的锁,并唤醒其他线程,fullRelease方法会在下面详细介绍到 int savedState = fullyRelease(node); //线程中断请求是在signal、signalAll方法将此节点加入到同步队列之前还是之后的标志位 int interruptMode = 0; //循环判断当前新建的节点是否在同步队列中,直到此节点在同步队列中退出循环,否则就阻塞当前线程,isOnSyncQueue方法会在下面进行介绍 while (!isOnSyncQueue(node)) { //阻塞当前线程 LockSupport.park(this); //如果当前线程产生中断请求,就跳出循环, checkInterruptWhileWaiting方法可以看上面对此方法的介绍 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //节点已经在同步队列中,获取同步锁,只有得到锁才能继续执行,否则线程继续阻塞等待,acquireQueued方法返回当前线程是否被中断,此方法会在下面详细进行介绍 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //如果当前节点的下一个节点不为空 if (node.nextWaiter != null) // clean up if cancelled //将已取消的节点从条件队列中移除可以看上面对unlinkCancelledWaiters方法介绍 unlinkCancelledWaiters(); //是否要抛出中断异常,或者发出中断请求 if (interruptMode != 0) //可以看上面对reportInterruptAfterWait方法的介绍 reportInterruptAfterWait(interruptMode); } //@param nanpsTimeout超时时间,单位为纳秒 //@return 死亡时间减去当前时间的时间戳 //新建个节点加入到同步队列中,并释放锁,让当前线程阻塞等待,直到被调用signal、signalAll方法,或者超时将当前节点从条件队列中移到同步队列中,在同步队列中阻塞得到获取锁。如果线程等待期间发出中断请求,则抛出中断异常 public final long awaitNanos(long nanosTimeout) throws InterruptedException { //如果线程已被中断 if (Thread.interrupted()) //抛出中断异常 throw new InterruptedException(); //为当前线程创建新的Node节点,并且将此节点加入到条件队列中,addConditionWaiter方法可以看上面对此方法的介绍 Node node = addConditionWaiter(); //释放当前线程占有的锁,并唤醒其他线程,fullRelease方法会在下面详细介绍到 int savedState = fullyRelease(node); //获取死亡时间,即当前时间加上传入的超时时间 final long deadline = System.nanoTime() + nanosTimeout; //线程中断请求是在signal、signalAll方法将此节点加入到同步队列之前还是之后的标志位 int interruptMode = 0; //循环判断当前新建的节点是否在同步队列中,直到此节点在同步队列中退出循环,否则就阻塞当前线程,isOnSyncQueue方法会在下面进行介绍 while (!isOnSyncQueue(node)) { //如果传入的超时时间小于等于0 if (nanosTimeout <= 0L) { //transferAfterCanceledWait方法,看下面的详细介绍 transferAfterCancelledWait(node); //退出循环 break; } //如果超时时间大于等于spinForTimeoutThreshold阈值,才会阻塞当前线程,否则让其线程自旋一段时间 if (nanosTimeout >= spinForTimeoutThreshold) //超时的阻塞当前线程,this是阻塞当前线程的监控对象 LockSupport.parkNanos(this, nanosTimeout); //如果当前线程产生中断请求,就跳出循环, checkInterruptWhileWaiting方法可以看上面对此方法的介绍 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //退出循环 break; //死亡时间减去当前时间,得到超时时间 nanosTimeout = deadline - System.nanoTime(); } //节点已经在同步队列中,获取同步锁,只有得到锁才能继续执行,否则线程继续阻塞等待,acquireQueued方法返回当前线程是否被中断,此方法会在下面详细进行介绍 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //将其标志节点interruptMode设置为重新中断标志位 interruptMode = REINTERRUPT; //如果当前节点的下一个节点不为空 if (node.nextWaiter != null) //将已取消的节点从条件队列中移除,可以看上面对unlinkCancelledWaiters方法介绍 unlinkCancelledWaiters(); //是否要抛出中断异常,或者发出中断请求 if (interruptMode != 0) //可以看上面对reportInterruptAfterWait方法的介绍 reportInterruptAfterWait(interruptMode); //返回死亡时间减去当前时间的时间戳 return deadline - System.nanoTime(); } //@param deadline 死亡时间 //@return 节点对应的线程被中断是否比占有独占锁的线程调用signal、signalAll方法将当前节点从条件队列移到同步队列中之前,如果是返回值为false,否则返回true,表示正常超时,而不是由中断退出 //新建个节点加入到同步队列中,并释放锁,让当前线程阻塞等待,直到被调用signal、signalAll方法,或者到死亡时间将当前节点从条件队列中移到同步队列中,在同步队列中阻塞得到获取锁。如果线程等待期间发出中断请求,则抛出中断异常 public final boolean awaitUntil(Date deadline) throws InterruptedException { //死亡时间的时间戳 long abstime = deadline.getTime(); //如果线程已被中断 if (Thread.interrupted()) //抛出中断异常 throw new InterruptedException(); //为当前线程创建新的Node节点,并且将此节点加入到条件队列中,addConditionWaiter方法可以看上面对此方法的介绍 Node node = addConditionWaiter(); //释放当前线程占有的锁,并唤醒其他线程,fullRelease方法会在下面详细介绍到 int savedState = fullyRelease(node); //是否超时的标记位 boolean timedout = false; //线程中断请求是在signal、signalAll方法将此节点加入到同步队列之前还是之后的标志位 int interruptMode = 0; //循环判断当前新建的节点是否在同步队列中,直到此节点在同步队列中退出循环,否则就阻塞当前线程,isOnSyncQueue方法会在下面进行介绍 while (!isOnSyncQueue(node)) { //如果当前时间大于死亡时间 if (System.currentTimeMillis() > abstime) { //transferAfterCanceledWait方法,看下面的详细介绍 timedout = transferAfterCancelledWait(node); //退出循环 break; } //阻塞当前线程直到死亡时间、或者拥有独占锁的线程调用signal、signalAll方法唤醒线程 LockSupport.parkUntil(this, abstime); //如果当前线程产生中断请求,就跳出循环, checkInterruptWhileWaiting方法可以看上面对此方法的介绍 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //退出循环 break; } //节点已经在同步队列中,获取同步锁,只有得到锁才能继续执行,否则线程继续阻塞等待,acquireQueued方法返回当前线程是否被中断,此方法会在下面详细进行介绍 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //将其标志节点interruptMode设置为重新中断标志位 interruptMode = REINTERRUPT; //如果当前节点的下一个节点不为空 if (node.nextWaiter != null) //将已取消的节点从条件队列中移除可以看上面对unlinkCancelledWaiters方法介绍 unlinkCancelledWaiters(); //是否要抛出中断异常,或者发出中断请求 if (interruptMode != 0) //可以看上面对reportInterruptAfterWait方法的介绍 reportInterruptAfterWait(interruptMode); //返回表示此方法是正常超时结束,还是由中断退出结束 return !timedout; } //相比awaitNanos方法超时时间更加可控,可以传入不同的超时单位 //@return 节点对应的线程被中断是否比占有独占锁的线程调用signal、signalAll方法将当前节点从条件队列移到同步队列中之前,如果是返回值为false,否则返回true,表示正常超时,而不是由中断退出 //新建个节点加入到同步队列中,并释放锁,让当前线程阻塞等待,直到被调用signal、signalAll方法,或者超时将当前节点从条件队列中移到同步队列中,在同步队列中阻塞得到获取锁。如果线程等待期间发出中断请求,则抛出中断异常 public final boolean await(long time, TimeUnit unit) throws InterruptedException { //将超时时间转化为纳秒 long nanosTimeout = unit.toNanos(time); //如果线程已被中断 if (Thread.interrupted()) //抛出中断异常 throw new InterruptedException(); //为当前线程创建新的Node节点,并且将此节点加入到条件队列中,addConditionWaiter方法可以看上面对此方法的介绍 Node node = addConditionWaiter(); //释放当前线程占有的锁,并唤醒其他线程,fullRelease方法会在下面详细介绍到 int savedState = fullyRelease(node); //获取死亡时间,即当前时间加上传入的超时时间 final long deadline = System.nanoTime() + nanosTimeout; //当前await方法是否正常超时结束,还是由中断退出结束 boolean timedout = false; //线程中断请求是在signal、signalAll方法将此节点加入到同步队列之前还是之后的标志位 int interruptMode = 0; //循环判断当前新建的节点是否在同步队列中,直到此节点在同步队列中退出循环,否则就阻塞当前线程,isOnSyncQueue方法会在下面进行介绍 while (!isOnSyncQueue(node)) { //如果传入的超时时间小于等于0 if (nanosTimeout <= 0L) { //transferAfterCanceledWait方法,看下面的详细介绍 timedout = transferAfterCancelledWait(node); //退出循环 break; } //如果超时时间大于等于spinForTimeoutThreshold阈值,才会阻塞当前线程,否则让其线程自旋一段时间 if (nanosTimeout >= spinForTimeoutThreshold) //超时的阻塞当前线程,this是阻塞当前线程的监控对象 LockSupport.parkNanos(this, nanosTimeout); //如果当前线程产生中断请求,就跳出循环, checkInterruptWhileWaiting方法可以看上面对此方法的介绍 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //退出循环 break; //返回死亡时间减去当前时间的时间戳 nanosTimeout = deadline - System.nanoTime(); } //节点已经在同步队列中,获取同步锁,只有得到锁才能继续执行,否则线程继续阻塞等待,acquireQueued方法返回当前线程是否被中断,此方法会在下面详细进行介绍 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //将其标志节点interruptMode设置为重新中断标志位 interruptMode = REINTERRUPT; //如果当前节点的下一个节点不为空 if (node.nextWaiter != null) //将已取消的节点从条件队列中移除可以看上面对unlinkCancelledWaiters方法介绍 unlinkCancelledWaiters(); //是否要抛出中断异常,或者发出中断请求 if (interruptMode != 0) //可以看上面对reportInterruptAfterWait方法的介绍 reportInterruptAfterWait(interruptMode); //返回表示此方法是正常超时结束,还是由中断退出结束 return !timedout; } //判断Condition实例的外部类AQS和传入进来的AQS是否相等,判断Condition实例是否归属于传入进来的AQS final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { // AbstractQueuedSynchronizer.this在内部类实例中经常使用,获取外部类的实例 return sync == AbstractQueuedSynchronizer.this; } //判断条件队列中是否有等待的节点 protected final boolean hasWaiters() { /调用hasWaiters方法的线程,不是拥有排他锁的线程就会IllegalMonitorStateException异常,Condition的使用只支持在排他锁下使用 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //从条件队列的头节点开始,循环的获取条件队列中的节点 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { //判断条件队列中的是否有节点的状态值为CONDITION if (w.waitStatus == Node.CONDITION) //如果是直接返回true return true; } //否则直接返回false return false; } //获取条件队列中有多少个等待节点(状态值为CONDITION) protected final int getWaitQueueLength() { //调用getWaitQueueLength方法的线程,不是拥有排他锁的线程就会IllegalMonitorStateException异常,Condition的使用只支持在排他锁下使用 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //等待节点数目 int n = 0; //从条件队列的头节点开始,循环的获取条件队列中的节点 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { //如果条件队列中等待节点的状态值为CONDITION if (w.waitStatus == Node.CONDITION) //等待节点加1 ++n; } //返回等待节点 return n; } //获取条件队列中所有等待节点(状态值为CONDITION)的线程,返回线程集合 protected final Collection<Thread> getWaitingThreads() { //调用getWaitingThreads方法的线程,不是拥有排他锁的线程就会IllegalMonitorStateException异常,Condition的使用只支持在排他锁下使用 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //所有等待节点对应的线程集合 ArrayList<Thread> list = new ArrayList<Thread>(); //从条件队列的头节点开始,循环的获取条件队列中的节点 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { //如果条件队列的等待节点的状态值为CONDITION if (w.waitStatus == Node.CONDITION) { //获取等待节点对应的线程 Thread t = w.thread; //如果节点对应的线程不为空 if (t != null) //将其线程加入到集合中 list.add(t); } } //返回所有等待节点对应的线程集合 return list; } }
Пять, очередь синхронизации
- Настройки заголовка синхронной очереди
//通过UnSafe的cas函数设置AQS的属性head值,仅仅在enq方法中,返回设置释放成功,使用cas保证多线程安全 private final boolean compareAndSetHead(Node update) { //cas通过AQS实例和其属性head偏移量,如果头结点为空,将其空设置为update值 return unsafe.compareAndSwapObject(this, headOffset, null, update); } // 重新设置同步队列头结点head,只在acquire系列的方法中调用 private void setHead(Node node) { head = node; //当前节点对应的当前线程已经获取到锁了,将当前节点对应的线程置为空 node.thread = null; //当前节点前一个节点已经没有意义,将当前节点对应的前置节点置为空 node.prev = null; }
- Настройки хвоста синхронной очереди
//通过CAS函数设置tail值,仅仅在enq方法中调用,通过CAS函数设置tail值,保证多线程安全 private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } // 向同步队列尾插入新节点,如果队列没有初始化,先初始化同步队列的头尾节点。返回原先的同步队列尾节点 private Node enq(final Node node) { for (;;) { //获取同步队列的尾节点 Node t = tail; //t尾节点为空,表示队列为空,先初始化队列 if (t == null) { // 采用CAS函数即原子操作方式,设置队列头head值。 // 如果成功,再将head值赋值给链表尾tail。如果失败,表示head值已经被其他线程,那么就进入循环下一次 if (compareAndSetHead(new Node())) tail = head; } else { // 新添加的node节点的前一个节点prev指向原来的队列尾tail node.prev = t; // 采用CAS函数即原子操作方式,设置新队列尾tail值。 if (compareAndSetTail(t, node)) { // 设置老的队列尾tail的下一个节点next指向新添加的节点node t.next = node; return t; } } } }
6. Эксклюзивный замок
- Приобретение эксклюзивного замка
//获取独占锁。如果没有马上获取到,线程就会阻塞等待,直到获取到独占锁。不会响应中断异常 public final void acquire(int arg) { //1. 当前线程先调用tryAcquire方法,尝试获取独占锁,如果返回true,表示获取到锁,不需要执行acquireQueued方法。 //2. 如果tryAcquire方法获取独占锁失败,调用acquireQueued方法,先调用addWaiter方法为当前线程创建一个状态为独占模式的节点Node,并使用cas将其做为同步队列尾节点加入队列中, //如果当前加入节点的前置节点为头节点,当前线程调用tryAcquire方法去尝试获取锁,如果不成功,就会根据前置节点的状态来判断是否让当前线程阻塞,当前线程阻塞被唤醒。 //acquireQueued方法返回值表示在线程等待过程中,是否有另一个线程调用该线程的interrupt方法,发起中断请求。 //tryAcquire是个模板方法,可以看下面对此方法的介绍,acquireQueued方法可以看下面对此方法的介绍 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //当前线程执行acquireQueued方法的过程中有其他线程调用该线程的interrupt方法,发起中断请求,重新调用当前线程的interrupt方法,中断请求 selfInterrupt(); } // 尝试去获取独占锁,立即返回。如果返回true表示获取独占锁成功,此方法为模板方法,需要子类进行重写,可以看下面ReentrantReadWriteLock类中对AQS的tryAcquire方法实现 protected boolean tryAcquire(int arg) { //抛出UnsupportedOperationException异常 throw new UnsupportedOperationException(); } //ReentrantReadWriteLock类中对AQS的tryAcquire方法实现,详细的介绍,会在分析ReentrantReadWriteLock源码时进行详细介绍 //尝试获取同步队列的独占锁,与非公平锁最大的不同就是调用hasQueuedPredecessors()方法,hasQueuedPredecessors方法返回true,表示等待线程队列中有一个线程在当前线程之前,根据公平锁的规则,当前线程不能获取锁。 protected final boolean tryAcquire(int acquires) { //获取当前线程 Thread current = Thread.currentThread(); //获取当前锁的状态,即AQS中的属性值 int c = getState(); //由状态获取独占锁的个数,锁的状态也是根据独占锁的个数或者共享锁的个数来进行判断 int w = exclusiveCount(c); //如果c==0表示当前锁是空闲的,如果锁状态不等于0,表明锁状态有可能是读锁或者写锁 if (c != 0) { //如果独占锁的个数等于0,表明当前锁属于共享锁模式,直接返回失败,如果独占锁的个数不等于0,并且当前拥有独占锁的线程不是当前,也是直接返回失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; //如果独占锁的个数加上传入进来的请求独占锁个数大于MAX_COUNT,直接抛出错误,超过最大的独占锁个数 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //否则的话更新锁的状态 setState(c + acquires); //返回获取独占锁成功 return true; } //非公平锁writerShouldBlock直接返回false,公平锁writeShouldBlock会调用hasQueuedPredecessors方法 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) //返回获取独占锁失败 return false; //将当前线程设置为拥有独占锁的线程 setExclusiveOwnerThread(current); //返回获取独占锁成功 return true; } /** * 获取独占锁的acquire系列方法,都会使用这个方法来获取独占锁 * 循环通过tryAcquire方法不断去获取锁,如果没有获取成功,如果此节点的前置节点的状态值为SIGNAL * 需调用parkAndCheckInterrupt方法,让当前线程阻塞 * @param node 想要获取独占锁锁的节点 * @param arg 获取独占锁的个数 * @return 返回true,表示在线程等待的过程中,线程被中断了 */ final boolean acquireQueued(final Node node, int arg) { //设置失败的标志位 boolean failed = true; try { //表示节点对应的线程在等待过程中,是否被中断的标志位 boolean interrupted = false; //通过死循环,直到node节点对应的线程获取到锁,才返回 for (;;) { //获取当前节点的前置节点 final Node p = node.predecessor(); //如果当前节点的前一个节点为同步队列的头节点head,当前线程尝试使用tryAcquire方法获取锁,如果获取锁成功,那么当前线程就不需要阻塞等待,继续执行 if (p == head && tryAcquire(arg)) { //将节点node设置为同步队列的头节点 setHead(node); //将节点node的前置节点设置为空,加快gc p.next = null; // help GC //失败的标志位设置为false failed = false; //返回是否中断的标志位 return interrupted; } //判断当前节点的前置节点的状态值是否为SIGNAL,如果是调用parkAndCheckInterrupt方法阻塞当前线程,shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法会在下面详细进行介绍 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //如果方法parkAndCheckInterrupt的返回值为true,表明节点对应的线程在等待时,有其他线程调用此线程的interrupt方法,中断请求 interrupted = true; } } finally { //在获取独占锁的过程中失败,将当前节点从同步队列中移除 if (failed) //cancelAcquire方法会在下面详细进行介绍 cancelAcquire(node); } } /** * 根据前一个节点pred的状态,来判断当前节点对应的线程是否应该被阻塞 * @param pred : node节点的前一个节点 * @param node : 要获取独占锁的节点 * @return 返回true 表示当前线程应该被阻塞,然后调用parkAndCheckInterrupt方法来阻塞当前线程 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前置节点的状态值 int ws = pred.waitStatus; //如果前置节点的状态值为SIGNAL if (ws == Node.SIGNAL) // 如果前一个pred的状态是Node.SIGNAL,那么直接返回true,当前线程应该被阻塞 return true; //如果前置节点已经取消,循环获取不是取消的前置节点 if (ws > 0) { // 如果前一个节点状态是Node.CANCELLED(大于0就是CANCELLED), // 表示前一个节点所在线程已经被唤醒了,要从CLH队列中移除CANCELLED的节点。 // 所以从pred节点一直向前查找直到找到不是CANCELLED状态的节点,并把它赋值给node.prev, // 表示node节点的前一个节点已经改变。 do { //重新赋值当前节点的前置节点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //不是取消的前置节点的下一节点重新赋值为当前节点 pred.next = node; } else { // 此时前一个节点pred的状态只能是0或者PROPAGATE,不可能是CONDITION状态 // CONDITION(只在condition条件队列中节点存在,CLH同步队列中没有此状态的节点) // 将前一个节点pred的状态设置成Node.SIGNAL,这样在下一次循环时,就是直接阻塞当前线程 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //返回要获取独占锁的节点对应的线程不会阻塞 return false; } private final boolean parkAndCheckInterrupt() { //阻塞当前线程 LockSupport.park(this); //当前线程被唤醒后,返回当前线程的中断标志位 return Thread.interrupted(); } //将传入进来的节点从同步队列中移除,将传入节点对应的线程置为空,状态置为CANCELLED private void cancelAcquire(Node node) { //如果当前节点为空,直接跳过 if (node == null) return; //将当前节点对应的线程置为空 node.thread = null; //获取当前要取消节点的前置节点 Node pred = node.prev; //循环跳过前置节点状态为CANNELLED的值 while (pred.waitStatus > 0) node.prev = pred = pred.prev; //获取状态不是取消的前置的节点的下一个节点,在设置前置节点的下一个节点使用到 Node predNext = pred.next; //将当前要取消的节点状态赋值为CANCELLED node.waitStatus = Node.CANCELLED; //如果要取消节点为尾节点,将尾节点设置为要取消节点的前一个节点 if (node == tail && compareAndSetTail(node, pred)) { //如果设置成功,将要取消节点的前置节点的下一个节点设置为空 compareAndSetNext(pred, predNext, null); } else { int ws; //如果前置不是头节点,并且前置节点的状态值为SIGNAL,或者 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { //获取要取消节点的下一个节点 Node next = node.next; //要取消节点的下一个节点不为空,并且状态值小于等于0,即不是取消状态值CANCELLED if (next != null && next.waitStatus <= 0) //将要取消节点的前置节点和要取消节点的下一节点连接起来 compareAndSetNext(pred, predNext, next); } else { //否则的话唤醒node节点的下一个节点,在下面会对unparkSuccessor方法进行详细介绍 unparkSuccessor(node); } //将要取消节点的下一节点设置为自身,加快gc node.next = node; // help GC } } //唤醒传入节点的下一个不为空、状态值不是已取消CANCELLED节点对应的线程 private void unparkSuccessor(Node node) { //获取传入节点的状态 int ws = node.waitStatus; //如果状态小于0,将其状态设置为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //获取传入节点的下一节点 Node s = node.next; //如果下一节点为空或者是已取消节点 if (s == null || s.waitStatus > 0) { //将下一节点置为空 s = null; //从同步队列的尾节点开始,获取不等于空,并且节点不等于传入节点的前置节点,即重新寻找传入节点的下一个不为空,并且没有取消的节点进行唤醒 for (Node t = tail; t != null && t != node; t = t.prev) //t节点不是已取消节点 if (t.waitStatus <= 0) //将下一节点s赋值为t s = t; } //如果下一节点s不为空 if (s != null) //唤醒此节点s对应的线程 LockSupport.unpark(s.thread); }
- снятие эксклюзивной блокировки
//调用release方法的线程,是拥有独占锁的线程,释放独占锁的操作 public final boolean release(int arg) { //线程首先调用tryRelease方法,尝试去释放锁,此方法为模板方法,由子类具体实现 if (tryRelease(arg)) { //如果tryRelease释放锁成功,获取同步队列的头结点 Node h = head; //如果头结点不为空,并且状态值不等于0,不管头结点是已取消,还是其他状态 if (h != null && h.waitStatus != 0) //唤醒头结点的下一个不为空,并且状态值不为CANCELLED的下一个节点,可以看上面对unparkSuccessor方法的介绍 unparkSuccessor(h); //释放独占锁成 return true; } //释放独占锁失败 return false; } //AQS的模板方法,由具体的子类实现 protected boolean tryRelease(int arg) { //抛出UnsupportedOperationException异常 throw new UnsupportedOperationException(); } //ReentrantReadWriteLock中Sync的tryRelease方法实现 protected final boolean tryRelease(int releases) { //判断当前尝试释放锁的线程,是否是拥有独占锁的线程,如果不是抛出IllegalMonitorStateException异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //将其锁的状态值减去要释放的独占锁的个数 int nextc = getState() - releases; //如果nextc为0,表示独占锁是否成功 boolean free = exclusiveCount(nextc) == 0; if (free) //如果锁释放成功,将其记录拥有独占锁的线程的记录置为空 setExclusiveOwnerThread(null); //设置锁的状态 setState(nextc); //返回独占锁释放是否成功 return free; }
7. Общий замок
- Получение общего замка
//获取共享锁,不响应中断,不会抛出中断异常 public final void acquireShared(int arg) { //当前线程调用tryAcquireShared尝试去获取共享锁,如果返回值小于0表示获取共享锁失败 if (tryAcquireShared(arg) < 0) // 调用doAcquireShared方法去获取共享锁,tryAcquireShared、doAcquireShared方法会在下面详细进行介绍 doAcquireShared(arg); } //尝试获取共享锁,AQS的模板方法,由具体的子类进行实现 protected int tryAcquireShared(int arg) { //抛出UnsupportedOperationException异常 throw new UnsupportedOperationException(); } //ReentrantReadWriteLock类对tryAcquireShared方法的实现,会在ReentrantReadWriteLock源码的分析中进行详细的介绍 protected final int tryAcquireShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //获取锁的状态 int c = getState(); //exclusiveCount根据锁的状态获取独占锁的个数如果不等于0,并且拥有此独占锁的线程不是当前线程,直接返回-1失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //根据锁的状态获取共享锁的个数 int r = sharedCount(c); //获取共享锁,当前线程是否需要阻塞,readerShouldBlock方法分为公平锁和非公平锁的不同实现,公平锁会调用hasQueuedPredecessors方法,判断同步队列中前面是否有节点,有阻塞,非公平锁会 //调用apparentlyFirstQueuedIsExclusive方法,判断头节点的状态是否是独占锁模式,如果是阻塞当前线程,否则继续执行。并且共享锁的个数得小于MAX_COUNT if (!readerShouldBlock() && r < MAX_COUNT && //使用cas将当前锁的状态加上65535,即高16位存放共享锁的个数 compareAndSetState(c, c + SHARED_UNIT)) { //如果原先共享锁的个数为0 if (r == 0) { //将当前线程赋值给第一个获取共享锁的线程firstReader firstReader = current; //当前线程持有的共享锁个数 firstReaderHoldCount = 1; } else if (firstReader == current) {//锁的状态共享锁原先就有被获取,并且第一个持有的线程是当前线程,firstReaderHoldCount++操作,持有共享锁个数加1操作 //第一个获取共享锁的线程持有的共享锁个数加1操作 firstReaderHoldCount++; } else { //获取上一次缓存的线程的HoldCounter,HoldCounter实例是存放一个线程已获取的共享锁个数,及线程id HoldCounter rh = cachedHoldCounter; //上一次缓存的HoldCounter实例为空,或者HoldCounter实例里的线程id和当期线程id不相等 if (rh == null || rh.tid != getThreadId(current)) //重新获取当前线程对应的HoldCounter实例 cachedHoldCounter = rh = readHolds.get(); //如果当前线程对应的HoldCounter实例的线程获取共享锁的个数为0,重新将rh设置到ThreadLocal,ThreadLocal不清楚的可以看下我的另一篇对此的源码分析 else if (rh.count == 0) readHolds.set(rh); //否则线程对应的HoldCounter实例的线程已获取的共享锁个数进行加1操作 rh.count++; } //返回1,表示获取共享锁成功 return 1; } //fullTryAcquireShared可以看下面对此方法的介绍 return fullTryAcquireShared(current); } //ReentrantReadWriteLock类的fullTryAcquireShared方法,详细的会再ReentrantReadWriteLock源码分析进行介绍 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //死循环的获取共享锁 for (;;) { //获取锁的状态 int c = getState(); //如果当前锁状态是独占锁,并且拥有独占锁的线程不是当前线程直接返回-1失败 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; //当前线程是否需要阻塞,readerShouldBlock方法分为公平锁和非公平锁的不同实现,公平锁会调用hasQueuedPredecessors方法,判断同步队列中前面是否有节点,有阻塞,非公平锁会调用apparentlyFirstQueuedIsExclusive方法,判断头节点的状态是否是独占锁模式,如果是阻塞当前线程 } else if (readerShouldBlock()) { //如果当前线程就是第一个获取共享锁的线程,不做任何操作 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { //获取上一次缓存的HoldCounter实例 rh = cachedHoldCounter; //HoldCounter实例为空,并且HoldCounter实例不属于当前线程 if (rh == null || rh.tid != getThreadId(current)) { //获取当前线程对应的HoldCounter实例 rh = readHolds.get(); //如果当前线程获取共享锁个数为0,将HoldCounter实例从ThreadLocal中移除 if (rh.count == 0) readHolds.remove(); } } //如果当前线程对应的HoldCounter实例的属性count等于0,即当前线程没有获取到共享锁,count当前线程获取的共享锁个数 if (rh.count == 0) //返回-1失败 return -1; } } //当前共享锁的个数大于MAX_COUNT,直接抛出一个错误 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //共享锁是使用状态的高16位进行存储,为此加共享锁加65535 if (compareAndSetState(c, c + SHARED_UNIT)) { //如果共享锁的个数为0,即没有获取过共享锁 if (sharedCount(c) == 0) { //将第一个获取共享锁的标识属性firstReader赋值为当前线程 firstReader = current; //获取共享锁的个数为1 firstReaderHoldCount = 1; //锁的状态共享锁原先就有被获取,并且第一个持有的线程是当前线程,firstReaderHoldCount++操作,持有共享锁个数加1操作 } else if (firstReader == current) { firstReaderHoldCount++; } else { //如果HoldCounter实例为空 if (rh == null) //从缓存中获取上一次的HoldCounter实例 rh = cachedHoldCounter; //HoldCounter实例为空,并且不属于当前线程 if (rh == null || rh.tid != getThreadId(current)) //从ThreadLocal中重新获取HoldCounter实例 rh = readHolds.get(); //如果当前线程对应的HoldCounter实例的线程获取共享锁的个数为0,重新将rh设置到ThreadLocal,ThreadLocal不清楚的可以看下我的另一篇对此的源码分析 else if (rh.count == 0) readHolds.set(rh); //否则线程对应的HoldCounter实例的线程已获取的共享锁个数进行加1操作 rh.count++; //将缓存赋值为当前线程的HoldCounter cachedHoldCounter = rh; // cache for release } //返回1表示获取共享锁成功 return 1; } } } //死循环获取共享锁,根据当前节点的前一个节点状态来判断当前线程是否应该阻塞,直到获取到共享锁 private void doAcquireShared(int arg) { //往同步队列中从尾节点加入一个模式为共享锁的节点,看上面对addWaiter方法的介绍 final Node node = addWaiter(Node.SHARED); //获取共享锁是否成功 boolean failed = true; try { //当前线程在获取共享锁的过程中是否被中断的标志位 boolean interrupted = false; for (;;) { //获取当前节点的前一个节点 final Node p = node.predecessor(); //如果当前节点的前置节点为表头 if (p == head) { //使用上面介绍的tryAcquireShared尝试获取共享锁 int r = tryAcquireShared(arg); // if (r >= 0) { //可以看下面对setHeadAndPropagate方法的介绍 setHeadAndPropagate(node, r); //将当前节点的前置节点下一个节点置为空 p.next = null; // help GC //如果发生中断请求 if (interrupted) selfInterrupt(); //获取共享锁成功 failed = false; //直接返回 return; } } //判断当前节点的前置节点的状态值是否为SIGNAL,如果是调用parkAndCheckInterrupt方法阻塞当前线程,shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法可以看上面对这两个方法的详细介绍 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //parkAndCheckInterrupt返回值是否有中断请求 interrupted = true; } } finally { //如果失败 if (failed) //将当前节点从同步队列中移除,可以看上面对cancelAcquire方法的介绍 cancelAcquire(node); } } // 重新设置CLH队列头,如果CLH队列头的下一个节点为null或者共享模式,那么就要唤醒共享锁上等待的线程 private void setHeadAndPropagate(Node node, int propagate) { //获取同步队列的头节点 Node h = head; // Record old head for check below //将传入进来的节点设置为同步队列的表头,将传入进来的前置节点和线程都置为空 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 获取新的CLH队列头的下一个节点s Node s = node.next; // 如果节点s是空或者共享模式节点,那么就要唤醒共享锁上等待的线程 if (s == null || s.isShared()) //在下面共享锁的释放会对此方法进行介绍 doReleaseShared(); } } //如果当前线程在获取共享锁的过程中,有其他线程调用当前线程的interrupt方法,中断请求,会抛出中断异常 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //如果当前线程被其他线程发起中断请求 if (Thread.interrupted()) //抛出中断异常 throw new InterruptedException(); //tryAcquireShared可以看上面对此方法的介绍 if (tryAcquireShared(arg) < 0) //可以看下面对doAcquireSharedInterruptibly的介绍 doAcquireSharedInterruptibly(arg); } //超时的获取共享锁,并且响应中断请求,抛出中断异常 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { //如果当前线程被其他线程发起中断请求 if (Thread.interrupted()) //抛出中断异常 throw new InterruptedException(); //tryAcquireShared可以看上面对此方法的介绍 return tryAcquireShared(arg) >= 0 || //可以看下面对doAcquireSharedNanos的介绍 doAcquireSharedNanos(arg, nanosTimeout); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //往同步队列中从尾节点加入一个模式为共享锁的节点,看上面对addWaiter方法的介绍 final Node node = addWaiter(Node.SHARED); //执行是否失败的标志位,如果失败会将此节点从同步队列中移除 boolean failed = true; try { //死循环的获取共享锁,除非获取共享锁成功,获取线程在阻塞的过程中,被中断,抛出中断异常 for (;;) { //获取当前节点的前一个节点 final Node p = node.predecessor(); //如果当前节点的前置节点为表头 if (p == head) { //使用上面介绍的tryAcquireShared尝试获取共享锁 int r = tryAcquireShared(arg); if (r >= 0) { //可以看上面对setHeadAndPropagate方法的介绍 setHeadAndPropagate(node, r); //将当前节点的前置节点下一个节点置为空 p.next = null; // help GC //获取共享锁成功 failed = false; //直接返回 return; } } //判断当前节点的前置节点的状态值是否为SIGNAL,如果是调用parkAndCheckInterrupt方法阻塞当前线程,shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法可以看上面对这两个方法的详细介绍 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //如果线程在阻塞的过程中,有其他线程调用当前线程的interrupt方法,中断请求,抛出中断异常 throw new InterruptedException(); } } finally { //如果失败 if (failed) //将当前节点从同步队列中移除,可以看上面对cancelAcquire方法的介绍 cancelAcquire(node); } } //支持中断,会抛出中断异常,返回值是否获取共享锁成功 private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { //如果传入进来的超时时间小于等于0直接返回失败 if (nanosTimeout <= 0L) return false; //死亡时间,当前时间加上超市时间 final long deadline = System.nanoTime() + nanosTimeout; //往同步队列中从尾节点加入一个模式为共享锁的节点,看上面对addWaiter方法的介绍 final Node node = addWaiter(Node.SHARED); //执行是否失败的标志位,如果失败会将此节点从同步队列中移除 boolean failed = true; try { for (;;) { //获取当前节点的前一个节点 final Node p = node.predecessor(); //如果当前节点的前置节点为表头 if (p == head) { //使用上面介绍的tryAcquireShared尝试获取共享锁 int r = tryAcquireShared(arg); if (r >= 0) { //可以看上面对setHeadAndPropagate方法的介绍 setHeadAndPropagate(node, r); //将当前节点的前置节点下一个节点置为空 p.next = null; // help GC //获取共享锁成功 failed = false; //直接返回 return; } } //获取剩余的超时时间 nanosTimeout = deadline - System.nanoTime(); //如果在超时时间还是没有获取到共享锁,直接返回失败 if (nanosTimeout <= 0L) return false; //判断当前节点的前置节点的状态值是否为SIGNAL,如果是调用parkAndCheckInterrupt方法阻塞当前线程,shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法可以看上面对这两个方法的详细介绍 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //并且超时时间大于一定的阈值否则让其自旋一段时间 //使用LockSupport阻塞当前线程 LockSupport.parkNanos(this, nanosTimeout); //如果当前线程在阻塞过程中被中断 if (Thread.interrupted()) //直接抛出中断异常 throw new InterruptedException(); } } finally { //如果失败 if (failed) //将当前节点从同步队列中移除,可以看上面对cancelAcquire方法的介绍 cancelAcquire(node); } }
- Снятие общей блокировки
//释放共享锁 public final boolean releaseShared(int arg) { //尝试释放共享锁 if (tryReleaseShared(arg)) { //唤醒等待共享锁的线程 doReleaseShared(); return true; } return false; } //尝试去释放共享锁,AQS的模板方法,由不同的子类进行实现 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //ReentrantReadWriteLock类的tryReleaseShared方法实现 protected final boolean tryReleaseShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //如果当前线程是第一个获取共享锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; //第一个获取共享锁的线程持有的共享锁数等于1,将firstReader置为空 if (firstReaderHoldCount == 1) firstReader = null; else //第一个获取共享锁的线程持有的共享锁数做减1操作 firstReaderHoldCount--; } else { //获取上一次缓存的HoldCounter实例 HoldCounter rh = cachedHoldCounter; //HoldCounter实例rh为空,或者rh不属于当前线程 if (rh == null || rh.tid != getThreadId(current)) //重新获取当前线程的HoldCounter实例 rh = readHolds.get(); //如果当前线程持有的共享锁个数 int count = rh.count; if (count <= 1) { //从ThreadLocal中移除当前线程的HoldCounter实例 readHolds.remove(); //count <= 0表示当前线程就没有获取到读锁(共享锁),这里释放就抛出异常 if (count <= 0) throw unmatchedUnlockException(); } //当前线程的持有共享锁个数做减1操作 --rh.count; } //死循环的释放锁,如果当前锁的共享锁个数为0,返回true,表明共享锁释放成功 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } //唤醒等待共享锁的线程 private void doReleaseShared() { for (;;) { // 将同步队列头赋值给节点h Node h = head; // 如果节点h不为null,且不等于同步队列尾 if (h != null && h != tail) { // 得到节点h的状态 int ws = h.waitStatus; //如果状态是Node.SIGNAL,就要唤醒节点h后继节点的线程 if (ws == Node.SIGNAL) { // 将节点h的状态设置成0,如果设置失败,就继续循环,再试一次。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒节点h后继节点的线程 unparkSuccessor(h); } // 如果节点h的状态是0,就设置ws的状态是PROPAGATE。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果同步队列头head节点发生改变,继续循环, // 如果没有改变,就跳出循环 if (h == head) break; } }