Анализ исходного кода Netty раскрывает завесу потока реактора (2)

Java задняя часть исходный код Netty

Если вы не знаете о потоке реактора netty, рекомендуется сначала прочитать следующую статьюАнализ исходного кода Netty, чтобы раскрыть завесу потоков реактора (1), вот фото трех ступеней реактора

reactor线程

Мы уже узнали, что первый шаг потока netty-реактора — это опрос событий ввода-вывода (выборки), зарегистрированных в селекторе, затем следующий шаг — обработка этих событий ввода-вывода (обработка выбранных ключей).В этой статье мы обсудим netty вместе Подробная информация об обработке событий ввода-вывода

Входим в поток реактораrunметод, найдите код, обрабатывающий события ввода-вывода, следующим образом

processSelectedKeys();

следовать за

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

Мы обнаружили, что для обработки событий ввода-вывода у netty есть два варианта: с точки зрения имени: один — обрабатывать оптимизированные selectedKeys, а другой — обрабатывать в обычном режиме.

Давайте немного расширим обработку оптимизированных selectedKeys, чтобы увидеть, как оптимизирована netty, мы смотрим наselectedKeysТам, где на него ссылались, есть следующий код

private SelectedSelectionKeySet selectedKeys;

private Selector NioEventLoop.openSelector() {
    //...
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    // selectorImplClass -> sun.nio.ch.SelectorImpl
    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    selectedKeysField.setAccessible(true);
    publicSelectedKeysField.setAccessible(true);
    selectedKeysField.set(selector, selectedKeySet);
    publicSelectedKeysField.set(selector, selectedKeySet);
    //...
    selectedKeys = selectedKeySet;
}

Во-первых, selectedKeys — этоSelectedSelectionKeySetобъект класса, вNioEventLoopизopenSelectorметод, а затем используйте отражение, чтобы связать selectedKeys сsun.nio.ch.SelectorImplдве привязки полей в

sun.nio.ch.SelectorImplМы видим, что эти два поля на самом деле являются двумя HashSet.

// Public views of the key sets
private Set<SelectionKey> publicKeys;             // Immutable
private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    keys = new HashSet<SelectionKey>();
    selectedKeys = new HashSet<SelectionKey>();
    if (Util.atBugLevel("1.4")) {
        publicKeys = keys;
        publicSelectedKeys = selectedKeys;
    } else {
        publicKeys = Collections.unmodifiableSet(keys);
        publicSelectedKeys = Util.ungrowableSet(selectedKeys);
    }
}

селектор звонитselect()При использовании семейного метода, если происходит событие ввода-вывода, соответствующие поля будут вставлены в два поля внутри.selectionKey(как его подключить нужно изучить), то есть это равносильно добавлению элементов в hashSet.Так как netty заменяет два поля в jdk через рефлексию, то надо понимать, кастомизирован ли он nettySelectedSelectionKeySetсуществуетaddВыполнил ли метод некоторую оптимизацию?

Имея в виду этот вопрос, мы входимSelectedSelectionKeySetкласс, чтобы узнать

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    private boolean isA = true;

    SelectedSelectionKeySet() {
        keysA = new SelectionKey[1024];
        keysB = keysA.clone();
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        if (isA) {
            int size = keysASize;
            keysA[size ++] = o;
            keysASize = size;
            if (size == keysA.length) {
                doubleCapacityA();
            }
        } else {
            int size = keysBSize;
            keysB[size ++] = o;
            keysBSize = size;
            if (size == keysB.length) {
                doubleCapacityB();
            }
        }

        return true;
    }

    private void doubleCapacityA() {
        SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
        System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
        keysA = newKeysA;
    }

    private void doubleCapacityB() {
        SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
        System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
        keysB = newKeysB;
    }

    SelectionKey[] flip() {
        if (isA) {
            isA = false;
            keysA[keysASize] = null;
            keysBSize = 0;
            return keysA;
        } else {
            isA = true;
            keysB[keysBSize] = null;
            keysASize = 0;
            return keysB;
        }
    }

    @Override
    public int size() {
        if (isA) {
            return keysASize;
        } else {
            return keysBSize;
        }
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        throw new UnsupportedOperationException();
    }
}

Этот класс на самом деле очень простой, он наследуетAbstractSet, что указывает на то, что класс можно использовать как набор, но нижний слой использует два массива для поочередного использования.addВ методе определите, какой массив используется в данный момент, найдите соответствующий массив, а затем выполните следующие три шага. 1. Запихните SelectionKey в логический конец массива 2. Обновить логическую длину массива +1 3. Если логическая длина массива равна физической длине массива, расширить массив

