Однократное программирование - LinkedTransferceue

Java задняя часть алгоритм

1. Введение

Всего в Java существует 8 типов блокирующих очередей.

Мы проанализировали:

ArrayBlockingQueueочередь массива, мы находимся вРеализуйте очередь блокировки, используя ReentrantLock и ConditionЯ видел пример, написанный JDK, который является основным принципом и реализацией этого класса. Хозяин не готов анализировать.

LinkedBlockingDequeпредставляет собой очередь двусвязных списков. Часто используется в «алгоритмах кражи работы» с возможностью повторного анализа.

DelayQueue是一个支持延时获取元素的无界阻塞队列。 Внутреннее использованиеPriorityQueueвыполнить.有机会再分析。

PriorityBlockingQueueэто неограниченная очередь блокировки, которая поддерживает приоритет, иDelayWorkQueueпохожий. есть возможность провести повторный анализ.

Сегодня я хочу проанализировать оставшуюся интересную очередь:LinkedTransferQueue.

Почему это интересно? он может бытьLinkedBolckingQueueиSynchronousQueueи союз.

мы знаемSynchronousQueueЭлементы не могут быть сохранены внутри, когда элементы должны быть добавлены, они должны быть заблокированы, что не идеально.LinkedBolckingQueueВнутри используется большое количество блокировок, а производительность невысокая.

Сочетание этих двух не идеально? Высокая производительность без блокировки.

Давайте посмотрим вместе.

2. Введение в LinkedTransferQueue

image.png

Этот класс реализует TransferQueue. Этот интерфейс определяет несколько методов:

public interface TransferQueue<E> extends BlockingQueue<E> {
    // 如果可能,立即将元素转移给等待的消费者。 
    // 更确切地说,如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则返回 false。
    boolean tryTransfer(E e);

    // 将元素转移给消费者,如果需要的话等待。 
    // 更准确地说,如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则等待直到元素由消费者接收。
    void transfer(E e) throws InterruptedException;

    // 上面方法的基础上设置超时时间
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    // 如果至少有一位消费者在等待,则返回 true
    boolean hasWaitingConsumer();

    // 返回等待消费者人数的估计值
    int getWaitingConsumerCount();
}

По сравнению с обычной очередью блокировки добавлено несколько методов.

3. Ключевой анализ исходного кода

Блокирующая очередь — это не что иное, какput ,take,offer ,pollи так далее плюсTransferQueueнесколькоtryTransferметод. Давайте посмотрим на реализацию этих методов.

putметод:

public void put(E e) {
     xfer(e, true, ASYNC, 0);
}

takeметод:

public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

offerметод:

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

pollметод:

public E poll() {
    return xfer(null, false, NOW, 0);
}

tryTransferметод:

public boolean tryTransfer(E e) {
    return xfer(e, true, NOW, 0) == null;
}

transferметод:

public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}

Ужасно, все методы указывают наxferметод, но с другими передаваемыми параметрами.

первый параметр, если онputТип является фактическим значением, в противном случае он равен нулю. Второй параметр, содержит ли он данные, тип пут — истина, а тейк — ложь. Третий параметр, тип выполнения, имеет немедленный возврат.NOW, с асинхроннымASYNC, с блокировкойSYNC, с тайм-аутомTIMED. четвертый параметр, только еслиTIMEDИмеет значение только тип.

Итак, ключевым методом этого класса является метод xfer.

4. Анализ метода xfer

Исходный код с комментариями:

private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race
        // 从  head 开始
        for (Node h = head, p = h; p != null;) { // find & match first node
            // head 的类型。
            boolean isData = p.isData;
            // head 的数据
            Object item = p.item;
            // item != null 有 2 种情况,一是 put 操作, 二是 take 的 itme 被修改了(匹配成功)
            // (itme != null) == isData 要么表示 p 是一个 put 操作, 要么表示 p 是一个还没匹配成功的 take 操作
            if (item != p && (item != null) == isData) { 
                // 如果当前操作和 head 操作相同,就没有匹配上,结束循环,进入下面的 if 块。
                if (isData == haveData)   // can't match
                    break;
                // 如果操作不同,匹配成功, 尝试替换 item 成功,
                if (p.casItem(item, e)) { // match
                    // 更新 head
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 唤醒原 head 线程.
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // 找下一个
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
        // 如果这个操作不是立刻就返回的类型    
        if (how != NOW) {                 // No matches available
            // 且是第一次进入这里
            if (s == null)
                // 创建一个 node
                s = new Node(e, haveData);
            // 尝试将 node 追加对队列尾部,并返回他的上一个节点。
            Node pred = tryAppend(s, haveData);
            // 如果返回的是 null, 表示不能追加到 tail 节点,因为 tail 节点的模式和当前模式相反.
            if (pred == null)
                // 重来
                continue retry;           // lost race vs opposite mode
            // 如果不是异步操作(即立刻返回结果)
            if (how != ASYNC)
                // 阻塞等待匹配值
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

Код немного длинный, но логика очень проста.

Логика следующая: оказатьсяheadузел, еслиheadЕсли узел является совпадающей операцией, он присваивается напрямую, если нет, то добавляется в очередь.

Примечание. В очереди всегда имеется только один тип операции, либоputтип, либоtakeтип.

Весь процесс выглядит следующим образом:

image.png

В сравненииSynchronousQueueСуществует еще одна очередь, которую можно сохранить, по сравнению сLinkedBlockingQueueБолее прямая передача элементов, меньшее использование блокировок для синхронизации.

Более высокая производительность и больше полезности.

5. Резюме

LinkedTransferQueueдаSynchronousQueueиLinkedBlockingQueueкомбинация, коэффициент производительностиLinkedBlockingQueueвыше (без блокировки), чемSynchronousQueueМожно сохранить больше элементов.

когдаputКогда, если есть нить, ожидая на элементе непосредственно «к» официантам или непосредственно в очередь.

putиtransferРазница между методами заключается в том, что put возвращает данные немедленно, тогда как Transfer блокирует и ждет, пока потребитель получит данные перед возвратом.transferМетоды иSynchronousQueueМетод put аналогичен.