Ядром netty является поток-реактор, который соответствует широко используемому в проекте NioEventLoop, так что же делает NioEventLoop? Как netty обеспечивает эффективный опрос цикла событий и своевременное выполнение задач? Как изящно исправить ошибку nio jdk? С этими вопросами эта статья поможет вам понять правду о потоке реактора netty шаг за шагом [исходный код, основанный на 4.1.6.Final]
начало потока реактора
Метод run NioEventLoop — это основная часть потока-реактора, который запускается при первом добавлении задачи.
Метод выполнения родительского класса NioEventLoop SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
...
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
...
}
...
}
Внешний поток выполняется при добавлении задачи в очередь задачstartThread()
, netty оценит, был ли запущен поток-реактор, если нет, запустит поток и добавит задачи в очередь задач
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
SingleThreadEventExecutor выполняетсяdoStartThread
Когда вызывается внутренний исполнительexecutor
Метод execute инкапсулирует процесс вызова метода run NioEventLoop в runnable и подключает его к потоку для выполнения.
private void doStartThread() {
...
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
...
SingleThreadEventExecutor.this.run();
...
}
}
}
Нитьexecutor
Создайте соответствующий объект потока реактора netty.executor
По умолчаниюThreadPerTaskExecutor
по умолчанию,ThreadPerTaskExecutor
при каждом исполненииexecute
метод пройдетDefaultThreadFactory
СоздаватьFastThreadLocalThread
Поток, и этот поток является сущностью потока реактора в netty.
ThreadPerTaskExecutor
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
о том, почемуThreadPerTaskExecutor
иDefaultThreadFactory
комбинация для создания новогоFastThreadLocalThread
, который не будет здесь подробно описываться, но кратко поясняется с помощью следующих фрагментов кода
Стандартная программа netty вызовет
NioEventLoopGroup
родительский классMultithreadEventExecutorGroup
следующий код
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
}
Затем передайте его newChildNioEventLoop
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
Давайте сначала поговорим о создании и запуске потоков-реакторов.Подведем итоги: поток-реактор netty создается при добавлении задачи, а сущность потокаFastThreadLocalThread
(Об этом мы поговорим далее в статье), а финальное тело выполнения потока —NioEventLoop
изrun
метод.
Исполнение реакторной нити
Итак, давайте сосредоточимся на анализеNioEventLoop
метод запуска
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
processSelectedKeys();
runAllTasks(...);
}
} catch (Throwable t) {
handleLoopException(t);
}
...
}
Извлекаем магистраль, то, что делает поток-реактор, на самом деле очень просто, что можно проиллюстрировать следующей картинкой
Поток реактора примерно разделен на три этапа для непрерывного цикла.
1. Сначала опросите события ввода-вывода всех каналов, зарегистрированных в селекторе, используемом парой потоков реактора.
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
2. Обработка каналов, которые генерируют сетевые события ввода-вывода
processSelectedKeys();
3. Обработайте очередь задач
runAllTasks(...);
Каждый шаг подробно описан ниже
выберите операцию
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
wakenUp
Указывает, следует ли пробуждать блокирующую операцию select.Видно, что перед новым циклом nettywakeUp
Он установлен в false, что отмечает начало нового цикла цикла.Мы также разделяем конкретную операцию выбора.
1. Приближается срок выполнения запланированной задачи, и этот опрос прерывается.
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
....
}
Мы можем видеть, что операция выбора потока реактора в NioEventLoop также является циклом for.На первом шаге цикла for, если обнаруживается, что в текущей очереди задач по времени есть событие крайнего срока выполнения задачи (if (selectCnt == 0)), затем позвоните один разselectNow()
, метод вернется немедленно и не будет блокироваться
Дело в том, что очередь задач по времени в netty сортируется по времени задержки от меньшего к большему.delayNanos(currentTimeNanos)
Метод заключается в том, чтобы убрать время задержки первой запланированной задачи.
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
Подробная информация об очереди задач netty (включая общие задачи, временные задачи, хвостовые задачи) будет рассмотрена в отдельной статье позже, но я не буду здесь слишком подробно останавливаться.
2. Обнаружено добавление задачи в процессе опроса, и опрос прерывается
for (;;) {
// 1.定时任务截至事时间快到了,中断本次轮询
...
// 2.轮询过程中发现有任务加入,中断本次轮询
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
....
}
Чтобы убедиться, что очередь задач может быть выполнена вовремя, netty определит, пуста ли очередь задач при выполнении блокирующей операции выбора.Если она не пуста, она выполнит неблокирующую операцию выбора и выпрыгнет из очереди. петля.
3. Блокировка операции выбора
for (;;) {
// 1.定时任务截至事时间快到了,中断本次轮询
...
// 2.轮询过程中发现有任务加入,中断本次轮询
...
// 3.阻塞式select操作
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
....
}
Этот шаг выполняется, указывая на то, что очередь в очереди задач netty пуста, а время задержки всех запланированных задач еще не наступило (более 0,5 мс), поэтому здесь выполняется блокирующая операция выбора до истечения крайнего срока первое запланированное задание.
Здесь мы можем задать себе вопрос, если задержка первой запланированной задачи очень велика, например, час, возможно ли, что поток был заблокирован в операции выбора, конечно, это возможно! Но, пока за это время присоединяется новая задача, блокировка снимается
Внешний поток вызывает метод execute для добавления задачи
@Override
public void execute(Runnable task) {
...
wakeup(inEventLoop); // inEventLoop为false
...
}
Вызовите метод пробуждения, чтобы разбудить блокировку селектора
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
Видно, что когда внешний поток добавляет задачу, для пробуждения будет вызываться метод wakeup.selector.select(timeoutMillis)
После того, как операция выбора блокировки завершена, netty делает ряд оценок состояния, чтобы решить, прерывать ли опрос.Условия для прерывания опроса:
- опрос событий ввода-вывода (
selectedKeys != 0
) - параметр oldWakeUp имеет значение true
- В очереди задач есть задачи (
hasTasks
) - Первая запланированная задача вот-вот будет выполнена (
hasScheduledTasks()
) - Пользователь активно просыпается (
wakenUp.get()
)
4. Устранить ошибку nio в jdk
Описание ошибки см.Не говорите .java.com/, но база данных...
Эта ошибка приведет к тому, что опрос Selector будет оставаться пустым, и в конечном итоге приведет к 100% -ному процессору и серверу nio, недоступному Строго говоря, netty не решает ошибку jdk, но ловко избегает этой ошибки каким-то образом. Конкретный метод выглядит следующим образом.
long currentTimeNanos = System.nanoTime();
for (;;) {
// 1.定时任务截止事时间快到了,中断本次轮询
...
// 2.轮询过程中发现有任务加入,中断本次轮询
...
// 3.阻塞式select操作
selector.select(timeoutMillis);
// 4.解决jdk的nio bug
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
...
}
netty будет работать каждый разselector.select(timeoutMillis)
Запишите время начала передcurrentTimeNanos
, запишите время окончания после выбора и оцените, продолжалась ли операция выбора по крайней мереtimeoutMillis
секунд (здесь будетtime - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
изменить наtime - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
Может лучше понять)
Если продолжительность больше или равна timeoutMillis, это означает, что это действительный опрос, сбросselectCnt
флаг, в противном случае это указывает на то, что метод блокировки не блокировался в течение такого длительного времени, что может вызвать ошибку пустого опроса jdk.Когда количество пустых опросов превышает пороговое значение, по умолчанию 512, и селектор будет перестроен .
Код настройки, связанный с пустым порогом опроса, выглядит следующим образом.
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
Ниже мы кратко опишем nettyrebuildSelector
Приходите исправлять пустые опросы процесса BUG,rebuildSelector
На самом деле операция очень проста: создайте новый селектор и повторно перенесите канал, ранее зарегистрированный на старом селекторе, на новый селектор. Скелет после извлечения основного кода выглядит следующим образом
public void rebuildSelector() {
final Selector oldSelector = selector;
final Selector newSelector;
newSelector = openSelector();
int nChannels = 0;
try {
for (;;) {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
}
break;
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
selector = newSelector;
oldSelector.close();
}
Во-первых, поopenSelector()
Метод создает новый селектор, а затем выполняет бесконечный цикл, пока в процессе выполнения происходит одновременная модификация исключения selectionKeys, передача перезапускается
Конкретные этапы передачи:
- получить действующий ключ
- Отменить регистрацию события ключа на старом селекторе
- Зарегистрируйте канал ключа на новый селектор
- Повторно свяжите связь между каналом и новым ключом
После завершения передачи исходный селектор может быть отброшен, и все последующие опросы выполняются на новом селекторе.
Наконец, мы резюмируем, что делает шаг выбора потока Reactor: постоянно опрашивает события ввода-вывода и постоянно проверяет, есть ли временные задачи и общие задачи во время процесса опроса, гарантируя, что задачи в очереди задач netty действительны. использует счетчик, чтобы избежать ошибки пустого опроса jdk, и процесс ясен
Из-за недостатка места следующие два процесса будут описаны в одной статье, пожалуйста, ждите.
process selected keys
Продолжение следует
run tasks
Продолжение следует
Наконец, благодаря картинке в начале статьи мы еще раз познакомились с тем, что делает поток-реактор netty.
- Опрос событий ввода-вывода
- Обработка событий опроса
- Выполнение задач в очереди задач
Если вы хотите систематически изучать Нетти, мой буклет«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Это может вам помочь.Если вы хотите систематически изучать принципы Netty, то вы не должны пропустить мою серию видеороликов по анализу исходного кода Netty:coding.IMO OC.com/class/230, Также…