введение
В дополнение к различным параллельным контейнерам, упомянутым выше, JDK также предоставляет богатую очередь блокировки. Очередь блокировки единообразно реализует интерфейс BlockingQueue, а интерфейс BlockingQueue предоставляет два метода блокировки PUT(E) и Take(), основанные на интерфейсе Queue пакета Java.util. Его основной сценарий использования — многопоточная модель производителя-потребителя, причем поток-производитель будет производить элементы через метод PUT(E), поток-потребитель — через элементы потребления Take(). В дополнение к блокирующим функциям интерфейс BlockingQueue также определяет время Предложения и Опроса, а также метод одноразового удаления DRAINTO.
//插入元素,队列满后会抛出异常
boolean add(E e);
//移除元素,队列为空时会抛出异常
E remove();
//插入元素,成功反会true
boolean offer(E e);
//移除元素
E poll();
//插入元素,队列满后会阻塞
void put(E e) throws InterruptedException;
//移除元素,队列空后会阻塞
E take() throws InterruptedException;
//限时插入
boolean offer(E e, long timeout, TimeUnit unit)
//限时移除
E poll(long timeout, TimeUnit unit);
//获取所有元素到Collection中
int drainTo(Collection<? super E> c);
В JDK1.8 есть 7 реализаций очереди блокировки, а именно ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, DelayQueue, SynchronousQueue, LinkedTransferQueue и LinkedBlockingDeque, Давайте проведем их простой анализ по очереди.
ArrayBlockingQueue
ArrayBlockingQueue представляет собой ограниченную блокирующую очередь, реализованную массивом внизу.Ограничение означает, что ее емкость фиксирована и не может быть расширена.Размер очереди должен быть определен при инициализации. Он контролирует параллелизм с помощью повторной эксклюзивной блокировки ReentrantLock и условия для достижения блокировки.
//通过数组来存储队列中的元素
final Object[] items;
//初始化一个固定的数组大小,默认使用非公平锁来控制并发
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//初始化固定的items数组大小,初始化notEmpty以及notFull两个Condition来控制生产消费
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);//通过ReentrantLock来控制并发
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
Видно, что ArrayBlockingQueue инициализирует ReentrantLock и два условия для управления созданием и потреблением параллельных очередей. Здесь мы сосредоточимся на блокирующих методах put и take:
//插入元素到队列中
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //获取独占锁
try {
while (count == items.length) //如果队列已满则通过await阻塞put方法
notFull.await();
enqueue(e); //插入元素
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) //插入元素后将putIndex+1,当队列使用完后重置为0
putIndex = 0;
count++;
notEmpty.signal(); //队列添加元素后唤醒因notEmpty等待的消费线程
}
//移除队列中的元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //获取独占锁
try {
while (count == 0) //如果队列已空则通过await阻塞take方法
notEmpty.await();
return dequeue(); //移除元素
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) //移除元素后将takeIndex+1,当队列使用完后重置为0
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //队列消费元素后唤醒因notFull等待的消费线程
return x;
}
В процессе добавления и удаления элементов из очереди для управления процессом производства и потребления элементов используются putIndex, takeIndex и count, putIndex отвечает за запись индекса следующего элемента, который может быть добавлен, а takeIndex отвечает за запись индекса следующего удаляемого элемента.mark, count записывает общее количество элементов в очереди. Когда очередь заполнена, поток-производитель блокируется с помощью notFull.await(), а заблокированный поток-производитель пробуждается с помощью notFull.signal() после потребления элементов. После того, как очередь пуста, поток-потребитель блокируется с помощью notEmpty.await(), а заблокированный поток-потребитель пробуждается с помощью notEmpty.signal() после создания элемента.
Ограниченные по времени методы вставки и удаления реализованы в ArrayBlockingQueue через awaitNanos, который возвращается напрямую, если поток не пробуждается по истечении заданного времени.
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout); //获取定时时长
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0) //指定时长过后,线程仍然未被唤醒则返回false
return false;
nanos = notFull.awaitNanos(nanos); //指定时长内阻塞线程
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
Есть и более важный способ: DRAINTO, метод DRAINTO может получать все элементы в очереди единовременно, что уменьшает количество очередей блокировок и использует производительность в некоторых сценариях.
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock; //仅获取一次锁
lock.lock();
try {
int n = Math.min(maxElements, count); //获取队列中所有元素
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x); //循环插入元素
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal(); //唤醒等待的生产者线程
}
}
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
LinkedBlockingQueue — это ограниченная очередь блокировки, реализованная односвязным списком в нижней части.Как и ArrayBlockingQueue, ReentrantLock используется для управления параллелизмом, но использует две эксклюзивные блокировки для управления потреблением и производством. Исходный код методов put и take выглядит следующим образом:
public void put(E e) throws InterruptedException {
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//因为使用了双锁,需要使用AtomicInteger计算元素总量,避免并发计算不准确
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await(); //队列已满,阻塞生产线程
}
enqueue(node); //插入元素到队列尾部
c = count.getAndIncrement(); //count + 1
if (c + 1 < capacity) //如果+1后队列还未满,通过其他生产线程继续生产
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0) //只有当之前是空时,消费队列才会阻塞,否则是不需要通知的
signalNotEmpty();
}
private void enqueue(Node<E> node) {
//将新元素添加到链表末尾,然后将last指向尾部元素
last = last.next = node;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await(); //队列为空,阻塞消费线程
}
x = dequeue(); //消费一个元素
c = count.getAndDecrement(); //count - 1
if (c > 1) // 通知其他等待的消费线程继续消费
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) //只有当之前是满的,生产队列才会阻塞,否则是不需要通知的
signalNotFull();
return x;
}
//消费队列头部的下一个元素,同时将新头部置空
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
Видно, что LinkedBlockingQueue контролирует производство и потребление через две блокировки, takeLock и putLock, не мешая друг другу.Пока очередь не заполнена, производственный поток может продолжать производить, пока очередь не пуста, поток-потребитель может продолжать потреблять и не будет блокировать взаимоисключающие блокировки во время блокировки.
Прочитав базовые реализации LinkedBlockingQueue и ArrayBlockingQueue, вы обнаружите проблему. Обычно потребители и производители могут выполняться одновременно, что значительно повышает пропускную способность очереди. Так почему же ArrayBlockingQueue не использует двойные блокировки для реализации очередей? а расход? Насколько я понимаю, ArrayBlockingQueue также может использовать двойные блокировки для достижения функций, но поскольку он использует простую структуру, такую как массив внизу, он эквивалентен общей переменной.Если используются две блокировки, требуется более точное управление блокировкой, вот почему JDK1.ConcurrentHashMap в версии 7 реализован с использованием сегментированных блокировок, разделяющих массив на несколько массивов для улучшения параллелизма. LinkedBlockingQueue не имеет этой проблемы. Головной и хвостовой узлы структуры данных связанного списка относительно независимы, а хранилище не является непрерывным. В управлении двойной блокировкой нет сложности. Это мое понимание, если у вас есть лучший вывод, пожалуйста, оставьте сообщение для обсуждения.
PriorityBlockingQueue
PriorityBlockingQueue — неограниченная очередь, реализованная массивом внизу, с функцией сортировки, а также использует ReentrantLock для управления параллелизмом. Поскольку он не ограничен, вставка элементов не будет блокироваться, очереди нет, только очередь пуста. По этим двум характеристикам мы можем предположить, что это должно быть реализовано с эксклюзивной блокировкой (основной массив) и условием (только потребление уведомлений). Анализ исходного кода методов put и take выглядит следующим образом:
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//无界队列,队列长度不够时会扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
//通过comparator来实现优先级排序
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal(); //和ArrayBlockingQueue一样,每次添加元素后通知消费线程
} finally {
lock.unlock();
}
return true;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await(); //队列为空,阻塞消费线程
} finally {
lock.unlock();
}
return result;
}
DelayQueue
DelayQueue — это тоже неограниченная очередь, которая реализована на основе PriorityQueue, сначала она сортируется по приоритету задержки, а очередь с малым временем задержки ранжируется первой. Подобно PriorityBlockingQueue, нижний слой также является массивом, а ReentrantLock используется для управления параллелизмом. Поскольку он не ограничен, при вставке элементов нет блокировки, и нет полного состояния очереди. Простейших сценариев использования, которые я могу придумать, обычно два: один — истечение срока действия кэша, а другой — запланированные задачи. Но поскольку он не ограничен, истечение срока действия кеша обычно не используется. Просто посмотрите на методы put и take:
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();//优先级队列
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); //插入元素到优先级队列
if (q.peek() == e) { //如果插入的元素在队列头部
leader = null;
available.signal(); //通知消费线程
}
return true;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); //获取头部元素
if (first == null)
available.await(); //空队列阻塞
else {
long delay = first.getDelay(NANOSECONDS); //检查元素是否延迟到期
if (delay <= 0)
return q.poll(); //到期则弹出元素
first = null;
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); //阻塞未到期的时间
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
SynchronousQueue
Synchronousqueue особенный по сравнению с предыдущими четыреми очередями. Это очередь без емкости, что означает, что она не будет хранить данные внутри. После каждого поставленного необходимо выполнить принять, в противном случае один и тот же нить будет продолжать блокировать. Эта функция очень подходит для выполнения некоторой транзитивной работы, одна резьба производит и потребляет один нить. Внутренне он разделен на два режима: справедливый и недобросовестный доступ. По умолчанию используются несправедливые и неиспользованные замки. Все параллелизм достигается через операции CAS, а пропускная способность очень высока. Вот только краткий анализ принимающих и ставят методы под его несправедливой реализацией:
//非公平情况下调用内部类TransferStack的transfer方法put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
//非公平情况下调用内部类TransferStack的transfer方法take
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
//具体的put以及take方法,只有E的区别,通过E来区别REQUEST还是DATA模式
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
//栈无元素或者元素和插入的元素模式相匹配,也就是说都是插入元素
if (h == null || h.mode == mode) {
//有时间限制并且超时
if (timed && nanos <= 0) {
if (h != null && h.isCancelled())
casHead(h, h.next); // 重新设置头节点
else
return null;
}
//未超时cas操作尝试设置头节点
else if (casHead(h, s = snode(s, e, h, mode))) {
//自旋一段时间后未消费元素则挂起put线程
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next);
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
//栈不为空并且和头节点模式不匹配,存在元素则消费元素并重新设置head节点
else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
}
//节点正在匹配阶段
else { // help a fulfiller
SNode m = h.next; // m is h match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
//先自旋后挂起的核心方法
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//计算自旋的次数
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
//匹配成功过返回节点
if (m != null)
return m;
//超时控制
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
//自旋检查,是否进行下一次自旋
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this); //在这里挂起线程
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
Код очень сложный, вот основная логика, которую я понимаю. В коде можно увидеть, что методы поставленного и предпринимательства реализуются путем вызова метода передачи, а затем дифференцируемого режимом параметра. При создании элемента, если тот же нить ставит несколько раз, он попытается поставить элемент несколько Времена спиннинга. Возможно, что элементы будут потребляться во время процесса прядения, которые могут быть введены во времени и уменьшить потерю производительности суспензии нитей. Высокопропускное ядро также здесь. Как и потребляющая нить, она будет вращаться. Сначала, когда стек пуст, и спин терпит неудачу, а затем приостановлен подвесной методом locksupport.park.
LinkedTransferQueue
LinkedTransferQueue — это неограниченная блокирующая очередь, нижний уровень которой реализован в виде связанного списка. Хотя он также реализуется с помощью связанного списка, как LinkedBlockingQueue, реализация управления параллелизмом сильно отличается. Подобно SynchronousQueue, он использует большое количество операций CAS и не использует блокировки. Поскольку он не ограничен, он не будет помещать производственный поток и не будет его блокировать.Поток-потребитель будет заблокирован только при взятии, а метод LockSupport.park будет использоваться и при приостановке потока-потребителя.
По сравнению с вышеупомянутыми очередями, LinkedTransferQueue также предоставляет некоторые дополнительные функции.Он реализует интерфейс TransferQueue и имеет два ключевых метода, методы Transfer(E e) и tryTransfer(E e).Передача будет блокироваться, когда нет потребления, а tryTransfer будет блокировать когда нет потребления.При потреблении он не будет вставлен в очередь, не будет ждать и сразу вернет false.
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
//通过SYNC状态来实现生产阻塞
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
//通过NOW状态跳过添加元素以及阻塞
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
//通过ASYNC状态跳过阻塞
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
//通过SYNC状态来实现消费阻塞
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
//生产消费调用同一个方法,通过e是否为空,haveData,how等参数来区分具体逻辑
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//找出第一个可用节点
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
//队列为空时直接跳过
if (item != p && (item != null) == isData) { // unmatched
//节点类型相同,跳过
if (isData == haveData) // can not match
break;
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
//插入节点或移除节点具体逻辑
//tryTransfer方法会直接跳过并返回结果
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData); //加入节点
if (pred == null)
continue retry; // lost race vs opposite mode
if (how != ASYNC)
//自旋以及阻塞消费线程逻辑,和SynchronousQueue类似,先尝试自选,失败后挂起线程
//transfer方法在没有消费线程时也会阻塞在这里
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
LinkedBlockingDeque
LinkedBlockingDeque – это ограниченная двусторонняя очередь. Нижний уровень реализован в виде двустороннего связанного списка. Реализация LinkedBlockingQeque Node имеет больше переменных prev, указывающих на предыдущий узел. Управление параллелизмом похоже на ArrayBlockingQueue. Для управления параллелизмом используется один ReentrantLock. Здесь, поскольку можно потреблять и создавать как начало, так и конец двусторонней очереди, используется общая блокировка. Он реализует интерфейс BlockingDeque, наследуется от интерфейса BlockingQueue и добавляет addFirst, addLast, offerFirst, offerLast, peekFirst, peekLast и другие методы для производства и потребления начала и окончания. Код реализации LinkedBlockingDeque относительно прост, он в основном объединяет логику кода LinkedBlockingQeque и ArrayBlockingQueue, поэтому я не буду его здесь анализировать.
##Суммировать В этой статье представлен краткий анализ семи очередей блокировки в JDK1.8, и вы сможете примерно разобраться в основных принципах этих семи очередей. В общем, каждая блокирующая очередь имеет свои сценарии применения, при использовании ее можно выбирать по тому, ограниченная она или неограниченная, а затем по своим характеристикам.
Очереди с ограниченной блокировкой включают: ArrayBlockingQueue, LinkedBlockingQueue и LinkedBlockingDeque. LinkedBlockingDeque имеет несколько сценариев применения и обычно используется в режиме «кражи заданий». ArrayBlockingQueue и LinkedBlockingQueue в основном являются разницей между массивом и связанным списком. К неограниченным очередям относятся PriorityBlockingQueue, DelayQueue и LinkedTransferQueue. PriorityBlockingQueue используется в очередях, которые необходимо отсортировать. DelayQueue можно использовать для выполнения некоторых задач по времени или сценариев, когда срок действия кеша истекает. LinkedTransferQueue имеет больше функций передачи, чем другие очереди. Наконец, есть очередь SynchronousQueue, которая не хранит элементы и используется для обработки некоторых эффективных транзитивных сценариев.
Использованная литература:
- Искусство параллельного программирования на Java
- «Практика параллельного программирования на Java»