Применение NIO в Tomcat

Java

Понимание НИО

Лично я считаю, что самая большая разница между NIO и BIO заключается в активном и пассивном способе использования BIO нужно ждать, пока вызываемый объект вернет данные.Очевидно, что вызывающий объект в это время пассивен.

Например

блокировка ввода-выводаПредположим, вы робкий и застенчивый мальчик, у вас назначена встреча с девушкой-испытателем по соседству, но вы боитесь проявить инициативу на свидании, поэтому даете ей номер своего мобильного телефона и предлагаете, чтобы она позвонила вам, когда захочет дата . Очевидно вы в это время находитесь в пассивном состоянии.Результат встречи или нет требует от девушки инициативы сообщить вам.Если она забудет, то вы будете застигнуты долгим ожиданием и бесконечными догадками и неуверенностью в себе( очень плохо). [Если ты робкий, застенчивый и похотливый мальчик, это будет несчастным]

неблокирующий ввод-выводМы знаем, что у отморозков обычно много запасных шин, я называю этоЗапасное колесоБассейн, то когда он хочет пойти на свидание, он просто спрашивает девушку, не хочет ли она пойти на свидание.Если она хочет пойти на свидание, она пойдет с ней на свидание.После окончания свидания договоритесь с другими событиями знакомств. Если нет, продолжайте спрашивать в следующий раз. В этом примере встречу можно рассматривать как событие IO, а процесс приглашения девушки можно рассматривать как опрос пула запасных колес.

Если вы хотите изучить NIO, вы можетеучиться

Как Tomcat использует NIO

Поскольку это ввод-вывод сетевой связи, должны быть следующие два шага.

  • Запуск Северсокета
  • Обработка событий ввода/вывода

Код ключа находится в пакете org.apache.tomcat.util.net.NioEndpoint.

P.S. Статья слишком длинная, если не хотите читать, можете сразу прочитать заключение

Запуск ServerSocket

Глядя на код сначала, это шокирует, если вы посмотрите на модель Reactor.

Следующий код метода связывания — это процесс запуска ServerSocket.Основной процесс выглядит следующим образом

  • адрес привязки
  • Установите способ получения новых подключений какметод блокировки(ключевой момент)
  • Установите количество акцепторов и опрашивающих и инициализируйте SelectorPool.
    @Override
    public void bind() throws Exception {

        if (!getUseInheritedChannel()) {
            serverSock = ServerSocketChannel.open();
            socketProperties.setProperties(serverSock.socket());
            InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
            serverSock.socket().bind(addr,getAcceptCount());
        } else {
            // Retrieve the channel provided by the OS
            Channel ic = System.inheritedChannel();
            if (ic instanceof ServerSocketChannel) {
                serverSock = (ServerSocketChannel) ic;
            }
            if (serverSock == null) {
                throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
            }
        }
        // 以阻塞的方式来接收连接!!
        serverSock.configureBlocking(true); //mimic APR behavior

        // 设置Acceptor和Poller的数量
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            // 顾名思义,Acceptor是用来处理新连接的
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            // Poller 用来处理I/O事件
            pollerThreadCount = 1;
        }
        setStopLatch(new CountDownLatch(pollerThreadCount));

        // Initialize SSL if needed
        initialiseSsl();
        // 从此处可以看出tomcat池化了selector
        selectorPool.open();
    }

Как Tomcat NIO обрабатывает события ввода-вывода

В заключение, в модели Tomcat NIO есть следующие ключевые роли:

  • Acceptor используется для получения новых соединений, по одному потоку на Acceptor, получение новых соединений блокирующим образом.
  • Опросчик Когда акцептор получает новое соединение, он обрабатывает его и выбирает опросчик для обработки событий ввода-вывода в этом соединении.
  • LimitLatch Блокировка, используемая для ограничения количества подключений.

Acceptor

Основная задача Acceptor — непрерывно получать соединения от клиентов и передавать соединения Poller для обработки после простой обработки.

