Некоторое время назад я написал статью об анализе исходного кода AQS.Сверхдетальный принципиальный анализ AbstractQueuedSynchronizer, в статье я сказалJUC
Большинство классов, связанных с многопоточностью в пакете, связаны сAQS
Связанные, сегодня мы научимся зависеть отAQS
очередь блокировкиBlockingQueue
принцип реализации. Исходный код в этой статье не указан и взят изArrayBlockingQueue
.
очередь блокировки
Я полагаю, что большинство учащихся поймут концепцию блокирующих очередей при изучении пулов потоков и запомнят влияние различных типов блокирующих очередей на инициализацию пулов потоков. При получении элементов из очереди блокировки, но очередь пуста, текущий поток блокируется до тех пор, пока другой поток не добавит элемент в очередь блокировки; аналогично, при добавлении элементов в очередь блокировки, если очередь заполнена, текущий поток также блокирует блоки пока другой поток не прочитает элемент из очереди. Очереди блокировки, как правило, работают по принципу «первым пришел – первым обслужен» и используются для реализации шаблонов производителя и потребителя. При возникновении двух вышеуказанных ситуаций у очереди блокировки есть четыре разных метода обработки: эти четыре метода выдают исключение, возвращают специальное значение (null или false), блокируют текущий поток до конца выполнения, а последний — Only заблокировать на фиксированное время и отказаться от операции, если она не может быть успешно выполнена по истечении этого времени. Эти методы сведены в таблицу ниже.
Мы просто анализируемput
а такжеtake
метод.
положить и взять функции
Все мы знаем, что модель производитель-потребитель может быть легко реализована с использованием синхронной очереди.На самом деле синхронная очередь реализована в соответствии с моделью производитель-потребитель.Мы можем использоватьput
Функция рассматривается как операция производителя,take
является операция потребителя.
Давайте сначала посмотримArrayListBlock
конструктор. он инициализированput
а такжеtake
Ключевыми переменными-членами, используемыми в функции, являютсяReentrantLock
а такжеCondition
.
public ArrayBlockingQueue(int capacity, boolean fair) {
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ReentrantLockAQS
подклассnewCondition
возвращается функциейCondition
Экземпляр интерфейса определяется внутри класса AQS.ConditionObject
Класс реализации. его можно вызвать напрямуюAQS
сопутствующие функции.
put
Функция добавит элементы в конец очереди.Если очередь заполнена и нельзя добавить элементы, она заблокируется и будет ждать, пока их можно будет добавить. Исходный код функции показан ниже.
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //先获得锁
try {
while (count == items.length)
//如果队列满了,就NotFull这个Condition对象上进行等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//这里可以注意的是ArrayBlockingList实际上使用Array实现了一个环形数组,
//当putIndex达到最大时,就返回到起点,继续插入,
//当然,如果此时0位置的元素还没有被取走,
//下次put时,就会因为cout == item.length未被阻塞。
if (++putIndex == items.length)
putIndex = 0;
count++;
//因为插入了元素,通知等待notEmpty事件的线程。
notEmpty.signal();
}
Мы обнаружим, что функция put использует механизм ожидания/уведомления. В отличие от общей реализации производитель-потребитель, синхронные очереди используютReentrantLock
а такжеCondition
Комбинированный механизм сначала получения блокировки, а затем ожидания; вместоSynchronized
а такжеObject.wait
Механизмы. Разница здесь будет подробно объяснена в следующем разделе.
После прочтения сведений о производителеput
функция, давайте посмотрим, что потребитель называетtake
функция.take
Функция будет блокироваться, когда очередь пуста, пока в нее не будет добавлен новый элемент.
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//如果队列为空,那么在notEmpty对象上等待,
//当put函数调用时,会调用notEmpty的notify进行通知。
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
E x = (E) items[takeIndex];
items[takeIndex] = null; //取出takeIndex位置的元素
if (++takeIndex == items.length)
//如果到了尾部,将指针重新调整到头部
takeIndex = 0;
count--;
....
//通知notFull对象上等待的线程
notFull.signal();
return x;
}
ждать операции
Мы обнаруживаемArrayBlockingList
не использовалObject.wait
, Вместо того, чтобы использоватьCondition.await
,Почему это? Каковы причины этого?
Condition
объекты могут обеспечивать иObject
изwait
а такжеnotify
такое же поведение, но последнее должно быть приобретено в первую очередьsynchronized
Эту встроенную блокировку монитора можно вызвать толькоCondition
должен сначала получитьReentrantLock
. Эти два метода снимут соответствующую блокировку при блокировке и ожидании, ноCondition
Ожидание можно прервать, это единственная разница между ними.
Давайте сначала посмотримCondition
изwait
функция,wait
Поток функции примерно такой, как показано на рисунке ниже.
wait
Функция состоит из трех основных шагов. Один - позвонитьaddConditionWaiter
Функция, которая добавляет узел в очередь ожидания условия, представляющий текущий поток, ожидающий сообщения. тогда позвониfullyRelease
чтобы снять удерживаемую блокировку и вызвать функцию AQS.Студенты, которые не понимают, могут проверить статью, представленную в начале этой статьи. Наконец, продолжайте звонитьisOnSyncQueue
Функция определяет, передается ли узел вsync queue
В очереди, то есть очереди в AQS, ожидающей получения блокировки. Если нет, введите состояние блокировки, если уже в очереди, вызовитеacquireQueued
Функция повторно получает блокировку.
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//在condition wait队列上添加新的节点
Node node = addConditionWaiter();
//释放当前持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//由于node在之前是添加到condition wait queue上的,现在判断这个node
//是否被添加到Sync的获得锁的等待队列上,Sync就是AQS的子类
//node在condition queue上说明还在等待事件的notify,
//notify函数会将condition queue 上的node转化到Sync的队列上。
while (!isOnSyncQueue(node)) {
//node还没有被添加到Sync Queue上,说明还在等待事件通知
//所以调用park函数来停止线程执行
LockSupport.park(this);
//判断是否被中断,线程从park函数返回有两种情况,一种是
//其他线程调用了unpark,另外一种是线程被中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//代码执行到这里,已经有其他线程调用notify函数,或则被中断,该线程可以继续执行,但是必须先
//再次获得调用await函数时的锁.acquireQueued函数在AQS文章中做了介绍.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
 ....
}
final int fullyRelease(Node node) {
//AQS的方法,当前已经在锁中了,所以直接操作
boolean failed = true;
try {
int savedState = getState();
//获取state当前的值,然后保存,以待以后恢复
// release函数是AQS的函数,不清楚的同学请看开头介绍的文章。
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
private int checkInterruptWhileWaiting(Node node) {
//中断可能发生在两个阶段中,一是在等待signa时,另外一个是在获得signal之后
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//这里要和下边的transferForSignal对应着看,这是线程中断进入的逻辑.那边是signal的逻辑
//两边可能有并发冲突,但是成功的一方必须调用enq来进入acquire lock queue中.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//如果失败了,说明transferForSignal那边成功了,等待node 进入acquire lock queue
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
сигнальная операция
signal
функция будетcondition wait queue
Узел потока в начале очереди передает поток, ожидающий получения блокировкиsync queue
в очереди. В этом случае,wait
вызов функцииisOnSyncQueue
функция вернет true, что приведет кwait
Функция входит в состояние, в котором последним шагом является повторное получение блокировки.
Давайте разберем это подробно здесьcondition wait queue
а такжеsync queue
Принцип построения двух очередей.condition wait queue
Это очередь, ожидающая сообщения, и она переходит в состояние блокировки, поскольку очередь блокировки пуста.take
Работа функции заключается в ожидании сообщения о том, что очередь блокировки не пуста. а такжеsync queue
Очередь — это очередь, ожидающая получения блокировки. Функция take может выполняться после получения сообщения, но она должна дождаться получения блокировки, прежде чем она сможет запуститься.
signal
Схематическая диаграмма функции показана ниже.
signal
На самом деле функция делает одну вещь, то есть продолжает пытаться вызватьtransferForSignal
функция, будетcondition wait queue
Узел во главе очереди перемещается вsync queue
в очереди до тех пор, пока передача не будет успешной. Поскольку передача прошла успешно, это означает, что сообщение было успешно уведомлено узлу, ожидающему сообщения.
public final void signal() {
if (!isHeldExclusively())
//如果当前线程没有获得锁,抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//将Condition wait queue中的第一个node转移到acquire lock queue中.
doSignal(first);
}
private void doSignal(Node first) {
do {
   //由于生产者的signal在有消费者等待的情况下,必须要通知
//一个消费者,所以这里有一个循环,直到队列为空
//把first 这个node从condition queue中删除掉
//condition queue的头指针指向node的后继节点,如果node后续节点为null,那么也将尾指针也置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//transferForSignal将node转而添加到Sync的acquire lock 队列
}
final boolean transferForSignal(Node node) {
//如果设置失败,说明该node已经被取消了,所以返回false,让doSignal继续向下通知其他未被取消的node
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将node添加到acquire lock queue中.
Node p = enq(node);
int ws = p.waitStatus;
//需要注意的是这里的node进行了转化
//ws>0代表canceled的含义所以直接unpark线程
//如果compareAndSetWaitStatus失败,所以直接unpark,让线程继续执行await中的
//进行isOnSyncQueue判断的while循环,然后进入acquireQueue函数.
//这里失败的原因可能是Lock其他线程释放掉了锁,同步设置p的waitStatus
//如果compareAndSetWaitStatus成功了呢?那么该node就一直在acquire lock queue中
//等待锁被释放掉再次抢夺锁,然后再unpark
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
постскриптум
В следующей статье в основном объясняется, как использовать его самостоятельноAQS
Чтобы создать замок, отвечающий потребностям вашего бизнеса, пожалуйста, продолжайте обращать внимание на мою статью. Достигайте прогресса вместе.