Анализ исходного кода SynchronousQueue параллельного программирования

Node.js задняя часть алгоритм исходный код

предисловие

SynchronousQueueЭто очередь, которая обычно не используется обычными пользователями, обычно при создании неограниченного пула потоков (Executors.newCachedThreadPool()), который является очень опасным пулом потоков^_^.

Это особая очередь блокировки, режим которой: inoffer, если никакой другой поток не принимает илиpoll, то он потерпит неудачу, и наоборот, еслиtakeилиpollКогда нет ниткиoffer, это также не удастся, и эта функция очень подходит для пулов потоков с высоким откликом и незафиксированными потоками.Queue. Таким образом, на многих высокопроизводительных серверах, если параллелизм в настоящее время высок, обычныеLinkedQueueЭто станет узким местом, и будут сбои в работе.SynchronousQueueПосле этого производительность будет намного лучше.

Давайте посмотрим, как эта специальная очередь реализована сегодня. Дружеское напоминание: код немного сложен. . . Пожалуйста, будьте морально готовы.

реализация исходного кода

SynchronousQueue внутренне разделена на справедливую (очередь) и нечестную (стек), и производительность очереди относительно лучше. По способу построения это видно. По умолчанию это несправедливо, обычно производительность несправедливого (stack FIFO) будет немного выше.

Метод строительства:

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

метод предложения

Для этого метода мы обычно рекомендуем использовать механизм тайм-аута сofferметод.

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

Из приведенного выше кода видно, что основным методом являетсяtransferметод. если метод возвращаетtrue, указывая на то, что вставка прошла успешно, в случае неудачи возвращаетсяfalse.

метод опроса

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

Этот же метод также называетсяtransferметод. результат возвращает результирующее значение илиnull. Разница в том, чтоofferметодeПараметры являются сущностями. а такжеpollметодeпараметрnull, мы полагаем, что метод должен сделать суждение, основанное на этом внутренне. Итак, дело в том,transferреализация метода.

Трансферы бывают двух видов, очередь и стек, один из них мы изучим и узнаем его принцип, а другой успеем его посмотреть.

Реализация исходного кода TransferQueue

Метод строительства:

TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

Создает узел Node с комментарием, что это добавленный узел. И назначьте его головному и хвостовому узлам. Сформируйте инициализированный связанный список.

Взгляните на этот узел:

/** Node class for TransferQueue. */
static final class QNode {
    volatile QNode next;          // next node in queue
    volatile Object item;         // CAS'ed to or from null
    volatile Thread waiter;       // to control park/unpark
    final boolean isData;
}

Узел содержит следующий узел в очереди, значение, соответствующее узлу, поток, удерживающий узел, и парковку или непарковку.Здесь используется инструментальный класс JUC LockSupport, и есть логический тип isData, который очень Важно и должно быть Поймите это хорошо, и мы объясним это позже.

Нас больше беспокоит метод передачи этого класса, который является ядром SynchronousQueue.

Интерфейс метода определяется следующим образом:

/**
 * Performs a put or take. put 或者 take
 *
 * @param e if non-null, the item to be handed to a consumer;
 *          if null, requests that transfer return an item
 *          offered by producer. 
 * @param timed if this operation should timeout
 * @param nanos the timeout, in nanoseconds
 * @return if non-null, the item provided or received; if null,
 *         the operation failed due to timeout or interrupt --
 *         the caller can distinguish which of these occurred
 *         by checking Thread.interrupted.
 */
abstract E transfer(E e, boolean timed, long nanos);

В комментарии говорится, что делает параметр e:

Если e не равен нулю (указывает, что он вызывается производителем), передать элемент потребителю и вернуть e; в противном случае, если он равен нулю (указывая, что он вызывается потребителем), вернуть элемент, предоставленный производителем потребителю.

Глядя на реализацию метода передачи класса TransferQueue, арендодатель написал много комментариев, чтобы попытаться интерпретировать:

QNode s = null; // constructed/reused as needed
boolean isData = (e != null);// 当输入的是数据时,isData 就是 ture,表明这个操作是一个输入数据的操作;同理,当调用者输入的是 null,则是在消费数据。

