Отсканируйте QR-код ниже или WeChat, чтобы найти официальную учетную запись.
菜鸟飞呀飞
, вы можете следить за публичной учетной записью WeChat, читать далееSpring源码分析
иJava并发编程
статья.
Введение
В пакете JUC предоставляется ряд потокобезопасных очередей, часто называемых блокирующими очередями. Эти очереди блокировки широко используются в пулах потоков.Понимание принципа реализации очередей блокировки будет большим подспорьем при обычном использовании очередей блокировки. В этой статье в основном анализируется исходный кодLinkedBlockingQueue
Принцип реализации этой блокирующей очереди.
LinkedBlockingQueue — очередь блокировки, реализованная на основе связанного списка, размер очереди блокировки по умолчаниюInteger.MAX_VALUE
, Поскольку это значение особенно велико, LinkedBlockingQueue во многих местах называется неограниченной очередью. При инициализации LinkedBlockingQueue размер очереди можно указать вручную, чтобы LinkedBlockingQueue была ограниченной очередью.
Прежде чем рассматривать конкретный исходный код, мы можем подумать о том, как мы можем сами реализовать LinkedBlockingQueue.
Поскольку LinkedBlockingQueue является потокобезопасным, нам нужно решить проблему взаимного исключения и синхронизации, которую мы можем решить с помощью блокировки, предусмотренной в Java. В Java существует два типа блокировок: одна — неявная блокировка, реализованная с помощью synchronized, а другая — блокировка, реализованная мастером параллельного программирования Дугом Ли на основе AQS. Поскольку LinkedBlockingQueue — это класс, написанный Дугом Ли, нижний уровень LinkedBlockingQueue использует тип блокировки AQS, а именно: ReentrantLock.
Для очередей существует два типа операций: добавление элементов и удаление элементов. Когда в очереди нет элементов, операция выборки элементов не может быть выполнена, пока в очереди есть элементы; когда очередь заполнена, операция добавления элементов не может быть выполнена, а операция добавления элементов не может быть выполнена, пока очередь не заполнена. Фактически это паттерн производитель-потребитель, а реализация паттерна производитель-потребитель обычно реализуется с использованием классического паттерна ожидания/уведомления. В Java существует два типа ожидания/уведомления: один основан на методе wait()/notify() в классе Object, а другой основан на методе await()/signal() в классе Condition в AQS. , чтобы понять. Очевидно, что LinkedBlockingQueue использует await()/signal() в Condition.
структура данных
LinkedBlockingQueue содержит несколько очень важных свойств, которые реализуют потокобезопасность и функции ожидания/уведомления LinkedBlockingQueue. Эти свойства будут представлены одно за другим ниже.
- головной и последний. Тип этих двух свойств относится к типу Node, который является внутренним классом LinkedBlockingQueue. Каждый узел содержит два свойства: item и next. item является атрибутом последнего сохраненного элемента, next используется для указания на следующий узел, а односвязный список может поддерживаться внутри LinkedBlockingQueue с помощью атрибута next. где head и last представляют начало и конец связанного списка соответственно. Следует отметить, что в реальном хранилище атрибут элемента head всегда равен нулю, поэтому элемент head не хранит элементы, а просто представляет головной узел связанного списка.
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
-
емкость. Тип Int представляет максимальную емкость очереди. По умолчанию значение емкости будет установлено на Integer.MAX_VALUE. Вы также можете указать его значение вручную, при передаче значения типа int в конструкторе LinkedBlockingQueue емкость будет равна значению.
-
считать. AtomicInteger, тип этого атрибута является атомарным типом, который представляет количество элементов в текущей очереди.
-
взятьЗамок. Тип ReentrantLock. В LinkedBlockingQueue для получения и добавления элементов используются разные блокировки, а takeLock представляет собой блокировку, используемую при получении элементов.
-
поставитьЗамок. Тип ReentrantLock, putLock представляет блокировку, используемую при добавлении элементов.
-
не пусто. Непустая очередь ожидания, тип условия, когда очередь пуста, из очереди нельзя получить больше элементов.В это время поток, который хочет получить элементы из очереди, должен ждать, пока элемент не будет добавлен в очередь . Итак, где поток должен ждать в этот момент? Ожидание в непустой очереди ожидания notEmpty. Значение атрибута notEmpty создается блокировкой takeLock.
private final Condition notEmpty = takeLock.newCondition();
- не полный. Неполная очередь ожидания, Тип условия. Когда очередь заполнена, в нее нельзя добавить больше элементов. В это время поток, добавляющий элементы в очередь, должен ждать, пока очередь не заполнится. Итак, где поток должен ждать? Ожидание в очереди ожидания notFull. Значение атрибута notFull создается блокировкой putLock.
private final Condition notFull = putLock.newCondition();
Анализ исходного кода
Зная внутреннюю структуру данных LinkedBlockingQueue, мы теперь проанализируем принцип реализации LinkedBlockingQueue в сочетании с конкретным исходным кодом. Операции LinkedBlockingQueue делятся на две категории: хранение элементов и получение элементов.Методы хранения элементов: put(e), offer(e), offer(e, time, unit); методы взятия элементов: take (), опрос(), опрос(время, единица измерения), просмотр().
put(e)
Когда очередь заполнена, метод put(e) блокирует поток до тех пор, пока очередь не заполнится. Когда элемент успешно добавлен в очередь, метод put(e) вернется в конец, и этот метод не имеет возвращаемого значения.
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 可中断的获取锁
putLock.lockInterruptibly();
try {
// 如果队列满了,则进行等待,等待队列是非满状态
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// 队列元素个数自增,注意,由于这里调用的是getAndIncrement()方法,
// 不是incrementAndGet()方法,所以返回的是自增之前的值。
c = count.getAndIncrement();
// 如果阻塞队列还没有满,就唤醒处于notFull等待队列中的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果阻塞队列在没有添加元素之前,阻塞队列的元素个数为0,那么可能有线程处于notEmpty的等待队列中
// 因此这里会唤醒处于notEmpty的等待队列中的线程
if (c == 0)
signalNotEmpty();
}
- При вызове put(e) сначала установите блокировку putLock, а затем оцените, заполнена ли очередь. Если она заполнена, вызовите метод notFull.await(), чтобы позволить текущему потоку войти в очередь ожидания notFull. full, метод notFull.signal() будет вызываться для пробуждения потоков в очереди ожидания notFull.
- Если очередь не заполнена, вызовите метод enqueue(), чтобы сохранить элементы в связанном списке, поддерживаемом внутри LinkedBlockingQueue.
- Когда элемент успешно помещен в очередь, оценивается, заполнена ли очередь, и если она не заполнена, пробуждается поток в очереди notFull. Наконец, прежде чем добавлять элементы, оцените, есть ли в очереди элементы. Если элементов нет, то могут быть потоки, ожидающие в очереди ожидания notEmpty. Вызов метода signalNotEmpty() разбудит потоки в очереди ожидания notEmpty.
Исходный код метода enqueue(node) выглядит следующим образом.
private void enqueue(Node<E> node) {
last = last.next = node;
}
Исходный код метода enqueue(node) относительно прост Ниже приведена диаграмма для понимания процесса постановки элементов в очередь.
offer(e)
Метод offer(e) не будет блокировать поток.Когда очередь блокировки заполнена, если элемент добавлен в очередь блокировки, метод offer(e) напрямую вернет false, и если элемент добавлен успешно, он будет вернуть истину. Исходный код выглядит следующим образом.
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 如果阻塞队列满了,直接返回false
if (count.get() == capacity) // ①
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
// 入队
enqueue(node);
c = count.getAndIncrement();
// 未满通知
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 非空通知
if (c == 0)
signalNotEmpty();
// 如果线程调用putLock.lock()没有获取到锁,那么此时c等于-1,因此会也会返回false
return c >= 0;
}
Исходный код метода offer(e) в основном логически совпадает с исходным кодом метода put(e), разница заключается в месте, отмеченном ① в коде. Когда будет определено, что очередь заполнена, offer(e) сразу вернется в конец.Если она не заполнена, будет получена блокировка putLock, а затем будет выполнена операция добавления элемента.
offer(e,time,unit)
Метод offer(e, time, unit) поддерживает элемент хранения тайм-аута потока. Когда блокирующая очередь заполнена, текущий поток ожидает максимальное время. Если элемент все еще не сохранен в очереди в течение этого времени, он вернет false . Возвращает true, если элемент был успешно добавлен. Исходный код предложения (e, time, unit) выглядит следующим образом.
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
// 根据传入的time和时间单位,计算需要等待对少纳秒
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 如果超时,直接返回false
if (nanos <= 0)
return false;
// 等待(最终是调用LockSupport.parkNanos(this, nanosTimeout))
nanos = notFull.awaitNanos(nanos);
}
// 入队
enqueue(new Node<E>(e));
c = count.getAndIncrement();
// 未满通知
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 非空通知
if (c == 0)
signalNotEmpty();
return true;
}
Логика метода offer(e, time, unit) аналогична логике метода put(e), разница в том, что метод put(e) вызывает метод await() условия при ожидании, а offer (e, time, unit ) вызывает метод awaitNanos(nanos). awaitNanos(nanos) наконец вызывает метод parkNanos(this, nanosTimeout) LockSupport. Для анализа исходного кода Condition вы можете обратиться к этой статье:Анализ исходного кода состояния.
take()
Метод take() соответствует методу put(e), а метод take() используется для удаления элементов из очереди блокировки. Когда в очереди блокировки нет ни одного элемента, текущий поток будет ждать, пока очередь блокировки не станет пустой, и, наконец, вернет первый элемент, сохраненный в очереди блокировки. Исходный код метода take() выглядит следующим образом.
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果阻塞队列中一直没有元素,线程就一直等待,直到队列中有元素后调用notEmpty等待队列的signal()方法
while (count.get() == 0) {
notEmpty.await();
}
// 当阻塞队列中有元素后,会跳出上面的while循环,然后出阻塞队列
x = dequeue();
c = count.getAndDecrement();
// 如果阻塞队列中还有元素,就唤醒等待在notEmpty等待队列中的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果在元素出队列前,队列处于已满状态,那么从队列中移出一个元素后,队列就变为非满状态了
// 此时就唤醒等待在notFull等待队列中的线程
if (c == capacity)
signalNotFull();
return x;
}
- При вызове метода take() сначала определите, есть ли элементы в LinkedBlockingQueue. Если нет, вызовите метод notEmpty.await(), чтобы позволить текущему потоку войти в очередь ожидания notEmpty для ожидания. Когда в LinkedBlockingQueue есть элементы, другие потоки будут вызывать метод notEmpty.signal(), который разбудит текущий поток и продолжит выполнение следующей логики.
- Если в LinkedBlockingQueue есть элемент, вызовите метод dequeue(), чтобы удалить элемент из очереди. После извлечения элемента оцените, есть ли еще элементы в очереди.Если элементы еще есть, разбудите элементы в очереди ожидания notEmpty.
- Наконец, определите, заполнена ли LinkedBlockingQueue.Если она не заполнена, вызовите метод signalNotFull(), чтобы разбудить потоки, ожидающие в очереди ожидания notFull.
dequeue()
Метод извлечет первый сохраненный элемент из очереди LinkedBlockingQueue.Поскольку головной узел в очереди LinkedBlockingQueue не хранит элементы, значение атрибута item узла head.next извлекается. Исходный код метода dequeue() выглядит следующим образом.
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
// 令head节点的next指针执行自己
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
В методе dequeue() сделайте второй узел новым головным узлом и сделайте так, чтобы следующий указатель старого головного узла указывал на себя (почему бы не позволить следующему указателю старого головного узла указывать на ноль? Причина будет обсуждаться позже). позже). Этот код метода dequeue() фактически работает со связанным списком и изменяет указатель на него. Код может быть трудоемким для чтения, поэтому давайте разберемся с ним с помощью диаграммы ниже.
Почему бы не сделать так, чтобы следующий указатель старого головного узла указывал на ноль? Это связано с тем, что операция выборки элемента и обход LinkedBlockingQueue всех элементов через итератор могут выполняться одновременно, и если следующий указатель указывает на null в этом месте, то при обходе итератора будет ошибка. . Конкретный исходный код итератора можно посмотреть в исходном коде внутреннего класса Itr LinkedBlockingQueue.
poll()
Метод poll() также берет элементы из LinkedBlockingQueue, но не блокирует поток.Когда в очереди LinkedBlockingQueue нет элементов, метод poll() напрямую возвращает значение null, когда элементы есть, он пытается получить сначала замок, а затем извлеките элемент. Исходный код выглядит следующим образом:
public E poll() {
final AtomicInteger count = this.count;
// 如果队列为空,就立即返回null,不会阻塞线程
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
// 取元素
x = dequeue();
c = count.getAndDecrement();
// 非空通知
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 非满通知
if (c == capacity)
signalNotFull();
return x;
}
Логика метода poll() в основном такая же, как и у метода take(), разница в том, что когда в очереди LinkedBlockingQueue нет элементов, poll() не будет блокировать поток, а take заблокирует поток .
poll(time,unit)
Если в LinkedBlockingQueue нет элементов, poll(time, unit) также заблокирует поток, который поддерживает блокировку по тайм-ауту. Когда ни один элемент не получен в течение периода времени, будет возвращено значение null.
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 如果已经超时,直接返回null
if (nanos <= 0)
return null;
// 等待(调用的是LockSupport.parkNanos(this, nanosTimeout))
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
Разница между poll(time, unit) и take() заключается в том, что take всегда блокируется, а poll(time, unit) блокируется по тайм-ауту.
peek()
Метод peek() также получает элементы из очереди, но он получает только первый элемент в очереди и не удаляет элемент из очереди.
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 取第一个元素
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
Из исходного кода метода peek() видно, что peek() удаляет только первый элемент в очереди, но не изменяет указатель связанного списка, поэтому он не удалит элемент из очереди.
Суммировать
- LinkedBlockingQueue — это потокобезопасная очередь, которая представляет собой неограниченную очередь, основанную на базовой реализации связанного списка.Если указана емкость очереди, это ограниченная очередь.
- В LinkedBlockingQueue для извлечения и сохранения элементов используются две блокировки, тип блокировки — ReentrantLock. Он реализует модель производитель-потребитель, используя ожидание/уведомление Condition.
- Поскольку для хранения и выборки элементов в LinkedBlockingQueue используются две блокировки, операции доступа могут выполняться одновременно, поэтому его пропускная способность выше, чем у ArrayBlockingQueue.
рекомендовать
- Мониторы: краеугольный камень параллельного программирования
- Первое понимание принципа реализации CAS
- Интерпретация исходного кода класса Unsafe и сценариев использования
- Принцип разработки синхронизатора очередей (AQS)
- Анализ исходного кода Queue Synchronizer (AQS)
- Повторная блокировка (ReentrantLock) Анализ исходного кода
- Сравнение честных и нечестных замков
- Анализ исходного кода состояния
- Принцип реализации блокировки чтения-записи ReadWriteLock (Tickets.WeChat.QQ.com/Yes/Period 2_Small 3WG i…)
- Принцип реализации пула потоков ThreadPoolExecutor
- Почему запрещено использовать Исполнители для создания пулов потоков в «Руководстве по разработке Java для Alibaba»
- Будущее параллельного программирования — наиболее часто используемый метод оптимизации производительности.