Следующий анализ говорит только о NIO
Общий процесс использования java nio для сетевого программирования выглядит следующим образом.
Есть ли возможности для оптимизации в этом процессе?
Введение в использование java nio
java nio запустить анализ исходного кода
Netty — это оболочка для сетевой инфраструктуры Java, и она, безусловно, будет иметь аналогичный процесс обработки. Должно быть, сделали свою собственную оптимизацию в этом отношении
Нетти Приступая к работе
Анализ исходного кода Netty Hello world
Получить селектор
При использовании Netty используется соответствующая EventLoopGroup, которая фактически завершает процесс инициализации Selector
Нетти настроила коллекцию SelectionKeys, создала упаковку слоев и фактически заменила коллекцию Selector только одним SelectorKey с двумя коллекциями по умолчанию.
получить канал
При использовании Netty будет выполняться тип канала, а затем при выполнении метода привязки канал будет инициализирован здесь
Способ построения такой
class.newInstance()
, Возьмите в качестве примера NioServerSocketChannel, он выполняет соответствующий конструктор без аргументов.
public NioServerSocketChannel() {
//newSocket即返回java的ServerSocketChannel
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
//指定当前channel用来接收连接请求,并在父类中指定为非阻塞
super(null, channel, SelectionKey.OP_ACCEPT);
//javaChannel()即这里的参数channel
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
Сразу после того, как Netty начала инициализацию канала, в конце NioServerSocketChannel был добавлен конвейер.ChannelInboundHandlerAdapter
которыйServerBootstrapAcceptor
, это будет иметьchildGroup
а такжеchildHandler
, childHandler — определяемый пользователем обработчик каналов, а childGroup — EventLoop, используемый для обработки запросов.В настоящее время структура всего конвейера
childGroup — это имя поля в исходном коде, соответствующее пулу рабочих потоков, переданному в группу.
Регистрация канала связана с адресом порта прослушивания.
Регистрация предназначена для установления связи между каналом и селектором.Стоит отметить, что пул потоков, используемый для регистрации,group
, В соответствии с пулом потоков, переданным пользователем, то есть пулом потоков босса, регистрация, сопоставление портов и адресов следуют процессу запуска Netty.
На этом этапе вы можете видеть, что все приготовления, требуемые java nio, готовы, а остальное — дождаться возникновения события и обработать возникшее событие. Отличие от обычного java nio в том, что
- Netty подготовила два пула потоков, только один и тот же пул используется для регистрации каналов и мониторинга привязки портов.
ждать, пока произойдет событие
NioEventLoop реализует Executor, что означает, что он принимает задачи, представленные другими местами для его выполнения.Общая структура выполнения выглядит следующим образом.
//判断当前正在执行的线程是否是Netty自己的eventLoop中保存的线程
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
//往队列里添加任务
addTask(task);
} else {
//这里即运行NioEventLoop自身的run方法
startThread();
addTask(task);
}
NioEventLoop запускает поток для выполнения метода run.Общая структура выглядит следующим образом.
for (;;) {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
}
processSelectedKeys();
runAllTasks();
}
Процесс обработки цикла выполнения выглядит следующим образом
Примечательно, что это单个线程在运行,而且非本线程的任务一概不处理
Время начала потока босса
В процессе запуска существует ServerBootstrap для подстраивания всего процесса, его поток выполнения является основным потоком, а все зарегистрированные события выполняются самим пулом потоков.В это время поток выполнения не должен быть собственным потоком EventLoop. , так начинается поток в боссе и завершается регистрация в задаче очереди
Новое запрашивание подключения
Когда NioServerSocketChannel привязан к порту, NioEventLoop, соответствующий NioServerSocketChannel, будет ожидать события на канале. Весь процесс обработки выглядит следующим образом
-
Прочитайте содержимое сообщения, которое происходит в NioServerSocketChannel.Для этого нового события подключения оно упаковывается в канал запроса клиента для последующей обработки.
protected int doReadMessages(List<Object> buf) throws Exception { //1:获取请求的channel SocketChannel ch = javaChannel().accept(); try { if (ch != null) { //2:包装成一个请求,Socket channel返回 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
-
Возвращенный NioSocketChannel завершает инициализацию собственного канала и регистрирует интересные события.
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); }
Напомним, что следующая ссылка в боссеServerBootstrapAcceptor
, а его обработка чтения сообщения заключается в добавлении собственного обработчика пользователя и продолжении завершения события регистрации
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
Когда рабочий поток запускается
Регистрация воркера происходит в потоке выполнения босса, и в этот момент это не должен быть тот же самый поток, поэтому запустите поток воркера и завершите событие регистрации внутренне, ожидая прихода прочитанного сообщения
Обработка сообщений OP_read
После установления соединения запрос передаетсяNioSocketChannel
Для обработки он инкапсулирует прочитанное сообщение в ByteBuf черезInBound
процессорfireChannelRead
Он по очереди передается в другие места для потребления, пока сообщение tailContext не будет обработано.
Здесь также может быть известно, что вход конвейера означает, что данные передаются в netty, а обратная запись выполняется на всем пути к голове, а затем записывается в канал.
Поток обработки Nio в Netty
Из приведенного выше анализа можно сделать вывод, что поток обработки Netty выглядит следующим образом.
Нужно ли боссу несколько потоков
Конфигурация многопоточности mainReactor, это полезно для прослушивания нескольких портов, конечно, 1 также может обрабатывать несколько портов
Реакторный режим
Скорость обработки ЦП выше, чем скорость обработки ввода-вывода. При обработке вещей наилучшая ситуация заключается в том, что ЦП не будет блокироваться из-за обработки ввода-вывода, что приводит к «отходам» ЦП. Конечно, многопоточность может использоваться для обработки запросов ввода-вывода, но это увеличит переключение контекста потока, и операция ввода-вывода может не завершиться в прошлом, что также расточительно.
Другой способ — уведомить ЦП о завершении операции ввода-вывода. Тогда кто узнает, что операция ввода-вывода завершена? И передать событие процессору для обработки? В режиме Reactor это роль Reactor: он запускает поток, который постоянно выполняется в ожидании ввода-вывода, и распределяет его между различными предварительно зарегистрированными обработчиками событий для обработки в соответствии с типом события.
Шаблон Reactor абстрагируется следующим образом.
Абстрактное изображение предоставлено автором
ссылка на реактор