Получайте подключения от клиентов, если вы не хотите видеть код, то основной процесс следующий:

  • Принимать соединения от клиентов и передавать их Poller для обработки
      @Override
        public void run() {

            int errorDelay = 0;

            // running的检测贯穿了Accpetor的处理流程,在每次关键操作的时候都会执行检测
            while (running) {

                // 如果进入暂停状态则每隔一段时间检测一下
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                // 再次检测
                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //检查是否达到最大连接数如果是则陷入等待,如果不是则增加当前连接数
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        //接收新连接
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // 发生异常,则减少连接数
                        countDownConnection();
                        if (running) {
                         handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused) {
                        //setSocketOptions会导致将该连接交给Poller处理
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }

Давайте посмотрим, что делает setSocketOptions.Если вы не хотите читать код, краткое изложение выглядит следующим образом.

  • Установите клиентский сокет в неблокирующий режим
  • Инкапсулировать клиентский сокет какNioChannelилиSecureNioChannel(с использованием технологии объединения объектов)
  • отПул опросниковПолучите опросник и зарегистрируйте NioChannel с помощью опросчика
  protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //设置为非阻塞模式,以便通过selector进行查询
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            //从对象池中获取一个NioChannel,tomcat会复用一切可以复用的对象以减少创建新对象所带来的消耗
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
               // 没有获取到,那就新建一个呗
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                // SSL这一块还没研究
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                //重新设置SocketBufferHandler,将其设置为可写和可读
                channel.reset();
            }
            //从Poller池中获取一个Poller(按照次序获取,可以理解为一个圆环),并将Channel注册到上面
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

Poller

Начиная с регистрации подключения к Поллеру

Получить опрос без блокировки

Подробности смотрите в коде

Ключевой момент: получение остатка от числа A ограничит результат остатка диапазоном A

    /**
     * Return an available poller in true round robin fashion.
     * 很明显,取余的方式揭示了获取Poller的方法。你可以理解为
     * Poller会组成一个圆环,这样我们就可以通过不断递增获取
     * 下一个Poller,但是数据会溢出所以我们要取绝对值
     * @return The next poller in sequence
     */
    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }

Регистрация канала

Этот метод инкапсулирует только что установленное соединение и используетPollerEventзарегистрировался в соответствующем Поллере в виде

