предисловие
что такое очередь
очередь - этоСпециальный линейный стол FIFO, именуемый FIFO. Особенность в том, что вставлять можно только с одного конца и удалять с другого конца.
Конец, выполняющий операцию вставки, называется хвостом очереди, а конец, выполняющий операцию удаления, называется началом очереди. Когда в очереди нет элементов, она называется пустой очередью.
Очереди часто используются в программировании, включая некоторые промежуточные структуры данных, лежащие в основе очередей (основное содержание не слишком подробно объясняется).
Что такое очередь блокировки
Очередь — это просто очередь, что, черт возьми, за блокирующая очередь?
Блокирующая очередь — это очередь, которая добавляет в очередь две дополнительные операции, а именно:
- Метод вставки, поддерживающий блокировку: когда емкость очереди заполнена, поток вставки элементов будет заблокирован до тех пор, пока в очереди не будет избыточной емкости.
- Метод удаления блокировки: когда в очереди нет элемента, поток, удаляющий элемент, будет заблокирован до тех пор, пока в очереди не появится элемент, который можно удалить.
В статье используется LinkedBlockingQueue в качестве примера для описания различий между очередями.Для удобства чтения LinkedBlockingQueue использует псевдоним LBQ.
Поскольку это статья об анализе исходного кода,Рекомендуется друзьям для просмотра на ПК. Конечно, если экран достаточно большой, я этого не говорил~
Отношение наследования очереди блокировки
Очередь блокировки — это абстрактное имя, основная структура данных очереди блокировки.Это может быть массив, это может быть односвязный список, или это может быть двусвязный список...
ЛБК – этоОчередь, состоящая из односвязного списка, на следующем рисунке показаны верхняя и нижняя диаграммы отношений наследования LBQ.
Как видно из рисунка, LBQ реализует интерфейс BlockingQueue, а BlockingQueue реализует интерфейс Queue
Анализ интерфейса очереди
В нисходящем порядке мы сначала анализируем, какие методы определены в волне интерфейсов Queue.
// 如果队列容量允许,立即将元素插入队列,成功后返回
// 🌟如果队列容量已满,则抛出异常
boolean add(E e);
// 如果队列容量允许,立即将元素插入队列,成功后返回
// 🌟如果队列容量已满,则返回 false
// 当使用有界队列时,offer 比 add 方法更何时
boolean offer(E e);
// 检索并删除队列的头节点,返回值为删除的队列头节点
// 🌟如果队列为空则抛出异常
E remove();
// 检索并删除队列的头节点,返回值为删除的队列头节点
// 🌟如果队列为空则返回 null
E poll();
// 检查但不删除队列头节点
// 🌟如果队列为空则抛出异常
E element();
// 检查但不删除队列头节点
// 🌟如果队列为空则返回 null
E peek();
Если обобщить методы интерфейса Queue, то они разделены на три категории:
- Добавить элементы в контейнер очереди: добавить, предложить
- Удалить элементы из контейнера очереди: удалить, опросить
- Запросить, пуст ли головной узел очереди: element, peek
Учитывая программную надежность интерфейса API, его можно разделить на две категории:
- Надежный API: предложение, опрос, просмотр
- Ненадежный API: добавить, удалить, элемент
Интерфейс API ненадежен, упомянутый здесь предел надежности означает, что если используется ненадежный API, у программы будет больше шансов сделать ошибки, поэтому мыЧто должно быть больше беспокоит, так это то, как поймать возможные исключения и соответствующую обработку исключений.
Анализ интерфейса BlockingQueue
Интерфейс BlockingQueue наследуется от интерфейса Queue, поэтому некоторые интерфейсы API с такой же семантикой не выставляются на интерпретацию
// 将指定元素插入队列,如果队列已满,等待直到有空间可用;通过 throws 异常得知,可在等待时打断
// 🌟相对于 Queue 接口而言,是一个全新的方法
void put(E e) throws InterruptedException;
// 将指定元素插入队列,如果队列已满,在等待指定的时间内等待腾出空间;通过 throws 异常得知,可在等待时打断
// 🌟相当于是 offer(E e) 的扩展方法
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 检索并除去此队列的头节点,如有必要,等待直到元素可用;通过 throws 异常得知,可在等待时打断
E take() throws InterruptedException;
// 检索并删除此队列的头,如果有必要使元素可用,则等待指定的等待时间;通过 throws 异常得知,可在等待时打断
// 🌟相当于是 poll() 的扩展方法
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// 返回队列剩余容量,如果为无界队列,返回 Integer.MAX_VALUE
int remainingCapacity();
// 如果此队列包含指定的元素,则返回 true
public boolean contains(Object o);
// 从此队列中删除所有可用元素,并将它们添加到给定的集合中
int drainTo(Collection<? super E> c);
// 从此队列中最多移除给定数量的可用元素,并将它们添加到给定的集合中
int drainTo(Collection<? super E> c, int maxElements);
Видно, что персонализированных методов в интерфейсе BlockingQueue еще довольно много. LBQ свиной лапки в этой статье реализован из интерфейса BlockingQueue.
Анализ исходного кода
Анализ переменных
Чтобы обеспечить параллельное добавление, удаление и другие операции, LBQ использует элементы управления ReentrantLock и Condition в пакете JUC.
// take, poll 等移除操作需要持有的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列没有数据时,删除元素线程被挂起
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等新增操作需要持有的锁
private final ReentrantLock putLock = new ReentrantLock();
// 当队列为空时,添加元素线程被挂起
private final Condition notFull = putLock.newCondition();
Почему используется количество элементов в поле ArrayBlockingQueue (ABQ)переменная count типа int? Не беспокойтесь о параллелизме
- Поскольку блокировка, используемая внутри ABQ, управляет операциями постановки в очередь и удаления из очереди, только один поток будет одновременно выполнять изменение переменной count.
- LBQ использует две блокировки, поэтому два потока будут изменять значение счетчика одновременно.Если тип int используется как ABQ, два процесса будут выполняться и изменять число счетчика одновременно, что приведет к неточности данных. , поэтому необходимо использовать параллельную модификацию атомарного класса
Если вы не понимаете, почему вы используете атомарные классы для подсчета количества,кликните сюда
Затем начните со структуры, узнайте, из каких элементов она состоит и что делает каждый элемент. Если структура данных неплохая, вы сможете ее угадать.
// 绑定的容量,如果无界,则为 Integer.MAX_VALUE
private final int capacity;
// 当前队列中元素个数
private final AtomicInteger count = new AtomicInteger();
// 当前队列的头节点
transient Node<E> head;
// 当前队列的尾节点
private transient Node<E> last;
Видя головной и последний элементы, есть ли у вас грубый прототип для LBQ?На данный момент есть еще структура Node.
static class Node<E> {
// 节点存储的元素
E item;
// 当前节点的后继节点
LinkedBlockingQueue.Node<E> next;
Node(E x) { item = x; }
}
Анализ конструктора
Вот картинка, чтобы понять, как конструктор по умолчанию LBQ инициализирует очередь
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
Видно, что метод построения по умолчанию установит емкость на Integer.MAX_VALUE, что часто называют неограниченной очередью.
На самом деле перегруженная параметризованная конструкция вызывается внутри, емкость задается внутри метода, а узел Node, элемент которого пуст, инициализируется для связывания двух узлов head и last.
Элемент и следующий в узле, на который указывает последний узел инициализированной очереди, пусты.В это время, если запись будет добавлена, что произойдет с очередью?
Присоединяйтесь к команде
Добавляемый элемент будет инкапсулирован как Node и добавлен в очередь, семантика метода put enqueue,Если элемент очереди заполнен, заблокируйте текущий поток вставки, пока в очереди не появится вакансия для пробуждения.
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e); // 将需要添加的数据封装为 Node
final ReentrantLock putLock = this.putLock; // 获取添加操作的锁
final AtomicInteger count = this.count; // 获取队列实际元素数量
putLock.lockInterruptibly(); // 运行可被中断加锁 API
try {
while (count.get() == capacity) { // 如果队列元素数量 == 队列最大值,则将线程放入条件队列阻塞
notFull.await();
}
enqueue(node); // 执行入队流程
c = count.getAndIncrement(); // 获取值并且自增,举例:count = 0,执行后结果值 count+1 = 2,返回 0
if (c + 1 < capacity) // 如果自增过的队列元素 +1 小于队列容器最大数量,唤醒一条被阻塞在插入等待队列的线程
notFull.signal();
} finally {
putLock.unlock(); // 解锁操作
}
if (c == 0) // 当队列中有一条数据,则唤醒消费组线程进行消费
signalNotEmpty();
}
Общий процесс присоединения к команде относительно ясен, и выполняются следующие действия:
- Если очередь заполнена, текущий поток будет заблокирован
- Если в очереди есть свободная позиция, узел, инкапсулирующий данные, выполняет операцию постановки в очередь.
- Если после того, как Node выполнит операцию постановки в очередь, в очереди еще есть место, разбудите добавляемый поток в очереди ожидания.
- Если до постановки данных в очередь в очереди нет элементов, разбудите поток в очереди блокировки потребления после успешной постановки в очередь.
Продолжайте видеть, как присоединиться к командеLBQ#enqueueчто ты сделал
private void enqueue(Node<E> node) {
last = last.next = node;
}
Код относительно прост: сначала назначьте узел следующему атрибуту текущего последнего узла, а затем укажите последний узел на узел, чтобы завершить операцию ввода узла.
Предполагая, что универсальным типом LBQ является String, элемент a вставляется первым, и очередь показана на следующем рисунке:
Какие? Недостаточно данных? Нет ничего, что не могло бы быть решено другим, и элемент b ставится в очередь следующим образом:
Запись очереди показана на рисунке выше, элемент в голове всегда пуст, а следующий в конце всегда пуст.
LBQ#offerЭто также метод постановки в очередь, разница в следующем:Если элемент очереди заполнен, верните false напрямую, не блокируя поток.
Удаление узла из очереди
LBQ#takeметод удаления из очереди,Если элемент в очереди пуст, заблокируйте текущий поток удаления из очереди, пока в очереди не появится элемент
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count; // 获取当前队列实际元素个数
final ReentrantLock takeLock = this.takeLtakeLocock; // 获取 takeLock 锁实例
takeLock.lockInterruptibly(); // 获取 takeLock 锁,获取不到阻塞过程中,可被中断
try {
while (count.get() == 0) { // 如果当前队列元素 == 0,当前获取节点线程加入等待队列
notEmpty.await();
}
x = dequeue(); // 当前队列元素 > 0,执行头节点出队操作
c = count.getAndDecrement(); // 获取当前队列元素个数,并将数量 - 1
if (c > 1) // 当队列中还有还有元素时,唤醒下一个消费线程进行消费
notEmpty.signal();
} finally {
takeLock.unlock(); // 释放锁
}
if (c == capacity) // 移除元素之前队列是满的,唤醒生产者线程添加元素
signalNotFull();
return x; // 返回头节点
}
Общий процесс операции удаления из очереди ясен и ясен, что аналогично процессу выполнения операции постановки в очередь.
- Если очередь заполнена, текущий поток удаления из очереди будет заблокирован.
- Если в очереди есть элементы, которые можно использовать, выполните операцию извлечения узла из очереди.
- Если после удаления узла из очереди в очереди есть элементы, подлежащие удалению, разбудить поток удаления из очереди, ожидающий в очереди.
- Если перед удалением элемента очередь заполнена, разбудите поток производителя, чтобы добавить элемент.
LBQ#dequeueОперация удаления из очереди немного сложнее, чем операция постановки в очередь.
private E dequeue() {
Node<E> h = head; // 获取队列头节点
Node<E> first = h.next; // 获取头节点的后继节点
h.next = h; // help GC
head = first; // 相当于把头节点的后继节点,设置为新的头节点
E x = first.item; // 获取到新的头节点 item
first.item = null; // 因为头节点 item 为空,所以 item 赋值为 null
return x;
}
В процессе удаления из очереди исходный головной узел будет указан сам на себя. Это поможет сборщику мусора повторно использовать текущий узел, а затем установить следующий узел исходного заголовка в качестве нового заголовка. На следующем рисунке показан полный процесс удаления из очереди. .
Блок-схема удаления из очереди такая же, как и выше, и в процессе нет особого внимания. Еще одинLBQ#pollметод удаления из очереди,Если элемент в очереди пуст, возвращаем null, а не блокируем как take
Запрос узла
Поскольку метод поиска элементов реализован в родительском классе AbstractQueue, в LBQ реализован только метод просмотра, а запрос узла представлен методом просмотра.
заглянуть и элементОба получают данные головного узла очереди, разница между ними в том,Первый возвращает null, если очередь пуста, а второй генерирует соответствующие исключения.
public E peek() {
if (count.get() == 0) // 队列为空返回 null
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); // 获取锁
try {
LinkedBlockingQueue.Node<E> first = head.next; // 获取头节点的 next 后继节点
if (first == null) // 如果后继节点为空,返回 null,否则返回后继节点的 item
return null;
else
return first.item;
} finally {
takeLock.unlock(); // 解锁
}
}
Увидев это, можно сделать вывод, что хотя элемент головного узла всегда равен нулю, метод peek получает элемент узла head.next.
Удаление узла
Операция удаления должна получить две блокировки, поэтому такие операции, как получение узла, удаление узла из очереди и постановка узла в очередь, будут заблокированы.
public boolean remove(Object o) {
if (o == null) return false;
fullyLock(); // 获取两把锁
try {
// 从头节点开始,循环遍历队列
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) { // item == o 执行删除操作
unlink(p, trail); // 删除操作
return true;
}
}
return false;
} finally {
fullyUnlock(); // 释放两把锁
}
}
Операция удаления связанного списка обычно выполняется одна за другой в цикле.Сложность времени обхода O(n), в худшем случае нужно обойти все узлы в связанном списке.
посмотриLBQ#removeКак unlink отменяет ассоциацию узла
void unlink(Node<E> p, Node<E> trail) {
p.item = null; // 以第一次遍历而言,trail 是头节点,p 为头节点的后继节点
trail.next = p.next; // 把头节点的后继指针,设置为 p 节点的后继指针
if (last == p) // 如果 p == last 设置 last == trail
last = trail;
// 如果删除元素前队列是满的,删除后就有了空余位置,唤醒生产线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
Метод удаления аналогичен методу взятия.Если элемент метода удаления является головным узлом, эффект будет таким же, как и у метода взятия, и элемент головного узла удаляется из очереди.
Для лучшего понимания удалим средний элемент. Нарисуйте две картинки, чтобы понять всю историю, код выглядит следующим образом:
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue();
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
// 删除队列中间元素
blockingQueue.remove("b");
}
После выполнения трех операций предложения в приведенном выше коде схема структуры очереди выглядит следующим образом:
Структура очереди после выполнения операции удаления элемента b выглядит следующим образом:
Если узел p является последним хвостовым узлом, установите узел-предшественник p в новый хвостовой узел. Операция удаления примерно такая
Сценарии применения
Как упоминалось выше, очереди блокировки используются в большом количестве бизнес-сценариев.Вот два практических примера, которые помогут вам понять
Модель производитель-потребитель
Модель «производитель-потребитель» представляет собой типичную многопоточную модель параллельной записи. Между производителем и потребителем необходим контейнер для решения отношений сильной связи. Производитель помещает данные в контейнер, а потребитель потребляет данные контейнера.
Реализации производитель-потребитель бывают разными способами
- ждать, уведомлять, уведомлять всех в классе объектов
- await, signal, signalВсе условия в блокировке
- BlockingQueue
Очередь блокировки реализует код модели производитель-потребитель следующим образом:
@Slf4j
public class BlockingQueueTest {
private static final int MAX_NUM = 10;
private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(MAX_NUM);
public void produce(String str) {
try {
QUEUE.put(str);
log.info(" 🔥🔥🔥 队列放入元素 :: {}, 队列元素数量 :: {}", str, QUEUE.size());
} catch (InterruptedException ie) {
// ignore
}
}
public String consume() {
String str = null;
try {
str = QUEUE.take();
log.info(" 🔥🔥🔥 队列移出元素 :: {}, 队列元素数量 :: {}", str, QUEUE.size());
} catch (InterruptedException ie) {
// ignore
}
return str;
}
public static void main(String[] args) {
BlockingQueueTest queueTest = new BlockingQueueTest();
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
String str = "元素-";
while (true) {
queueTest.produce(str + finalI);
}
}).start();
}
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (true) {
queueTest.consume();
}
}).start();
}
}
}
приложение пула потоков
Конкретное применение очереди блокировки в пуле потоков относится к реальному сценарию производитель-потребитель.
Важность пулов потоков в приложениях Java очевидна.Вот краткое введение в принципы работы пулов потоков.
- Количество потоков в пуле потоков меньше, чем количество основных потоков для выполнения операции добавления новых основных потоков.
- Когда количество потоков в пуле потоков больше или равно количеству основных потоков, задача сохраняется в очереди блокировки.
- Если количество потоков в пуле потоков больше или равно количеству основных потоков, а очередь блокировки заполнена, пул потоков создает неосновные потоки.
Основное внимание уделяется второму моменту, когда все основные потоки пула потоков являются запущенными задачами, задачи будут храниться в очереди блокировки. Исходный код пула потоков выглядит следующим образом:
if (isRunning(c) && workQueue.offer(command)) {}
См. используемый метод предложения, как описано выше, возвращает false, если очередь блокировки заполнена. Когда будут использованы элементы в очереди? Он включает в себя принцип процесса выполнения потока в пуле потоков, который кратко объясняется здесь.
- Потоки в пуле потоков могут выполнять задачи двумя способами: один — при создании основного потока.Принести свой собственныйДругой - получить задачу из очереди блокировки
- Когда основной поток выполняет задачу, он фактически ничем не отличается от неосновного потока.
Пул потоков использует два API для получения блокирующих задач очереди, а именно опрос и прием.
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
Вопрос. Зачем использовать два API? Один не ароматный?
A: take — это важное средство для поддержания основных потоков в пуле потоков.Если задача не может быть получена, поток будет приостановлен и будет ожидать добавления следующей задачи.
Что касается пула со временем, то он готовится к переработке непрофильных потоков
Вывод
Очередь блокировки LBQ объясняется здесь, а основные характеристики LBQ, описанные в статье, резюмируются ниже.
- LBQ — это блокирующая очередь, реализованная на основе связанного списка, который может читать и записывать одновременно.
- Емкость очереди LBQ можно задать самостоятельно, если не установить максимальное значение Integer по умолчанию, ее также можно назвать неограниченной очередью.
В статье объединен исходный код для подробного объяснения операций постановки в очередь LBQ, исключения из очереди, запроса и удаления.
LBQ - это только введение, я надеюсь, что все смогут пройти статьюОвладейте основной идеей блокировки очереди, а затем просмотрите код других классов реализации, чтобы закрепить знания
Друзья теперь знают, что LBQ реализует контроль безопасности параллелизма через механизм блокировки, подумайте об этомМожно ли этого добиться без использования замков и как? Увидимся в следующий раз!
Поиск в Wechat [круг интереса к исходному коду], подпишитесь на официальный аккаунт и ответьте 123, чтобы получить учебные материалы, такие как GO, Netty, Seata, SpringCloud Alibaba, спецификации разработки, сборник интервью, структуру данных и так далее!