Сегодня мы будем учиться и учитьсяAbstractQueuedSynchronizer
связанные принципы классов,java.util.concurrent
Многие классы в пакете зависят от синхронизатора очередей, предоставляемого этим классом, например, обычно используемыйReentranLock
,Semaphore
иCountDownLatch
Ждать.
Для облегчения понимания мы используем абзацReentranLock
Пример кода, объяснитеReentranLock
в каждом методеAQS
использование.
Пример ReentranLock
мы все знаемReentranLock
поведение блокировки иSynchronized
Похоже, все они являются реентерабельными блокировками, но реализация этих двух действительно совершенно различна, мы объясним это позже.Synchronized
принцип.Кроме того, синхронизированную блокировку нельзя прервать, в то время как ReentrantLock обеспечивает прерываемую блокировку.. Код нижеReentranLock
функции, мы объясним принципы реализации этих функций в этом порядке.
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();
Честные и нечестные замки
ReentranLock
Разделенные на справедливые блокировки и нечестные блокировки, разница между ними заключается в том, связана ли возможность получения блокировки с порядком постановки в очередь. Все мы знаем, что если блокировка удерживается другим потоком, другие потоки, подающие заявку на блокировку, будут приостановлены в ожидании и присоединятся к очереди ожидания. По идее сначала позвонитеlock
Функция приостановлена, ожидающие потоки должны быть в начале очереди, если они вошли в очередь после вызова. Если в это время блокировка снимается, необходимо уведомить ожидающий поток, чтобы попытаться снова получить блокировку, блокировка сделает справедливым первого, кто войдет в очередь, чтобы получить блокировку потока. Вместо того, чтобы справедливые блокировки разбудили все потоки, пусть они попытаются снова получить блокировку, это может привести к тому, что следующий поток получит блокировку, тогда это несправедливо.
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
Мы найдемFairSync
иNonfairSync
унаследовалиSync
класс, покаSync
Родительский классAbstractQueuedSynchronizer
(именуемый в дальнейшемAQS
). ноAQS
Конструктор пуст и не работает.
Последующий анализ исходного кода, если не указано иное, относится к честным блокировкам.
операция блокировки
ReentranLock
изlock
Функция выглядит следующим образом, напрямую вызываетсяsync
изlock
функция. то есть звонитьFairSync
изlock
функция.
//ReentranLock
public void lock() {
sync.lock();
}
//FairSync
final void lock() {
//调用了AQS的acquire函数,这是关键函数之一
acquire(1);
}
Мы официально начнем следующийAQS
Соответствующий исходный код анализируется,acquire
Функция функции состоит в том, чтобы получить количество, которое может быть получено только одним потоком за тот же период времени.Эта сумма является абстрактным понятием блокировки. Давайте сначала проанализируем код, и постепенно вы поймете, что он означает.
public final void acquire(int arg) {
// tryAcquire先尝试获取"锁",获取了就不进入后续流程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//addWaiter是给当前线程创建一个节点,并将其加入等待队列
//acquireQueued是当线程已经加入等待队列之后继续尝试获取锁.
selfInterrupt();
}
tryAcquire
,addWaiter
иacquireQueued
Все они очень важные функции.Давайте узнаем об этих функциях по очереди и поймем их функции.
//AQS类中的变量.
private volatile int state;
//这是FairSync的实现,AQS中未实现,子类按照自己的需要实现该函数
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取AQS中的state变量,代表抽象概念的锁.
int c = getState();
if (c == 0) { //值为0,那么当前独占性变量还未被线程占有
//如果当前阻塞队列上没有先来的线程在等待,UnfairSync这里的实现就不一致
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//成功cas,那么代表当前线程获得该变量的所有权,也就是说成功获得锁
setExclusiveOwnerThread(current);
// setExclusiveOwnerThread将本线程设置为独占性变量所有者线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//如果该线程已经获取了独占性变量的所有权,那么根据重入性
//原理,将state值进行加1,表示多次lock
//由于已经获得锁,该段代码只会被一个线程同时执行,所以不需要
//进行任何并行处理
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//上述情况都不符合,说明获取锁失败
return false;
}
Из приведенного выше кода мы можем найти, чтоtryAcquire
это попытаться получить переменную, эксклюзивную для этого потокаstate
. Значение state указывает на его состояние: если оно равно 0, то ни один поток в настоящее время не имеет монопольного использования этой переменной; если это не так, это означает, что поток уже имеет монопольное использование этой переменной, а это означает, что поток уже получил замок. Но в это время необходимо сделать еще одно суждение, чтобы увидеть, получена ли блокировка самим текущим потоком, и если да, то увеличить значение состояния.
Здесь необходимо пояснить несколько моментов, в первую очередьcompareAndSetState
функция, которая задается с помощью операции CASstate
значение, и значение состояния устанавливаетсяvolatile
Модификатор через эти две точки, чтобы гарантировать, что изменение значения состояния не вызовет проблем с многопоточностью. Тогда есть разница между справедливыми замками и несправедливыми замками.UnfairSync
изnonfairTryAcquire
функция не вызывается в том же местеhasQueuedPredecessors
Чтобы определить, есть ли в данный момент поток в очереди на получение блокировки.
еслиtryAcquire
возвращениеtrue
, то блокировка получена успешно; если она возвращает false, то блокировка не была получена и ее необходимо добавить в очередь ожидания блокировки. Давайте посмотрим нижеaddWaiter
сопутствующие операции.
Блокирующая очередь, ожидающая блокировки
Связанная функция добавления узла, который сохраняет информацию о текущем потоке в очередь ожидания, включает связанный алгоритм очереди без блокировки.AQS
Добавляйте узлы только в хвост очереди, а используемый безблокировочный алгоритм относительно прост. Алгоритм настоящей lock-free очереди подождет, пока мы его проанализируемConcurrentSkippedListMap
пока объясняю.
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//先使用快速入列法来尝试一下,如果失败,则进行更加完备的入列算法.
//只有在必要的情况下才会使用更加复杂耗时的算法,也就是乐观的态度
Node pred = tail; //列尾指针
if (pred != null) {
node.prev = pred; //步骤1:该节点的前趋指针指向tail
if (compareAndSetTail(pred, node)){ //步骤二:cas将尾指针指向该节点
pred.next = node;//步骤三:如果成果,让旧列尾节点的next指针指向该节点.
return node;
}
}
//cas失败,或在pred == null时调用enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) { //cas无锁算法的标准for循环,不停的尝试
Node t = tail;
if (t == null) { //初始化
if (compareAndSetHead(new Node()))
//需要注意的是head是一个哨兵的作用,并不代表某个要获取锁的线程节点
tail = head;
} else {
//和addWaiter中一致,不过有了外侧的无限循环,不停的尝试,自旋锁
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
ПозвонивaddWaiter
функция,AQS
Текущий поток добавлен в очередь ожидания, но выполнение текущего потока не заблокировано, далее проанализируем его.acquireQueued
функция.
Ожидание работы узла очереди
Поскольку операция, входящая в состояние блокировки, снизит эффективность выполнения, поэтомуAQS
Предпринимаются усилия, чтобы избежать блокировки потоков, пытающихся получить эксклюзивные переменные. Итак, когда поток присоединяется к очереди ожидания,acquireQueued
Будет выполнен цикл for, и каждый раз будет оцениваться, должен ли текущий узел получить эту переменную (в начало очереди). Если он не должен быть извлечен или терпит неудачу при следующей попытке извлечения, вызовитеshouldParkAfterFailedAcquire
Это решает, должен ли он войти в состояние блокировки. Если узел перед текущим узлом вошел в состояние блокировки, то можно определить, что текущий узел не может получить блокировку, чтобы предотвратить выполнение ЦП, цикл FOR, потребление ресурсов ЦП, вызовparkAndCheckInterrupt
функция входа в состояние блокировки.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //一直执行,直到获取锁,返回.
final Node p = node.predecessor();
//node的前驱是head,就说明,node是将要获取锁的下一个节点.
if (p == head && tryAcquire(arg)) { //所以再次尝试获取独占性变量
setHead(node); //如果成果,那么就将自己设置为head
p.next = null; // help GC
failed = false;
return interrupted;
//此时,还没有进入阻塞状态,所以直接返回false,表示不需要中断调用selfInterrupt函数
}
//判断是否要进入阻塞状态.如果`shouldParkAfterFailedAcquire`
//返回true,表示需要进入阻塞
//调用parkAndCheckInterrupt;否则表示还可以再次尝试获取锁,继续进行for循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//调用parkAndCheckInterrupt进行阻塞,然后返回是否为中断状态
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //前一个节点在等待独占性变量释放的通知,所以,当前节点可以阻塞
return true;
if (ws > 0) { //前一个节点处于取消获取独占性变量的状态,所以,可以跳过去
//返回false
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将上一个节点的状态设置为signal,返回false,
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //将AQS对象自己传入
return Thread.interrupted();
}
блокировка и прерывание
Из приведенного выше анализа мы знаемAQS
позвонивLockSupport
изpark
метод для выполнения операции, которая блокирует текущий процесс. Фактически блокировка здесь означает, что поток больше не выполняется, вызывая эту функцию, поток переходит в состояние блокировки.lock
Затем операция блокируется, ожидая прерывания или освобождения монопольной переменной.
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);//设置阻塞对象,用来记录线程被谁阻塞的,用于线程监控和分析工具来定位
UNSAFE.park(false, 0L);//让当前线程不再被线程调度,就是当前线程不再执行.
setBlocker(t, null);
}
Актуальные знания о прерывании, мы поговорим об этом позже и продолжим следитьAQS
Давайте посмотрим на операции, связанные с освобождением эксклюзивных переменных.
операция разблокировки
иlock
Операция аналогичная,unlock
действие называетсяAQS
изrelase
Методы, параметры и вызовacquire
время то же самое, это 1.
public final boolean release(int arg) {
if (tryRelease(arg)) {
//释放独占性变量,起始就是将status的值减1,因为acquire时是加1
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒head的后继节点
return true;
}
return false;
}
Как видно из приведенного выше кода, релиз вызывается первымtryRelease
чтобы освободить эксклюзивную переменную. В случае успеха посмотреть, есть ли заблокированные потоки, ожидающие блокировки, и если да, то вызватьunparkSuccessor
разбудить их.
protected final boolean tryRelease(int releases) {
//由于只有一个线程可以获得独占先变量,所以,所有操作不需要考虑多线程
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //如果等于0,那么说明锁应该被释放了,否则表示当前线程有多次lock操作.
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
Мы видим, чтоtryRelease
Логика, заложенная в концепции реентерабельной блокировки, только доstate
Когда значение 0 равно 0, это означает, что замок действительно выделяется. Так что эксклюзивная переменнаяstate
Значение представляет наличие или отсутствие блокировки. когдаstate=0
Когда это означает, что блокировка не занята, если нет, то это означает, что текущая блокировка уже занята.
private void unparkSuccessor(Node node) {
.....
//一般来说,需要唤醒的线程就是head的下一个节点,但是如果它获取锁的操作被取消,或在节点为null时
//就直接继续往后遍历,找到第一个未取消的后继节点.
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
называетсяunpark
метод, делайlock
Поток, операция которого заблокирована, восстанавливается в рабочее состояние и будет выполняться снова.acquireQueued
операции в бесконечном цикле for, попробуйте еще раз получить блокировку.
постскриптум
БеспокойствоAQS
иReentrantLock
Анализ почти закончен. Должен сказать, я был шокирован, когда впервые увидел реализацию AQS, я подумалSynchronized
иReentrantLock
Принцип реализации тот же, все полагаются на функцию виртуальной машины Java для достижения. я не думал, что естьAQS
Такой большой босс за этим помогает. После изучения принципа этого класса наш анализ многих классов JUC стал намного проще. также,AQS
ВовлеченныйCAS
Алгоритм работы и разблокированных очередей также обеспечивает основу для изучения других алгоритмов поиска.Океан знаний безграничен!