for (;;) {
    QNode t = tail;
    QNode h = head;
    if (t == null || h == null)         // 如果并发导致未"来得及"初始化
        continue;                       // 自旋重来

    // 以下分成两个部分进行

    // 1. 如果当前操作和 tail 节点的操作是一样的;或者头尾相同(表明队列中啥都没有)。
    if (h == t || t.isData == isData) { 
        QNode tn = t.next;
        if (t != tail)                  // 如果 t 和 tail 不一样,说明,tail 被其他的线程改了,重来
            continue;
        if (tn != null) {               // 如果 tail 的 next 不是空。就需要将 next 追加到 tail 后面了。
            advanceTail(t, tn); // 使用 CAS 将 tail.next 变成 tail,        
            continue;
        }
        if (timed && nanos <= 0)        // 时间到了,不等待,返回 null,插入失败,获取也是失败的。
            return null;
        if (s == null) // 如果能走到这里,说明 tail 的 next 是 null,这里的判断是避免重复创建 Qnode 对象。
            s = new QNode(e, isData);// 创建一个新的节点。
        if (!t.casNext(null, s))        // 尝试 CAS 将这个刚刚创建的节点追加到 tail 的 next 节点上.
            continue;// 如果失败,则重来

        advanceTail(t, s); // 当新的节点成功追加到 tail 节点的 next 上了, 就尝试将 tail.next 节点覆盖 tail 节点,称之为推进。
        // s == 新节点,“可能”是新的 tail;e 是实际数据。
        Object x = awaitFulfill(s, e, timed, nanos);// 该方法作用就是,让当前线程等待。排除意外情况和超时的话,就是等待其他线程拿走数据并替换成 isData 不同的数据。
        if (x == s) { // x == s 是什么意思呢? 表明在 awaitFulfill 方法中,这个数据被取消了,tryCancel 方法就是将 item 覆盖了 QNode。说明这次操作失败了。
            clean(t, s);// 操作失败则需要清理数据,并返回 null。
            return null;
        }

        // 如果一切顺利,确实被其他线程唤醒了,其他线程也交换了数据。
        // 这个判断:next != this,说明了什么?当这个 tail 节点的 next 不再指向自己,说明了
        if (!s.isOffList()) {           // not already unlinked
            // 这一步是将 S 节点设置为 Head,并且将新 Head 的 next 指向自己,让 Head 和之前的 next 断开。
            advanceHead(t, s);          // unlink if head     
            // 当 x 不是 null,表明对方线程是存放数据的。     
            if (x != null)              // and forget fields
                // 这一步操作将自己的 item 设置成自己。
                s.item = s;
            // 将 S 节点的持有线程变成 null。
            s.waiter = null;
        }
        // x 不是 null 表明,对方线程是生产者,返回他生产的数据;如果是 null,说明对方线程是消费者,那他自己就是生产者,返回自己的数据,表示成功。
        return (x != null) ? (E)x : e;

    } 
    // 2. 如果当前的操作类型和 tail 的操作不一样。称之为互补。
    else {                            // complementary-mode
        QNode m = h.next;               // node to fulfill
        // 如果下方这些判断没过,说明并发修改了,自旋重来。 
        if (t != tail || m == null || h != head)
            continue;                   // inconsistent read

        Object x = m.item;
        // 如果 head 节点的 isData 和当前操作相同,
        // 如果 操作不同,但 head 的 item 就是自身,也就是发生了取消操作,tryCancel 方法会做这件事情。
        // 如果上面2个都不满足,尝试使用 CAS 将 e 覆盖 item。 
        if (isData == (x != null) ||    // m already fulfilled
            x == m ||                   // m cancelled
            !m.casItem(x, e)) {         // lost CAS
            // CAS 失败了,Head 的操作类型和当前类型相同,item 被取消了,都会走这里。
            // 将 h.next 覆盖 head。重来。
            advanceHead(h, m);          // dequeue and retry
            continue;
        }
        // 这里也是将 h.next 覆盖 head。能够走到这里,说明,上面的 CAS 操作成功了,当前线程已经将 e 覆盖了 next 的 item 。
        advanceHead(h, m);              // successfully fulfilled
        // 唤醒 next 的 线程。提醒他可以取出数据,或者“我”已经拿到数据了。
        LockSupport.unpark(m.waiter);
        // 如果 x 不是 null,表明这是一次消费数据的操作,反之,这是一次生产数据的操作。
        return (x != null) ? (E)x : e;
    }
}

Честно говоря, код все еще более сложный. Комментарии в JDK говорят это:

Основной алгоритм является бесконечной петлей одним из двух способов. 1 Если очередь пуста или удерживает один и тот же модный узел (isDataто же самое), попробуйте добавить узел в очередь и дождаться ожидания текущего потока. 2 Если в очереди есть потоки, ожидающие互补Кстати, используйте CAS для обмена данными с официантом. и вернуться.

Что это значит?

Прежде всего, дайте понять, что в очереди есть 2 кейса данных (но одновременно существует только один), илиQNodeесть реальные данные(offer, данные есть, но нет "людей", которые могли бы их получить), или фактических данных нет (pollКогда в очереди нет данных, поток должен ждать). В каком состоянии находится очередь, зависит от他为空后,第一个插入的是什么类型的数据.

Владелец нарисовал картинку, чтобы показать:

  1. При инициализации очереди имеется только один пустойNode.

image.png

  1. В этот момент поток пытаетсяofferилиpollданные, вставитNodeвставляется в узел.

image.png

  1. Предполагая, что операция offer только что произошла, в это время на предложение также приходит другой поток, тогда будет 2 узла.