Мы можем видеть, что после того, как программа работает в течение определенного периода времени, а длина массива достаточно велика, каждый раз, когда опрашивается событие nio, netty требуется только временная сложность O (1), чтобыSelectionKeyВставьте его в набор, а хэш-набор, используемый в нижней части jdk, требует O (lgn) временной сложности.

Вот почему два массива используются попеременно в цикле.На самом деле я тоже очень озадачен.Поразмыслив долго,я нашелSelectedSelectionKeySetВо всех используемых местах я думаю, что использование массива может достичь целей оптимизации, и нет необходимости каждый раз судить, какой массив использовать, поэтому для этой проблемы я поднял вопрос официальному представителю netty, и официальный также дал ответ сказав, что он будет следить , выдать ссылку:GitHub.com/Нетти/Нетти…версия, у нетти естьSelectedSelectionKeySet.javaНижний слой использует массив,Ссылка на сайт

О чистой пареSelectionKeySetОптимизация события ввода-вывода пока такова, давайте продолжим обработку netty событий ввода-вывода, перейдите кprocessSelectedKeysOptimized

 private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
     for (int i = 0;; i ++) {
         // 1.取出IO事件以及对应的channel
         final SelectionKey k = selectedKeys[i];
         if (k == null) {
             break;
         }
         selectedKeys[i] = null;
         final Object a = k.attachment();
         // 2.处理该channel
         if (a instanceof AbstractNioChannel) {
             processSelectedKey(k, (AbstractNioChannel) a);
         } else {
             NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
             processSelectedKey(k, task);
         }
         // 3.判断是否该再来次轮询
         if (needsToSelectAgain) {
             for (;;) {
                 i++;
                 if (selectedKeys[i] == null) {
                     break;
                 }
                 selectedKeys[i] = null;
             }
             selectAgain();
             selectedKeys = this.selectedKeys.flip();
             i = -1;
         }
     }
 }

Мы можем разделить процесс на следующие три шага

1. Выньте событие ввода-вывода и соответствующий класс сетевого канала.

Здесь вы действительно можете испытать оптимизированныйSelectedSelectionKeySetПреимущество обхода заключается в обходе массива, что является относительно родным для jdk.HashSetЭффективность повысилась

После получения текущего SelectionKey установитеselectedKeys[i]Если установлено значение null, вот краткое объяснение причины этого: представьте себе этот сценарий, предполагая, что NioEventLoop опрашивает в среднем N событий ввода-вывода каждый раз и опрашивает 3 события ввода-вывода в часы пик.N событий, тоselectedKeysФизическая длина должна быть больше или равна 3N, если эти ключи обрабатываются каждый раз, не ставитьselectedKeys[i]пуст, то после пикового периода они сохраняются в хвосте массиваselectedKeys[i]соответствующийSelectionKeyникогда не будет переработан.SelectionKeyСоответствующий объект может быть не большим, но вы должны знать, что у него есть вложение.Что такое вложение здесь, будет рассмотрено ниже, но нам должно быть ясно, что вложение может быть очень большим.Таким образом, эти элементы являются GC root, легко привести к сбою gc и утечкам памяти.

Эта ошибка находится в4.0.19.FinalВерсия исправлена, рекомендуется использовать проект netty для обновления до последней версии ^^

2. Обработка канала

Получив соответствующее вложение, нетти выносит следующее суждение

if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} 

Прочитав это в исходниках, надо задуматься, почему такое суждение, и почему вложение может бытьAbstractNioChannelобъект?

Наша идея должна состоять в том, чтобы найти лежащий в основе селектор, а затем, когда селектор вызывает метод register, посмотреть, что, черт возьми, за объект, зарегистрированный в селекторе, мы используем ссылочную функцию глобального поиска intellij, и, наконец, вAbstractNioChannelСледующий метод был найден в

protected void doRegister() throws Exception {
    // ...
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
    // ...
}

javaChannel()вернуть чистый классAbstractChannelСоответствующий базовый объект канала jdk

protected SelectableChannel javaChannel() {
    return ch;
}

Мы видим метод SelectableChannel в сочетании с nettydoRegister()метод, мы можем легко сделать вывод, что механизм регистрации опроса netty на самом деле являетсяAbstractNioChannelВнутренний класс jdkSelectableChannelОбъект зарегистрирован в классе jdkSelctorобъект и будетAbstractNioChannelв видеSelectableChannelПрилагается вложение объекта, так что jdk опрашивает определенный элементSelectableChannelКогда происходит событие ввода-вывода, его можно вывести напрямуюAbstractNioChannelследовать за

Ниже приведен метод регистрации в jdk.

     //*
     //* @param  sel
     //*         The selector with which this channel is to be registered
     //*
     //* @param  ops
     //*         The interest set for the resulting key
     //*
     //* @param  att
     //*         The attachment for the resulting key; may be <tt>null</tt>