Следует отметить, что реально зарегистрированное событие чтения не регистрируется в этом методе (текущий вызывающий метод — поток Acceptor), а событие чтения регистрируется в потоке Poller.

        /**
         * Registers a newly created socket with the poller.
         * 将新建的socket注册到Poller上
         * @param socket    The newly created socket
         */
        public void register(final NioChannel socket) {
            //以下代码为设置各种参数,可以从方法名进行推测,不再进行叙述
            socket.setPoller(this);
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
            socket.setSocketWrapper(ka);
            ka.setPoller(this);
            ka.setReadTimeout(getSocketProperties().getSoTimeout());
            ka.setWriteTimeout(getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            ka.setReadTimeout(getConnectionTimeout());
            ka.setWriteTimeout(getConnectionTimeout());
            //从缓存中获取一个PollerEvent
            PollerEvent r = eventCache.pop();
            // 注册读事件
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            // 如果没有从缓存中获取,那么就新建一个
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }

Поллер обрабатывает события ввода/вывода

Код Poller для обработки событий ввода-вывода длиннее и содержит больше деталей.

  • Определите, отправляет ли Acceptor PollerEvent, и если да, вызовите метод запуска pollerEvent, чтобы зарегистрировать событие чтения.
  • Определить, закрыт ли опросчик при выполнении ключевых операций, если да, выполнить соответствующие операции по освобождению ресурсов и завершению работы.
  • Вызовите selector.select() для опроса событий и, если есть события чтения, передайте ихprocessKeyиметь дело с
        @Override
        public void run() {
            // Loop until destroy() is called
            // 一直循环直到destroy方法被调用
            while (true) {

                boolean hasEvents = false;

                try {
                    if (!close) {
                        // events 方法会处理Acceptor注册到Poller中的PollerEvent
                        // 主要是注册读事件
                        hasEvents = events();
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            //if we are here, means we have other stuff to do
                            //do a non blocking select
                            keyCount = selector.selectNow();
                        } else {
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    // 检测到关闭,则处理剩余的事件并关闭selector
                    if (close) {
                        // 处理Acceptors注册到Poller中的PollerEvent
                        events();
                        //selector time out 或者poller被关闭就会调用timeout方法
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error("",x);
                    continue;
                }
                //either we timed out or we woke up, process events first
                if ( keyCount == 0 ) hasEvents = (hasEvents | events());
                // 执行 select 操作,查询I/O事件
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        // 处理检测到的I/O事件
                        processKey(sk, attachment);
                    }
                }//while

                //timeout 会检查是否关闭,如果已经关闭并且有事件未处理会调用cancelledKey方法
                //cancelledKey:该方法主要是对和该连接相关的资源执行关闭操作
                timeout(keyCount,hasEvents);
            }//while

            getStopLatch().countDown();
        }

processKey обрабатывает события ввода/вывода

Основная работа processKey заключается в следующем

  • Проверьте еще раз, закрыт ли опросчик, если да, освободите ресурс.
  • Определить, является ли запрошенное событие законным, если оно является допустимым, отменить событие, зарегистрированное в селекторе, и событие, запрошенное этим опросом.
  • Затем вызовите processSocket для обработки события чтения, а затем обработайте событие записи.
        protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
            try {
                if ( close ) {
                    // 如果Poller关闭则关闭和释放和此连接相关的资源
                    cancelledKey(sk);
                } else if ( sk.isValid() && attachment != null ) {
                    if (sk.isReadable() || sk.isWritable() ) {
                        if ( attachment.getSendfileData() != null ) {
                            processSendfile(sk,attachment, false);
                        } else {
                            // 取消注册事件
                            // sk.interestOps()& (~readyOps)
                            unreg(sk, attachment, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write 先读后写
                            if (sk.isReadable()) {
                               // 关键代码,调用processSocket方法处理读事件
                                if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (!closeSocket && sk.isWritable()) {
                                if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                cancelledKey(sk);
                            }
                        }
                    }
                } else {
                    //invalid key
                    cancelledKey(sk);
                }
            } catch ( CancelledKeyException ckx ) {
                cancelledKey(sk);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error("",t);
            }
        }

processSocket true - обрабатывать события ввода/вывода

processSocket определен в org.apache.tomcat.util.net.AbstractEndPoint, что означает, что этот метод вызывается независимо от того, используете ли вы BIO, NIO или NIO2 для окончательного чтения и записи данных.

Как видно из кода, это по-прежнему пул объектов, и он по-прежнему снова инкапсулируется (матрешка) и отправляется в пул потоков на выполнение.Следующее содержимое не входит в рамки этого обсуждения.

    public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

Суммировать

Модель Tomcat NIO

NIO模型

Рука трясется, линия не очень♂

LimitLatchОбщий для всех акцепторов, чтобы ограничить текущее максимальное количество подключений

AcceptorПолучайте новые соединения в блокирующей форме, инкапсулируйте их как объекты PollerEvent и отправляйте их в Poller.

PollerПолучить PollerEvent от Акцептора и зарегистрировать событие чтения, а также опросить привязанный к нему клиентский сокет на наличие событий чтения, если есть, выполнить дальнейшие операции и отправить его в другие места для обработки (разобрать протокол Http)

передача мысли

Изучение исходного кода означает изучение его дизайнерских идей», — Воз и Шуоде.

объединение объектовОбъекты в пуле и соединения в пуле могут значительно сократить потребление вновь созданных объектов и GC, когда вам нужно использовать их из пула для сброса соответствующих значений

круговая очередьХоть эта штука и не нова, но с атомарным классом можно эффективно получить следующий элемент в очереди в случае высокого параллелизма (ранее не рассматривалась обработка переполнения индекса в циклической очереди)

Блокировка на получение ссылок, неблокирующая обработка IO-событийЭто сильно контрастирует с моделью Reactor.При изучении NIO мышление ограничено, и считается, что неблокирующее получение соединения позволит достичь более высокой производительности, но сейчас ситуация не уверена (еще не проверял, что брат пробовал и мне немного рассказал)

Во время критических операций проверьте бит флагаЕсли вы хотите управлять своим потоком через переменную флага, а цикл потока занимает относительно много времени (у вас слишком длинный код, слишком много операций), то лучше проверить свою переменную флага перед выполнением критических операций, чтобы решить, если вы хотите изменить поведение потока (код опроса Kang Kang и Acceptor)

При первом изучении кода Tomcat просьба указывать на ошибки в понимании