Оригинальная статья, краткое изложение опыта и жизненные перипетии на всем пути от набора в школу до фабрики.
Нажмите, чтобы узнать подробностиwww.codercc.com
1. Введение в ArrayBlockingQueue
В процессе многопоточного программирования для разделения бизнеса и проектирования архитектуры параллельные контейнеры часто используются для хранения общих данных между несколькими потоками, что не только обеспечивает безопасность потоков, но и упрощает работу каждого потока. Например, в задаче «производитель-потребитель» в качестве контейнера данных будет использоваться блокирующая очередь (BlockingQueue).см. эту статью. Чтобы углубить понимание очереди блокировки, единственный способ понять ее экспериментальный принцип.В этой статье в основном рассматривается принцип реализации ArrayBlockingQueue и LinkedBlockingQueue.
2. Принцип реализации ArrayBlockingQueue
Основная функция блокирующей очереди заключается в том, что она может вставлять и удалять элементы очереди блокирующим образом. Когда текущая очередь пуста, поток, потребляющий данные, будет заблокирован до тех пор, пока очередь не станет пустой, и заблокированный поток будет уведомлен; когда очередь заполнена, поток, вставляющий данные, будет заблокирован до тех пор, пока очередь не будет заполнена, поток, вставляющий данные, будет уведомлен (производственный поток автора). Затем наиболее часто используемым механизмом уведомления о сообщениях в многопоточности является механизм блокировки по условию.Что касается условия, вы можетеПодробнее см. в этой статье.. Так будет ли реализация ArrayBlockingQueue также использовать механизм уведомления Condition? Посмотрите ниже.
2.1 Основные свойства ArrayBlockingQueue
Основные свойства ArrayBlockingQueue следующие:
/** The queued items */ final Object[] items;
/** items index for next take, poll, peek or remove */ int takeIndex;
/** items index for next put, offer, or add */ int putIndex;
/** Number of elements in the queue */ int count;
/*
- Concurrency control uses the classic two-condition algorithm
- found in any textbook. */
/** Main lock guarding all access */ final ReentrantLock lock;
/** Condition for waiting takes */ private final Condition notEmpty;
/** Condition for waiting puts */ private final Condition notFull;
Из исходного кода видно, что ArrayBlockingQueue использует для хранения данных массивы (属性items
), чтобы обеспечить безопасность потоков,ReentrantLock lock
, чтобы гарантировать, что блокируемая вставка и удаление данных использует условие, когда поток-потребитель, который получает данные, заблокирован, поток будет помещен в очередь ожидания notEmpty, а когда поток-производитель, который вставляет данные, заблокирован, он будет be Поток помещается в очередь ожидания notFull. Атрибуты notEmpty и notFull создаются в конструкторе:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
Далее давайте сосредоточимся на том, как реализованы блокирующие методы put и take.
2.2 Подробное объяснение метода пут
put(E e)
Исходный код метода выглядит следующим образом:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果当前队列已满,将线程移入到notFull等待队列中
while (count == items.length)
notFull.await();
//满足插入数据的要求,直接进行入队操作
enqueue(e);
} finally {
lock.unlock();
}
}
Логика этого метода проста, когда очередь заполнена (count == items.length
) Переместите поток в очередь ожидания notFull, если в данный момент выполняются условия для вставки данных, вы можете напрямую вызватьenqueue(e)
Вставьте элементы данных. Исходный код метода enqueue:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//插入数据
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知消费者线程,当前队列中有数据可供消费
notEmpty.signal();
}
Логика метода enqueue тоже очень проста: сначала вставляем данные, то есть добавляем данные в массив (items[putIndex] = x
), а затем уведомить заблокированный поток-потребитель о том, что в текущей очереди есть данные, доступные для потребления (notEmpty.signal()
).
2.3 Подробное объяснение метода взятия
Исходный код метода take выглядит следующим образом:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列为空,没有数据,将消费者线程移入等待队列中
while (count == 0)
notEmpty.await();
//获取数据
return dequeue();
} finally {
lock.unlock();
}
}
Метод take также в основном выполняет два шага: 1. Если текущая очередь пуста, перемещает поток-потребитель, получающий данные, в ожидающую очередь 2. Если очередь не пуста, получает данные, то есть завершает удаление из очереди. операцияdequeue
. Исходный код метода удаления из очереди:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取数据
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//通知被阻塞的生产者线程
notFull.signal();
return x;
}
Метод dequeue также в основном делает две вещи: 1. Получает данные в очереди, то есть получает элементы данных в массиве ((E) items[takeIndex]
); 2. Уведомить поток в очереди ожидания notFull о перемещении его из очереди ожидания в очередь синхронизации, чтобы у него была возможность получить блокировку и успешно выйти после выполнения.
Из приведенного выше анализа видно, что методы ввода и вывода в основном завершают вставку и получение блокируемых данных с помощью механизма уведомления о состоянии. Легко понять LinkedBlockingQueue после понимания ArrayBlockingQueue.
3. Принцип реализации LinkedBlockingQueue
LinkedBlockingQueue — это ограниченная очередь блокировки, реализованная со связанным списком.Если размер очереди указан при создании объекта, размер очереди по умолчанию равенInteger.MAX_VALUE
. Это видно из метода его построения:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
3.1 Основные свойства LinkedBlockingQueue
Основные свойства LinkedBlockingQueue:
/** Current number of elements */ private final AtomicInteger count = new AtomicInteger();
/**
- Head of linked list.
- Invariant: head.item == null */ transient Node<E> head;
/**
- Tail of linked list.
- Invariant: last.next == null */ private transient Node<E> last;
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
Видно, что основное отличие от ArrayBlockingQueue в том, что LinkedBlockingQueue использует две разные блокировки при вставке данных и удалении данных.takeLock
иputLock
) для управления потокобезопасностью, поэтому эти две блокировки также генерируют два соответствующих условия (notEmpty
иnotFull
) для реализации блокирующих вставок и удалений. И структура данных связанного списка используется для реализации очереди, а определение узла Node:
static class Node<E> { E item;
скопировать код/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; }
}
Далее мы также рассмотрим реализацию методов put и take.
3.2 Подробное объяснение метода put
Исходный код метода put:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//如果队列已满,则阻塞当前线程,将其移入等待队列
while (count.get() == capacity) {
notFull.await();
}
//入队操作,插入数据
enqueue(node);
c = count.getAndIncrement();
//若队列满足插入数据的条件,则通知被阻塞的生产者线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
Логику метода put также легко понять, см. комментарии. В основном то же самое, что и метод put ArrayBlockingQueue. Исходный код метода take выглядит следующим образом:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//当前队列为空,则阻塞当前线程,将其移入到等待队列中,直至满足条件
while (count.get() == 0) {
notEmpty.await();
}
//移除队头元素,获取数据
x = dequeue();
c = count.getAndDecrement();
//如果当前满足移除元素的条件,则通知被阻塞的消费者线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
Основную логику метода take можно найти в комментариях, которые тоже легко понять.
4. Сравнение ArrayBlockingQueue и LinkedBlockingQueue
Та же точка: ArrayBlockingQueue и LinkedBlockingQueue реализуют блокируемую вставку и удаление элементов с помощью механизма уведомления о состоянии и соответствуют характеристикам безопасности потоков;
разница: 1. Нижний уровень ArrayBlockingQueue реализован с использованием массива, а LinkedBlockingQueue — это структура данных связанного списка;
- ArrayBlockingQueue вставляет и удаляет данные, используя только одну блокировку, в то время как LinkedBlockingQueue использует соответственно вставку и удаление.
putLock
иtakeLock
, что может уменьшить вероятность перехода потока в состояние WAITING, поскольку поток не может получить блокировку, тем самым повышая эффективность одновременного выполнения потока.