public abstract SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException;

Из-за нехватки места подробноеprocessSelectedKey(SelectionKey k, AbstractNioChannel ch)Мы напишем отдельную статью, чтобы подробно раскрыть процесс, вот краткое введение 1. Для босса NioEventLoop опрос — это в основном событие подключения, а последующие вещи передаются рабочему NioEventLoop через его конвейер для обработки. 2. Для воркера NioEventLoop опрос — это в основном события чтения и записи io, а последующая вещь — передать прочитанный поток байтов каждому обработчику каналов через его конвейер для обработки.

При работе с вложением выше также есть ветвь else, давайте проанализируем и ее. Код другой части выглядит следующим образом

NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);

Это означает, что вложение, зарегистрированное в селекторе, имеет другой тип, т.е.NioTask, NioTask в основном используется, когдаSelectableChannelПри регистрации в селекторе выполнять некоторые задачи

Определение НиоТаска

public interface NioTask<C extends SelectableChannel> {
    void channelReady(C ch, SelectionKey key) throws Exception;
    void channelUnregistered(C ch, Throwable cause) throws Exception;
}

так какNioTaskТам, где он не используется внутри netty, я не буду его здесь раскрывать.

3. Определите, не пора ли провести повторный опрос

if (needsToSelectAgain) {
    for (;;) {
        i++;
        if (selectedKeys[i] == null) {
            break;
        }
        selectedKeys[i] = null;
    }
    selectAgain();
    selectedKeys = this.selectedKeys.flip();
    i = -1;
}

Напомним, что поток-реактор netty прошел первые два шага, а именно захват сгенерированных событий ввода-вывода и обработку событий ввода-вывода.После того, как каждое событие ввода-вывода будет захвачено, needToSelectAgain будет сброшен в false, поэтому когда будет сброшен needToSelectAgain Установлено ли оно?

Все та же идея, что и раньше, мы используем intellij, чтобы увидеть, где используется needToSelectAgain В классе NioEventLoop только следующий устанавливает для needToSelectAgain значение true.

NioEventLoop.java

void cancel(SelectionKey key) {
    key.cancel();
    cancelledKeys ++;
    if (cancelledKeys >= CLEANUP_INTERVAL) {
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}

продолжить просмотрcancelгде функция вызывается

AbstractChannel.java

@Override
protected void doDeregister() throws Exception {
    eventLoop().cancel(selectionKey());
}

Нетрудно заметить, что при снятии канала с селектора вызывается функция отмены для отмены ключа, а при приходе снятого ключаCLEANUP_INTERVAL, установите для needToSelectAgain значение true,CLEANUP_INTERVALЗначение по умолчанию – 256.

private static final int CLEANUP_INTERVAL = 256;

То есть, для каждого NioEventLoop, когда каждые 256 каналов удаляются из селектора, отметьте needToSelectAgain как true, мы все равно вернемся к приведенному выше коду.

if (needsToSelectAgain) {
    for (;;) {
        i++;
        if (selectedKeys[i] == null) {
            break;
        }
        selectedKeys[i] = null;
    }
    selectAgain();
    selectedKeys = this.selectedKeys.flip();
    i = -1;
}

Каждые 256 раз он будет входить в блок кода if.Сначала очистить весь внутренний массив selectedKeys, чтобы jvm мог собрать мусор, а затем вызвать его снова.selectAgainпополнениеselectionKey

private void selectAgain() {
    needsToSelectAgain = false;
    try {
        selector.selectNow();
    } catch (Throwable t) {
        logger.warn("Failed to update SelectionKeys.", t);
    }
}

Я думаю, что цель netty - отключать канал каждые 256 раз, снова очищать selectionKey и своевременно обеспечивать, чтобы существующий SelectionKey был действительным.

На данный момент мы достаточно знаем о втором шаге реактора, когда впервые читаем исходный код. Подводя итог: вторым шагом потока реактора netty является обработка событий ввода-вывода. Netty использует массивы для замены собственного HashSet jdk, чтобы обеспечить эффективную обработку событий ввода-вывода. Каждый SelectionKey привязан к классу netty.AbstractChannelОбъект используется как вложение, которое можно найти при обработке каждого SelectionKey.AbstractChannel, а затем сериализовать обработку в ChannelHandler через конвейер и вызвать пользовательский метод

В следующей статье мы рассмотрим последний шаг в потоке Reactor в netty,runTasks, вы узнаете подробности о механизме асинхронного выполнения задач в netty, ждите с нетерпением.

Если вы хотите систематически изучать Нетти, мой буклет«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Это может вам помочь.Если вы хотите систематически изучать принципы Netty, то вы не должны пропустить мою серию видеороликов по анализу исходного кода Netty:coding.IMO OC.com/class/230, Также…