Как запрос в анализе исходного кода Netty достигает каналаЧтение?

Netty

Следующий анализ говорит только о NIO

Общий процесс использования java nio для сетевого программирования выглядит следующим образом.

Есть ли возможности для оптимизации в этом процессе?

Введение в использование java nio
java nio запустить анализ исходного кода

Netty — это оболочка для сетевой инфраструктуры Java, и она, безусловно, будет иметь аналогичный процесс обработки. Должно быть, сделали свою собственную оптимизацию в этом отношении

Нетти Приступая к работе
Анализ исходного кода Netty Hello world

Получить селектор

При использовании Netty используется соответствующая EventLoopGroup, которая фактически завершает процесс инициализации Selector

Нетти настроила коллекцию SelectionKeys, создала упаковку слоев и фактически заменила коллекцию Selector только одним SelectorKey с двумя коллекциями по умолчанию.

получить канал

При использовании Netty будет выполняться тип канала, а затем при выполнении метода привязки канал будет инициализирован здесь

Способ построения такойclass.newInstance(), Возьмите в качестве примера NioServerSocketChannel, он выполняет соответствующий конструктор без аргументов.

 public NioServerSocketChannel() {
 		//newSocket即返回java的ServerSocketChannel
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
 }
 public NioServerSocketChannel(ServerSocketChannel channel) {
 		//指定当前channel用来接收连接请求,并在父类中指定为非阻塞
        super(null, channel, SelectionKey.OP_ACCEPT);
        //javaChannel()即这里的参数channel
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

Сразу после того, как Netty начала инициализацию канала, в конце NioServerSocketChannel был добавлен конвейер.ChannelInboundHandlerAdapterкоторыйServerBootstrapAcceptor, это будет иметьchildGroupа такжеchildHandler, childHandler — определяемый пользователем обработчик каналов, а childGroup — EventLoop, используемый для обработки запросов.В настоящее время структура всего конвейера

childGroup — это имя поля в исходном коде, соответствующее пулу рабочих потоков, переданному в группу.

Регистрация канала связана с адресом порта прослушивания.

Регистрация предназначена для установления связи между каналом и селектором.Стоит отметить, что пул потоков, используемый для регистрации,group, В соответствии с пулом потоков, переданным пользователем, то есть пулом потоков босса, регистрация, сопоставление портов и адресов следуют процессу запуска Netty.

На этом этапе вы можете видеть, что все приготовления, требуемые java nio, готовы, а остальное — дождаться возникновения события и обработать возникшее событие. Отличие от обычного java nio в том, что

  • Netty подготовила два пула потоков, только один и тот же пул используется для регистрации каналов и мониторинга привязки портов.

ждать, пока произойдет событие

NioEventLoop реализует Executor, что означает, что он принимает задачи, представленные другими местами для его выполнения.Общая структура выполнения выглядит следующим образом.

//判断当前正在执行的线程是否是Netty自己的eventLoop中保存的线程
boolean inEventLoop = inEventLoop();
  if (inEventLoop) {
    //往队列里添加任务
  	addTask(task);
  } else {
  	//这里即运行NioEventLoop自身的run方法
	startThread();
  	addTask(task);
  }

NioEventLoop запускает поток для выполнения метода run.Общая структура выглядит следующим образом.

for (;;) {
 if (hasTasks()) {
    selectNow();
   } else {
    select(oldWakenUp);
   }
  processSelectedKeys();
  runAllTasks();
}

Процесс обработки цикла выполнения выглядит следующим образом

Примечательно, что это单个线程在运行,而且非本线程的任务一概不处理

Время начала потока босса

В процессе запуска существует ServerBootstrap для подстраивания всего процесса, его поток выполнения является основным потоком, а все зарегистрированные события выполняются самим пулом потоков.В это время поток выполнения не должен быть собственным потоком EventLoop. , так начинается поток в боссе и завершается регистрация в задаче очереди

Новое запрашивание подключения

Когда NioServerSocketChannel привязан к порту, NioEventLoop, соответствующий NioServerSocketChannel, будет ожидать события на канале. Весь процесс обработки выглядит следующим образом

  1. Прочитайте содержимое сообщения, которое происходит в NioServerSocketChannel.Для этого нового события подключения оно упаковывается в канал запроса клиента для последующей обработки.

    protected int doReadMessages(List<Object> buf) throws Exception {
     		//1:获取请求的channel
            SocketChannel ch = javaChannel().accept();
    
            try {
                if (ch != null) {
                	//2:包装成一个请求,Socket channel返回
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
  2. Возвращенный NioSocketChannel завершает инициализацию собственного канала и регистрирует интересные события.

     protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
            super(parent, ch, SelectionKey.OP_READ);
    }
    

Напомним, что следующая ссылка в боссеServerBootstrapAcceptor, а его обработка чтения сообщения заключается в добавлении собственного обработчика пользователя и продолжении завершения события регистрации

 public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

Когда рабочий поток запускается

Регистрация воркера происходит в потоке выполнения босса, и в этот момент это не должен быть тот же самый поток, поэтому запустите поток воркера и завершите событие регистрации внутренне, ожидая прихода прочитанного сообщения

Обработка сообщений OP_read

После установления соединения запрос передаетсяNioSocketChannelДля обработки он инкапсулирует прочитанное сообщение в ByteBuf черезInBoundпроцессорfireChannelReadОн по очереди передается в другие места для потребления, пока сообщение tailContext не будет обработано.

Здесь также может быть известно, что вход конвейера означает, что данные передаются в netty, а обратная запись выполняется на всем пути к голове, а затем записывается в канал.

Поток обработки Nio в Netty

Из приведенного выше анализа можно сделать вывод, что поток обработки Netty выглядит следующим образом.

Нужно ли боссу несколько потоков

Конфигурация многопоточности mainReactor, это полезно для прослушивания нескольких портов, конечно, 1 также может обрабатывать несколько портов

Реакторный режим

Скорость обработки ЦП выше, чем скорость обработки ввода-вывода. При обработке вещей наилучшая ситуация заключается в том, что ЦП не будет блокироваться из-за обработки ввода-вывода, что приводит к «отходам» ЦП. Конечно, многопоточность может использоваться для обработки запросов ввода-вывода, но это увеличит переключение контекста потока, и операция ввода-вывода может не завершиться в прошлом, что также расточительно.

Другой способ — уведомить ЦП о завершении операции ввода-вывода. Тогда кто узнает, что операция ввода-вывода завершена? И передать событие процессору для обработки? В режиме Reactor это роль Reactor: он запускает поток, который постоянно выполняется в ожидании ввода-вывода, и распределяет его между различными предварительно зарегистрированными обработчиками событий для обработки в соответствии с типом события.

Шаблон Reactor абстрагируется следующим образом.

Абстрактное изображение предоставлено автором
ссылка на реактор