image.png

  1. В это время в очереди есть 2 узла с реальными данными (операция предложения). Обратите внимание, что в это время эти 2 потока обаwait, потому что никто не принимает их данные. В этот момент для выполнения операции опроса приходит другой поток.

image.png

Как видно из приведенного выше рисунка,pollнить изheadначать извлечение данных, потому что этоisDataа такжеtailЕсли isData узла отличается, он начнет поиск узла с головы и попытается обменять собственное нулевое значение на реальные данные в узле. И разбудить ожидающий поток.

Эти 4 картинкиSynchronousQueueсущность.

Так как она называется синхронной очередью, она должна быть, когда поток A производит данные, а поток B потребляет, иначе поток A должен ждать.Наоборот, если поток A готов потреблять данные, но данных нет в очередь, поток также будет ждать, пока не появится B. Потоки хранят данные.

Принцип реализации JDK: использовать очередь и использовать очередь в очереди.isDataЧтобы различать производство и потребление, все новые операции основаны на режиме хвостового узла, чтобы определить, следует ли добавлять кtailузел илиtailузел (изheadstart) для обмена данными.

И так называемый обмен отheadДля начала получите фактические данные узла, затем используйтеCASОбмен с соответствующим узлом. Таким образом, операция прямого обмена данными между двумя потоками завершена.

Почему он при определенных обстоятельствахLinkedBlockingQueueА как насчет высокой производительности? Одна из причин заключается в том, что блокировки не используются, что уменьшает переключение контекста потока. Во-вторых, способ обмена данными между потоками более эффективен.

Хорошо, важная часть позади, давайте посмотрим, как ждет поток. логика вawaitFulfillВ методе:

// 自旋或者等待,直到填充完毕
// 这里的策略是什么呢?如果自旋次数不够了,通常是 16 次,但还有超过 1 秒的时间,就阻塞等待被唤醒。
// 如果时间到了,就取消这次的入队行为。
// 返回的是 Node 本身
// s.item 就是 e 
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = ((head.next == s) ?// 如果成功将 tail.next 覆盖了 tail,如果有超时机制,则自旋 32 次,如果没有超时机制,则自旋 32 *16 = 512次
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())// 当前线程被中断
            s.tryCancel(e);// 尝试取消这个 item
        Object x = s.item;// 获取到这个 tail 的 item
        if (x != e) // 如果不相等,说明 node 中的 item 取消了,返回这个 item。
            // 这里是唯一停止循环的地方。当 s.item 已经不是当初的哪个 e 了,说明要么是时间到了被取消了,要么是线程中断被取消了。
            // 当然,不仅仅只有这2种 “意外” 情况,还有一种情况是:当另一个线程拿走了这个数据,并修改了 item,也会通过这个判断,返回被“修改”过的 item。
            return x;
        if (timed) {// 如果有时间限制
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {// 如果时间到了
                s.tryCancel(e);// 尝试取消 item,供上面的 x != e 判断
                continue;// 重来
            }
        }
        if (spins > 0)// 如果还有自旋次数
            --spins;// 减一
        else if (s.waiter == null)// 如果自旋不够,且 tail 的等待线程还没有赋值
            s.waiter = w;// 当前线程赋值给 tail 的等待线程
        else if (!timed)// 如果自旋不够,且如果线程赋值过了,且没有限制时间,则 wait,(危险操作)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)// 如果自旋不够,且如果限制了时间,且时间还剩余超过 1 秒,则 wait 剩余时间。
            // 主要目的就是等待,等待其他线程唤醒这个节点所在的线程。
            LockSupport.parkNanos(this, nanos);
    }
}

Логика метода следующая:

  1. Прокрутка по умолчанию составляет 32 раза или 512 раз, если нет механизма тайм-аута.
  2. Если время истекло или поток прерван, отмените эту операцию,itemНастройтесь на себя. для последующего суда.
  3. Если вращение закончилось и осталось более 1 секунды, заблокируйте и дождитесь оставшегося времени.
  4. Когда поток пробуждается другим потоком, происходит обмен данными. ноreturn, который возвращает обменные данные.

Суммировать

Ну, продолжаетсяSynchronousQueueАнализ основных исходных кодов здесь, арендодатель не проанализировал весь исходный код для этого класса, только для изучения основной части кода, которые достаточно, чтобы мы поняли этоQueueреализуется внутри.

Подвести итог:

JDK использует очереди или стеки для реализации справедливых или нечестных моделей. в,isDataАтрибуты чрезвычайно важны.Операция, которая идентифицирует этот поток, определяет, должен ли он добавляться в очередь или обмениваться данными из очереди.

Когда каждый поток не сталкивается со своей второй половиной, он либо быстро выходит из строя, либо блокируется, блокируя и ожидая прихода своей второй половины.Что касается того, дает ли другая сторона данные или извлекает данные, это зависит от нее самой.Если она потребитель, тогда Он является производителем.

удачи! ! ! !