Если вы не знаете о потоке реактора netty, рекомендуется сначала прочитать следующую статьюАнализ исходного кода Netty, чтобы раскрыть завесу потоков реактора (1), вот фото трех ступеней реактора
Мы уже узнали, что первый шаг потока netty-реактора — это опрос событий ввода-вывода (выборки), зарегистрированных в селекторе, затем следующий шаг — обработка этих событий ввода-вывода (обработка выбранных ключей).В этой статье мы обсудим netty вместе Подробная информация об обработке событий ввода-вывода
Входим в поток реактораrun
метод, найдите код, обрабатывающий события ввода-вывода, следующим образом
processSelectedKeys();
следовать за
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
Мы обнаружили, что для обработки событий ввода-вывода у netty есть два варианта: с точки зрения имени: один — обрабатывать оптимизированные selectedKeys, а другой — обрабатывать в обычном режиме.
Давайте немного расширим обработку оптимизированных selectedKeys, чтобы увидеть, как оптимизирована netty, мы смотрим наselectedKeys
Там, где на него ссылались, есть следующий код
private SelectedSelectionKeySet selectedKeys;
private Selector NioEventLoop.openSelector() {
//...
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// selectorImplClass -> sun.nio.ch.SelectorImpl
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
//...
selectedKeys = selectedKeySet;
}
Во-первых, selectedKeys — этоSelectedSelectionKeySet
объект класса, вNioEventLoop
изopenSelector
метод, а затем используйте отражение, чтобы связать selectedKeys сsun.nio.ch.SelectorImpl
две привязки полей в
sun.nio.ch.SelectorImpl
Мы видим, что эти два поля на самом деле являются двумя HashSet.
// Public views of the key sets
private Set<SelectionKey> publicKeys; // Immutable
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
селектор звонитselect()
При использовании семейного метода, если происходит событие ввода-вывода, соответствующие поля будут вставлены в два поля внутри.selectionKey
(как его подключить нужно изучить), то есть это равносильно добавлению элементов в hashSet.Так как netty заменяет два поля в jdk через рефлексию, то надо понимать, кастомизирован ли он nettySelectedSelectionKeySet
существуетadd
Выполнил ли метод некоторую оптимизацию?
Имея в виду этот вопрос, мы входимSelectedSelectionKeySet
класс, чтобы узнать
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
private SelectionKey[] keysA;
private int keysASize;
private SelectionKey[] keysB;
private int keysBSize;
private boolean isA = true;
SelectedSelectionKeySet() {
keysA = new SelectionKey[1024];
keysB = keysA.clone();
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
if (isA) {
int size = keysASize;
keysA[size ++] = o;
keysASize = size;
if (size == keysA.length) {
doubleCapacityA();
}
} else {
int size = keysBSize;
keysB[size ++] = o;
keysBSize = size;
if (size == keysB.length) {
doubleCapacityB();
}
}
return true;
}
private void doubleCapacityA() {
SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
keysA = newKeysA;
}
private void doubleCapacityB() {
SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
keysB = newKeysB;
}
SelectionKey[] flip() {
if (isA) {
isA = false;
keysA[keysASize] = null;
keysBSize = 0;
return keysA;
} else {
isA = true;
keysB[keysBSize] = null;
keysASize = 0;
return keysB;
}
}
@Override
public int size() {
if (isA) {
return keysASize;
} else {
return keysBSize;
}
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public Iterator<SelectionKey> iterator() {
throw new UnsupportedOperationException();
}
}
Этот класс на самом деле очень простой, он наследуетAbstractSet
, что указывает на то, что класс можно использовать как набор, но нижний слой использует два массива для поочередного использования.add
В методе определите, какой массив используется в данный момент, найдите соответствующий массив, а затем выполните следующие три шага.
1. Запихните SelectionKey в логический конец массива
2. Обновить логическую длину массива +1
3. Если логическая длина массива равна физической длине массива, расширить массив
Мы можем видеть, что после того, как программа работает в течение определенного периода времени, а длина массива достаточно велика, каждый раз, когда опрашивается событие nio, netty требуется только временная сложность O (1), чтобыSelectionKey
Вставьте его в набор, а хэш-набор, используемый в нижней части jdk, требует O (lgn) временной сложности.
Вот почему два массива используются попеременно в цикле.На самом деле я тоже очень озадачен.Поразмыслив долго,я нашелSelectedSelectionKeySet
Во всех используемых местах я думаю, что использование массива может достичь целей оптимизации, и нет необходимости каждый раз судить, какой массив использовать, поэтому для этой проблемы я поднял вопрос официальному представителю netty, и официальный также дал ответ сказав, что он будет следить , выдать ссылку:GitHub.com/Нетти/Нетти…версия, у нетти естьSelectedSelectionKeySet.java
Нижний слой использует массив,Ссылка на сайт
О чистой пареSelectionKeySet
Оптимизация события ввода-вывода пока такова, давайте продолжим обработку netty событий ввода-вывода, перейдите кprocessSelectedKeysOptimized
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
// 1.取出IO事件以及对应的channel
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
// 2.处理该channel
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 3.判断是否该再来次轮询
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
Мы можем разделить процесс на следующие три шага
1. Выньте событие ввода-вывода и соответствующий класс сетевого канала.
Здесь вы действительно можете испытать оптимизированныйSelectedSelectionKeySet
Преимущество обхода заключается в обходе массива, что является относительно родным для jdk.HashSet
Эффективность повысилась
После получения текущего SelectionKey установитеselectedKeys[i]
Если установлено значение null, вот краткое объяснение причины этого: представьте себе этот сценарий, предполагая, что NioEventLoop опрашивает в среднем N событий ввода-вывода каждый раз и опрашивает 3 события ввода-вывода в часы пик.N событий, тоselectedKeys
Физическая длина должна быть больше или равна 3N, если эти ключи обрабатываются каждый раз, не ставитьselectedKeys[i]
пуст, то после пикового периода они сохраняются в хвосте массиваselectedKeys[i]
соответствующийSelectionKey
никогда не будет переработан.SelectionKey
Соответствующий объект может быть не большим, но вы должны знать, что у него есть вложение.Что такое вложение здесь, будет рассмотрено ниже, но нам должно быть ясно, что вложение может быть очень большим.Таким образом, эти элементы являются GC root, легко привести к сбою gc и утечкам памяти.
Эта ошибка находится в4.0.19.Final
Версия исправлена, рекомендуется использовать проект netty для обновления до последней версии ^^
2. Обработка канала
Получив соответствующее вложение, нетти выносит следующее суждение
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
Прочитав это в исходниках, надо задуматься, почему такое суждение, и почему вложение может бытьAbstractNioChannel
объект?
Наша идея должна состоять в том, чтобы найти лежащий в основе селектор, а затем, когда селектор вызывает метод register, посмотреть, что, черт возьми, за объект, зарегистрированный в селекторе, мы используем ссылочную функцию глобального поиска intellij, и, наконец, вAbstractNioChannel
Следующий метод был найден в
protected void doRegister() throws Exception {
// ...
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
// ...
}
javaChannel()
вернуть чистый классAbstractChannel
Соответствующий базовый объект канала jdk
protected SelectableChannel javaChannel() {
return ch;
}
Мы видим метод SelectableChannel в сочетании с nettydoRegister()
метод, мы можем легко сделать вывод, что механизм регистрации опроса netty на самом деле являетсяAbstractNioChannel
Внутренний класс jdkSelectableChannel
Объект зарегистрирован в классе jdkSelctor
объект и будетAbstractNioChannel
в видеSelectableChannel
Прилагается вложение объекта, так что jdk опрашивает определенный элементSelectableChannel
Когда происходит событие ввода-вывода, его можно вывести напрямуюAbstractNioChannel
следовать за
Ниже приведен метод регистрации в jdk.
//*
//* @param sel
//* The selector with which this channel is to be registered
//*
//* @param ops
//* The interest set for the resulting key
//*
//* @param att
//* The attachment for the resulting key; may be <tt>null</tt>
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
Из-за нехватки места подробноеprocessSelectedKey(SelectionKey k, AbstractNioChannel ch)
Мы напишем отдельную статью, чтобы подробно раскрыть процесс, вот краткое введение
1. Для босса NioEventLoop опрос — это в основном событие подключения, а последующие вещи передаются рабочему NioEventLoop через его конвейер для обработки.
2. Для воркера NioEventLoop опрос — это в основном события чтения и записи io, а последующая вещь — передать прочитанный поток байтов каждому обработчику каналов через его конвейер для обработки.
При работе с вложением выше также есть ветвь else, давайте проанализируем и ее. Код другой части выглядит следующим образом
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
Это означает, что вложение, зарегистрированное в селекторе, имеет другой тип, т.е.NioTask
, NioTask в основном используется, когдаSelectableChannel
При регистрации в селекторе выполнять некоторые задачи
Определение НиоТаска
public interface NioTask<C extends SelectableChannel> {
void channelReady(C ch, SelectionKey key) throws Exception;
void channelUnregistered(C ch, Throwable cause) throws Exception;
}
так какNioTask
Там, где он не используется внутри netty, я не буду его здесь раскрывать.
3. Определите, не пора ли провести повторный опрос
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
Напомним, что поток-реактор netty прошел первые два шага, а именно захват сгенерированных событий ввода-вывода и обработку событий ввода-вывода.После того, как каждое событие ввода-вывода будет захвачено, needToSelectAgain будет сброшен в false, поэтому когда будет сброшен needToSelectAgain Установлено ли оно?
Все та же идея, что и раньше, мы используем intellij, чтобы увидеть, где используется needToSelectAgain В классе NioEventLoop только следующий устанавливает для needToSelectAgain значение true.
NioEventLoop.java
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
продолжить просмотрcancel
где функция вызывается
AbstractChannel.java
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
Нетрудно заметить, что при снятии канала с селектора вызывается функция отмены для отмены ключа, а при приходе снятого ключаCLEANUP_INTERVAL
, установите для needToSelectAgain значение true,CLEANUP_INTERVAL
Значение по умолчанию – 256.
private static final int CLEANUP_INTERVAL = 256;
То есть, для каждого NioEventLoop, когда каждые 256 каналов удаляются из селектора, отметьте needToSelectAgain как true, мы все равно вернемся к приведенному выше коду.
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
Каждые 256 раз он будет входить в блок кода if.Сначала очистить весь внутренний массив selectedKeys, чтобы jvm мог собрать мусор, а затем вызвать его снова.selectAgain
пополнениеselectionKey
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}
Я думаю, что цель netty - отключать канал каждые 256 раз, снова очищать selectionKey и своевременно обеспечивать, чтобы существующий SelectionKey был действительным.
На данный момент мы достаточно знаем о втором шаге реактора, когда впервые читаем исходный код. Подводя итог: вторым шагом потока реактора netty является обработка событий ввода-вывода. Netty использует массивы для замены собственного HashSet jdk, чтобы обеспечить эффективную обработку событий ввода-вывода. Каждый SelectionKey привязан к классу netty.AbstractChannel
Объект используется как вложение, которое можно найти при обработке каждого SelectionKey.AbstractChannel
, а затем сериализовать обработку в ChannelHandler через конвейер и вызвать пользовательский метод
В следующей статье мы рассмотрим последний шаг в потоке Reactor в netty,runTasks
, вы узнаете подробности о механизме асинхронного выполнения задач в netty, ждите с нетерпением.
Если вы хотите систематически изучать Нетти, мой буклет«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Это может вам помочь.Если вы хотите систематически изучать принципы Netty, то вы не должны пропустить мою серию видеороликов по анализу исходного кода Netty:coding.IMO OC.com/class/230, Также…