Обзор
Примечание. NIO2 (AIO) — это асинхронный ввод-вывод.
Введение в NIO2
AIO
иNIO2
По сути, это одно и то же, как и Сунь Синчжэ и Чжэ Синсун на самом деле Солнечные Обезьяны, но названия разные, а суть одна
Итак, как понимать это понятие? Например
Предположим, у девушки много лизающих собак (SpareTire), если девушка хочет чего-то добиться, какой самый простой и эффективный способ?
Ответ заключается в том, что собак, которые лижут друг друга, очень много, и они могут это делать. Итак, пока собака выполняет поручения, будет ли девочка ждать, пока собака все сделает? Нет, в этот период вы, конечно, можете продолжать раздавать другие задания другим собакам. Когда собака выполняет поручения, если есть что-то, с чем должна справиться девочка, просто предупредите девушку, чтобы она это сделала.
Конечно, собаки обычно выполняют тяжелую работу, такую как копирование данных, ввод-вывод, получение новых соединений и т. д. (очень жаль). Девушки сосредоточены на основном бизнесе.
В этом примере девочки эквивалентны основному бизнес-потоку, который в основном используется для обработки бизнес-логики, а собаки являются абстракцией (ядро + поток ввода-вывода).
P.S.
-
Если вы знаете NIO2, рекомендуется прочитать главу «Интерпретация модели NIO2» напрямую, вам не нужно читать главу «Демонстрация NIO2» (время дорого)
-
Вы можете сразу переходить ко всем главамСуммировать, или просто читатьприложениеНачать отладку кода напрямую
NIO2 DEMO
В NIO2 есть ключевой момент, то есть ядро в основном отвечает за уведомление программы о каких-либо событиях, а прием соединений и копирование данных по-прежнему требует от программы предоставления потоков для выполнения этих действий. девушка (основной бизнес-поток) должна обеспечивать лизание собак Пул (пул потоков) дает ядро для выполнения этих задач
talk is cheap, show me your hair
Если вы хотите узнать о NIO2, вы можете нажатьучиться
Комментарий исходного кода имеет кодировку GBK, если вы видите, что комментарий искажен, лучше изменить его на кодировку GBK.
Это демонстрация.Стоит отметить, что хотя пул потоков явно не создается в этом примере, это связано с тем, что если вы открываете () сервер, если вы не укажете его явно, система назначит пул потоков для ServerSocketChannel по умолчанию. , для обработки событий мы можем открыть JConsole для проверки.
channel = AsynchronousServerSocketChannel.open();
Как показано на рисунке, потоки от 0 до 4 — это пулы потоков, выделенные системой по умолчанию, которые используются для обработки событий ввода-вывода. (Находка лижет собаку)
Представьте, что если мы заблокируем все потоки при обработке событий ввода-вывода, то ввод-вывод всей системы будет заблокирован, как показано на следующем рисунке.
При поступлении новых событий ввода-вывода ядро выберет поток для обработки этих событий ввода-вывода.Если поток, обрабатывающий ввод-вывод, заблокирован, запрос от клиента всегда будет заблокирован и не может быть возвращен.Таким образом, поток, который обрабатывает события ввода-вывода, лучше всегоОбрабатывать только события ввода/вывода(получать новые соединения, копировать данные из ядра в поток)
Вы можете понять, что лизать собаку лучше всего лишь делать то, что должна делать лизать собака, то есть тяжелую работу.Что же касается основного дела или мероприятий, которые будут блокировать ситуацию, то лучше всего передать это сестре (Пул потоков обработки бизнес-логики) обрабатывать.
Модель Tomcat NIO2
ключевой класс
org.apache.tomcat.util.net.Nio2Endpoint
Поскольку мы объясняем модель обработки NIO2, нам необходимо понять следующие ключевые роли.
-
Nio2AcceptorАкцептор не привязан к конкретному потоку, но при поступлении нового соединения из пула потоков выбирается поток для выполнения кода Акцептора.Этот процесс делает за нас нижний слой.Основная задача Акцептора - получать новые подключения и регистрировать объекты обработки чтения и записи для подключения
-
LimitLatchОграничьте количество подключений, основной способ ограничить количество подключений в случае асинхронного ввода-вывода — заблокировать потоки в пуле потоков, используемом для событий ввода-вывода.
-
процессор ввода/выводаКласс, который обрабатывает ввод-вывод и работает в том же пуле потоков, что и Nio2Acceptor.
Запуск ServerSocket
Процесс асинхронного запуска ServerSocket относительно скучен.Если вы не хотите читать код, ниже приведен процесс его запуска.
- Создайте пул потоков, оберните его как
AsynchronousChannelGroup
- Открыть серверный сокет
- Привязать порт и установить максимальное количество подключений
@Override
public void bind() throws Exception {
// 创建线程池
if (getExecutor() == null) {
createExecutor();
}
if (getExecutor() instanceof ExecutorService) {
//创建用于I/O的线程池(需要用AsynchronousChannelGroup包装,才能提供给AsynchronousServerSocketChanne用)
threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
}
// AsynchronousChannelGroup needs exclusive access to its executor service
if (!internalExecutor) {
log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
}
//创建ServerSocketChannel
serverSock = AsynchronousServerSocketChannel.open(threadGroup);
socketProperties.setProperties(serverSock);
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
//绑定端口并设置backlog的参数
//backlog可以理解为当前最大待执行accept操作的连接数
serverSock.bind(addr, getAcceptCount());
// Initialize SSL if needed
initialiseSsl();
}
Как показано на рисунке ниже, это пул потоков, привязанный к текущему асинхронному каналу ServerSocketChannel, а 801 указывает порт, который прослушивает коннектор (в приложении есть руководство по включению NIO2).
Nio2Acceptor
Основная функция Nio2Acceptor接收新连接
,и限制最大连接数
, поскольку используется асинхронный ввод-вывод, Acceptor не привязан к конкретному потоку, а выбирает задачу выполнения из пула потоков, когда необходимо выполнить новую задачу. Как показано на рисунке ниже, при наличииКогда поступает новое соединение от клиента, программа выберет поток из пула потоков для выполнения завершенного метода Nio2Acceptor и передаст клиентский сокет, чтобы начать выполнение бизнес-логики обработки нового соединения.
Регистрация AcceptHandler
В асинхронном вводе-выводе нам нужно зарегистрировать обработчик, который обрабатывает событие Accept, с помощью ServerSocketChannel, чтобы завершить обработку события подключения. Как показано в следующем коде, при запуске tomcat запускается вызов потока.
Nio2SocketAcceptor
Метод запускаNio2SocketAcceptor
зарегистрирован какServerSocketChannel
Обработчик события принятия
protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
implements CompletionHandler<AsynchronousSocketChannel, Void> {
...
@Override
public void run() {
// The initial accept will be called in a separate utility thread
if (!isPaused()) {
// 连接数限制,如果达到最大连接数,则调用此方法的线程会陷入等待
try {
countUpOrAwaitConnection();
} catch (InterruptedException e) {
// Ignore
}
if (!isPaused()) {
//将自己注册为accept事件的处理器(注意此类实现的接口)
serverSock.accept(null, this);
} else {
state = AcceptorState.PAUSED;
}
} else {
state = AcceptorState.PAUSED;
}
}
...
}
Обработка новых подключений
Когда приходит новое соединение, нижний уровень выбирает поток из пула потоков для выполнения завершенного метода и передает его в клиентский сокет.В это время основной процесс метода выглядит следующим образом
- Убедитесь, что контейнер все еще работает, если он все еще работает, продолжите процесс
- Проверьте, нужно ли ограничивать количество подключений. Если количество подключений необходимо ограничить, выберите поток из пула потоков для выполнения метода запуска акцептора (этот метод может блокироваться).
- После завершения вышеуказанных операций вызовите метод setSocketOptions, чтобы выполнить последующую обработку событий ввода-вывода, и прием нового соединения будет завершен.
@Override
public void completed(AsynchronousSocketChannel socket,
Void attachment) {
// Successful accept, reset the error delay
errorDelay = 0;
// Continue processing the socket on the current thread
// Configure the socket
if (isRunning() && !isPaused()) {
//检查限制的最大连接数,如果没有设置(即-1)则不进行连接数限制
if (getMaxConnections() == -1) {
serverSock.accept(null, this);
} else {
//由于有新连接的到达,因此需要从线程池选一个线程执行增加连接数的操作,此操作可能会发生阻塞
getExecutor().execute(this);
}
//执行后续的I/O事件处理
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
if (isRunning()) {
state = AcceptorState.PAUSED;
}
destroySocket(socket);
}
}
Реализация ограничения максимального количества подключений
Поскольку Acceptor не привязан к конкретному потоку, если вам нужно ограничить максимальное количество подключений, вам нужно использовать блокировки для блокировки простаивающих потоков, поэтому вам нужно отправлять в пул потоков при приеме новых подключений.增加新连接数
задача, как показано ниже (то есть вызов метода запуска Nio2SocketAcceptor)
public void completed(AsynchronousSocketChannel socket,
Void attachment) {
...
getExecutor().execute(this);
...
}
Кроме того, не забудьте создатьServerSocketChannel
когда мы устанавливаемbacklog
параметры?
Этот параметр в основном используется для установки максимального количества непринятых подключений, разрешенных текущим ServerSocket, то есть, если оно превышает значение, установленное отставанием количества непринятых подключений, все новые подключения будут отброшены. (Документация API)
Обработка событий ввода/вывода
Поскольку это асинхронный ввод-вывод, необходимо зарегистрировать чтение и запись на клиентском сокете.CompletionHandler
, следовательноsetSocketOptions
Это неизбежно приведет к этому шагу, так когда же этот шаг произойдет?
Найдено с помощью отслеживания отладкиsetSocketOptions
приведет кNio2SocketWrapper
, а фактический процесс ввода-вывода происходит при создании объекта Nio2SocketWrapper.readCompletionHandler
, ниже его код
ReadCompletionHandlerОн используется для мониторинга события чтения, после чтения данных будет вызван метод processSocket для запуска работы по разбору данных.
public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
super(channel, endpoint);
nioChannels = endpoint.getNioChannels();
socketBufferHandler = channel.getBufHandler();
this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer nBytes, ByteBuffer attachment) {
if (log.isDebugEnabled()) {
log.debug("Socket: [" + Nio2SocketWrapper.this + "], Interest: [" + readInterest + "]");
}
readNotify = false;
//加锁,其他线程可能会对标志位进行修改
synchronized (readCompletionHandler) {
//nBytes表示读取到的字节数,如果小于0
//抛出EOF异常,没数据读,那咋办吗,只好抛异常了
if (nBytes.intValue() < 0) {
failed(new EOFException(), attachment);
} else {
if (readInterest && !Nio2Endpoint.isInline()) {
readNotify = true;
} else {
// Release here since there will be no
// notify/dispatch to do the release.
readPending.release();
}
readInterest = false;
}
}
if (readNotify) {
//处理读事件
getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
}
}
//省略代码,后面太长了
...
Отладочная проверка, как показано на рисунке ниже, вложение — это данные, которые мы читаем
Обратите внимание, что IDEA может выдать запрос на переключение потоков во время отладки (чтение данных и предыдущие операции не в одном потоке, как показано ниже).
Суммировать
Модель Tomcat NIO2
Подытожено следующим образом-
Событие принятия и событие ввода-вывода совместно используют пул потоков и не привязаны к конкретному потоку.
-
Acceptor(Nio2Acceptor) используется для получения новых соединений и регистрации объектов обработки для событий ввода-вывода.
-
LimitLatch реализует функцию ограничения количества соединений путем блокировки потоков в пуле потоков
-
Обработчик ввода-вывода
Nio2SocketWrapper
Зарегистрированные процессоры чтения и записи, при поступлении события ввода-вывода программа выберет поток для выполнения кода этих процессоров -
Общий процесс выглядит следующим образомПриходит новое соединение -> выберите поток для выполнения
Nio2Acceptor
код -> поставить задачу увеличения количества подключений к пулу потоков -> зарегистрировать события обработки чтения и записи -> приходят события ввода-вывода, выбрать поток для обработки событий ввода-вывода
передача мысли
Не используйте пул потоков по умолчаниюПри создании асинхронного ServerSocketChannel tomcat сам создаст пул потоков вместо использования пула потоков, предоставляемого по умолчанию.Поскольку пул потоков находится под нашим контролем, реализована функция ограничения количества подключений.
Не блокируйте поток ввода/выводаПоток ввода-вывода должен иметь дочерний элемент потока ввода-вывода и не выполнять операции, которые будут блокироваться в течение длительного времени в потоке ввода-вывода.
Приложение Как отлаживать tomcat
Все бэкенд-программисты знают, что tomcat встроен в SpringBoot (конечно, причал, в зависимости от того, как вы выберете), поэтому мы можем создать новое приложение SpringBoot для отладки и изучения исходного кода Tomcat.
Ниже приведен процесс отладки tomcat
- первый шаг, открыть ИДЕЯ
- второй шаг, Создайте новый проект SpringBoot
- третий шаг, на боковой панели проекта нажмите Ctrl+F, чтобы найти пакет jar Tomcat.
- четвертый шаг, /выключить все, надеть наушники, нажать точку останова
Если вы хотите протестировать обработку NIO Tomcat, точки останова в следующих классах(Если вы хотите понять, как NIO обрабатывается в tomcat, вы можете проверить мойпонимать)
package org.apache.tomcat.util.net;
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
...
public class Poller implements Runnable {
public void run() {
//此方法的代码位于692行
}
}
...
}
Если вы хотите протестировать TomcatNIO2метод обработки, требуется следующая конфигурацияДобавьте следующий код в свой код. (Поскольку режим ввода-вывода tomcat, встроенный в SpringBoot, по умолчанию — NIO, нам нужно добавить разъемы NIO2 через конфигурацию)
import org.apache.catalina.connector.Connector;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConnectorConf {
//注意你的SpringBoot版本,此项目的版本是2.2.0,旧的版本1.5使用不同的类进行配置
@Bean
public TomcatServletWebServerFactory servletContainer() {
TomcatServletWebServerFactory tomcatServletWebServerFactory =
new TomcatServletWebServerFactory();
tomcatServletWebServerFactory.addAdditionalTomcatConnectors(getConnector());
return tomcatServletWebServerFactory;
}
private Connector getConnector() {
// 关键点哦
Connector connector = new Connector("org.apache.coyote.http11.Http11Nio2Protocol");
//将连接器的端口设置为801,这样访问801端口的就是NIO2的模式了
connector.setPort(801);
return connector;
}
}
идти сorg.apache.tomcat.util.net.Nio2Endpoint
Нажмите точку останова, и все готово
Как отлаживать многопоточность
В случае многопоточности может случиться так, что точка останова не может быть введена, в этом случае просто щелкните правой кнопкой мыши точку останова и выберитеThread
То есть, когда другие потоки достигают точки останова, метод IDEA выдает уведомление, как показано на следующем рисунке.