1. Введение
Всего в Java существует 8 типов блокирующих очередей.
Мы проанализировали:
- Анализ исходного кода SynchronousQueue параллельного программирования
- Анализ исходного кода параллельного программирования ConcurrentLinkedQueue
- LinkedBolckingQueue анализ исходного кода параллельного программирования
- существуетОдновременное программирование - планируемоеthreadPoolExecutorКстати, DelayWorkQueue анализируется.
ArrayBlockingQueue
очередь массива, мы находимся вРеализуйте очередь блокировки, используя ReentrantLock и ConditionЯ видел пример, написанный JDK, который является основным принципом и реализацией этого класса. Хозяин не готов анализировать.
LinkedBlockingDeque
представляет собой очередь двусвязных списков. Часто используется в «алгоритмах кражи работы» с возможностью повторного анализа.
DelayQueue
是一个支持延时获取元素的无界阻塞队列。 Внутреннее использованиеPriorityQueue
выполнить.有机会再分析。
PriorityBlockingQueue
это неограниченная очередь блокировки, которая поддерживает приоритет, иDelayWorkQueue
похожий. есть возможность провести повторный анализ.
Сегодня я хочу проанализировать оставшуюся интересную очередь:LinkedTransferQueue
.
Почему это интересно? он может бытьLinkedBolckingQueue
иSynchronousQueue
и союз.
мы знаемSynchronousQueue
Элементы не могут быть сохранены внутри, когда элементы должны быть добавлены, они должны быть заблокированы, что не идеально.LinkedBolckingQueue
Внутри используется большое количество блокировок, а производительность невысокая.
Сочетание этих двух не идеально? Высокая производительность без блокировки.
Давайте посмотрим вместе.
2. Введение в LinkedTransferQueue
Этот класс реализует TransferQueue. Этот интерфейс определяет несколько методов:
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果可能,立即将元素转移给等待的消费者。
// 更确切地说,如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则返回 false。
boolean tryTransfer(E e);
// 将元素转移给消费者,如果需要的话等待。
// 更准确地说,如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则等待直到元素由消费者接收。
void transfer(E e) throws InterruptedException;
// 上面方法的基础上设置超时时间
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 如果至少有一位消费者在等待,则返回 true
boolean hasWaitingConsumer();
// 返回等待消费者人数的估计值
int getWaitingConsumerCount();
}
По сравнению с обычной очередью блокировки добавлено несколько методов.
3. Ключевой анализ исходного кода
Блокирующая очередь — это не что иное, какput ,take,offer ,poll
и так далее плюсTransferQueue
несколькоtryTransfer
метод. Давайте посмотрим на реализацию этих методов.
put
метод:
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
take
метод:
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
offer
метод:
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
poll
метод:
public E poll() {
return xfer(null, false, NOW, 0);
}
tryTransfer
метод:
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
transfer
метод:
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
Ужасно, все методы указывают наxfer
метод, но с другими передаваемыми параметрами.
первый параметр, если онput
Тип является фактическим значением, в противном случае он равен нулю.
Второй параметр, содержит ли он данные, тип пут — истина, а тейк — ложь.
Третий параметр, тип выполнения, имеет немедленный возврат.NOW
, с асинхроннымASYNC
, с блокировкойSYNC
, с тайм-аутомTIMED
.
четвертый параметр, только еслиTIMED
Имеет значение только тип.
Итак, ключевым методом этого класса является метод xfer.
4. Анализ метода xfer
Исходный код с комментариями:
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
// 从 head 开始
for (Node h = head, p = h; p != null;) { // find & match first node
// head 的类型。
boolean isData = p.isData;
// head 的数据
Object item = p.item;
// item != null 有 2 种情况,一是 put 操作, 二是 take 的 itme 被修改了(匹配成功)
// (itme != null) == isData 要么表示 p 是一个 put 操作, 要么表示 p 是一个还没匹配成功的 take 操作
if (item != p && (item != null) == isData) {
// 如果当前操作和 head 操作相同,就没有匹配上,结束循环,进入下面的 if 块。
if (isData == haveData) // can't match
break;
// 如果操作不同,匹配成功, 尝试替换 item 成功,
if (p.casItem(item, e)) { // match
// 更新 head
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 唤醒原 head 线程.
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
// 找下一个
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 如果这个操作不是立刻就返回的类型
if (how != NOW) { // No matches available
// 且是第一次进入这里
if (s == null)
// 创建一个 node
s = new Node(e, haveData);
// 尝试将 node 追加对队列尾部,并返回他的上一个节点。
Node pred = tryAppend(s, haveData);
// 如果返回的是 null, 表示不能追加到 tail 节点,因为 tail 节点的模式和当前模式相反.
if (pred == null)
// 重来
continue retry; // lost race vs opposite mode
// 如果不是异步操作(即立刻返回结果)
if (how != ASYNC)
// 阻塞等待匹配值
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
Код немного длинный, но логика очень проста.
Логика следующая:
оказатьсяhead
узел, еслиhead
Если узел является совпадающей операцией, он присваивается напрямую, если нет, то добавляется в очередь.
Примечание. В очереди всегда имеется только один тип операции, либоput
тип, либоtake
тип.
Весь процесс выглядит следующим образом:
В сравненииSynchronousQueue
Существует еще одна очередь, которую можно сохранить, по сравнению сLinkedBlockingQueue
Более прямая передача элементов, меньшее использование блокировок для синхронизации.
Более высокая производительность и больше полезности.
5. Резюме
LinkedTransferQueue
даSynchronousQueue
иLinkedBlockingQueue
комбинация, коэффициент производительностиLinkedBlockingQueue
выше (без блокировки), чемSynchronousQueue
Можно сохранить больше элементов.
когдаput
Когда, если есть нить, ожидая на элементе непосредственно «к» официантам или непосредственно в очередь.
put
иtransfer
Разница между методами заключается в том, что put возвращает данные немедленно, тогда как Transfer блокирует и ждет, пока потребитель получит данные перед возвратом.transfer
Методы иSynchronousQueue
Метод put аналогичен.