Анализ обработки NIO в Tomcat 7

Java задняя часть исходный код Apache Windows Tomcat

Коннектор Tomcat имеет три режима работы: bio, nio и apr. Давайте сначала поймем разницу между этими тремя.

  1. bio(blocking I/O), как следует из названия, является блокирующей операцией ввода/вывода, что означает, что Tomcat использует традиционные операции ввода/вывода Java (т.е.java.ioпакет и его подпакеты). Tomcat по умолчанию работает в биорежиме. Вообще говоря, биорежим имеет самую низкую производительность из трех режимов работы.
  2. nio (новый ввод-вывод) — это новый метод операций ввода-вывода, предоставляемый Java SE 1.4 и последующими версиями (т. е.java.nioпакет и его подпакеты). Java nio — это Java API на основе буфера, который обеспечивает неблокирующие операции ввода-вывода, поэтому nio также рассматривается как сокращение от неблокирующего ввода-вывода. Он имеет более высокую производительность одновременного выполнения, чем традиционные операции ввода-вывода (био). Чтобы запустить Tomcat в режиме nio, просто нужно установить каталог в Tomcat./conf/server.xmlПротокол узла Connector настраивается в файле какorg.apache.coyote.http11.Http11NioProtocolВот и все.
  3. apr (Apache Portable Runtime/Apache Portable Runtime) — это библиотека поддержки для HTTP-сервера Apache. Можно просто понять, что Tomcat будет вызывать основную библиотеку динамической компоновки HTTP-сервера Apache в форме JNI для обработки операций чтения файлов или передачи по сети, тем самым значительно повышая производительность Tomcat при обработке статических файлов. Tomcat apr также является предпочтительным режимом для запуска высокопараллельных приложений на Tomcat.

Написать сервер сокетов BIO относительно легко.Это не что иное, как бросание каждого сокета в поток для обработки запроса и генерации ответа.Точка, которую можно улучшить таким образом, заключается в увеличении поддержки пула потоков. В этой статье в основном анализируется NIO в Tomcat, соответствующая логика кода метода обработки.

Код ключа находится вorg.apache.tomcat.util.net.NioEndpointВ этом классе это основной компонент в Http11NioProtocol, отвечающий за получение и обработку сокетов.Несмотря на то, что код очень длинный, при внимательном чтении вы найдете много общих моментов, таких как:

  1. Будет расширять или упаковывать исходный API в JDK, например, ThreadPoolExecutor является правильнымjava.util.concurrent.ThreadPoolExecutorРасширение NioChannel является расширением ByteChannel, а KeyAttachment является оболочкой для NioChannel.
  2. Многие классы спроектированы без использования GC, что удобно для кэширования и повторного использования.Метод реализации заключается в построении очереди через класс ConcurrentLinkedQueue. Например, ConcurrentLinkedQueue CPUCache, ConcurrentLinkedQueue keyCache, ConcurrentLinkedQueue eventCache, ConcurrentLinkedQueue nioChannels в классе NioEndpoint. События ConcurrentLinkedQueue в классе Poller

Давайте посмотрим на всю схему структуры компонента Connector:

Любой, кто читал предыдущую статью о запуске Tomcat, должен знать, что запуск соединителя вызывает метод startInternal класса соединителя, который вызывает метод start() класса protocolHandler, который вызывает метод start() абстрактной конечной точки, который вызовет метод start() абстрактной конечной точки, startInternal() конкретного класса Endpoint, поэтому анализ кода начинается с startInternal класса NioEndpoint.

1. Инициализация основных компонентов класса NioEndpoint

/** 
 * Start the NIO endpoint, creating acceptor, poller threads. 
 */  
@Override  
public void startInternal() throws Exception {  
  
    if (!running) {  
        running = true;  
        paused = false;  
  
        // Create worker collection  
        if ( getExecutor() == null ) {  
            // 构造线程池,用于后续执行SocketProcessor线程,这就是上图中的Worker。  
            createExecutor();  
        }  
  
        initializeConnectionLatch();  
  
        // Start poller threads  
        // 根据处理器数量构造一定数目的轮询器,即上图中的Poller  
        pollers = new Poller[getPollerThreadCount()];  
        for (int i=0; i<pollers.length; i++) {  
            pollers[i] = new Poller();  
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);  
            pollerThread.setPriority(threadPriority);  
            pollerThread.setDaemon(true);  
            pollerThread.start();  
        }  
  
        // 创建接收者线程,即上图中的Acceptor  
        startAcceptorThreads();  
    }  
}  

startAcceptorThreads вызывает родительский классorg.apache.tomcat.util.net.AbstractEndpointРеализация в:

protected final void startAcceptorThreads() {  
    int count = getAcceptorThreadCount();  
    acceptors = new Acceptor[count];  
  
    for (int i = 0; i < count; i++) {  
        // 调用子类的createAcceptor方法,本例中即NioEndpoint类的createAcceptor方法  
        acceptors[i] = createAcceptor();  
        String threadName = getName() + "-Acceptor-" + i;  
        acceptors[i].setThreadName(threadName);  
        Thread t = new Thread(acceptors[i], threadName);  
        t.setPriority(getAcceptorThreadPriority());  
        t.setDaemon(getDaemon());  
        t.start();  
    }  
}

Выше приведен процесс инициализации основных компонентов, таких как Acceptor, Poller и Worker.

2. Запрос на получение

После инициализации основных компонентов поток Acceptor получает подключение к сокету.Посмотрите на исходный код Acceptor:


// --------------------------------------------------- Acceptor Inner Class  
/** 
 * 后台线程,用于监听TCP/IP连接以及将它们分发给相应的调度器处理。 
 * The background thread that listens for incoming TCP/IP connections and 
 * hands them off to an appropriate processor. 
 */  
protected class Acceptor extends AbstractEndpoint.Acceptor {  
  
    @Override  
    public void run() {  
  
        int errorDelay = 0;  
  
        // 循环遍历直到接收到关闭命令  
        // Loop until we receive a shutdown command  
        while (running) {  
  
            // Loop if endpoint is paused  
            while (paused && running) {  
                state = AcceptorState.PAUSED;  
                try {  
                    Thread.sleep(50);  
                } catch (InterruptedException e) {  
                    // Ignore  
                }  
            }  
  
            if (!running) {  
                break;  
            }  
            state = AcceptorState.RUNNING;  
  
            try {  
                // 如果已经达到最大连接数则让线程等待  
                //if we have reached max connections, wait  
                countUpOrAwaitConnection();  
  
                SocketChannel socket = null;  
                try {  
                    // 接收连接,这里用的阻塞模式。  
                    // Accept the next incoming connection from the server  
                    // socket  
                    socket = serverSock.accept();  
                } catch (IOException ioe) {  
                    //we didn't get a socket  
                    countDownConnection();  
                    // Introduce delay if necessary  
                    errorDelay = handleExceptionWithDelay(errorDelay);  
                    // re-throw  
                    throw ioe;  
                }  
                // Successful accept, reset the error delay  
                errorDelay = 0;  
  
                // 注意这个setSocketOptions方法  
                // 它将把上面接收到的socket添加到轮询器Poller中  
                // setSocketOptions() will add channel to the poller  
                // if successful  
                if (running && !paused) {  
                    if (!setSocketOptions(socket)) {  
                        countDownConnection();  
                        closeSocket(socket);  
                    }  
                } else {  
                    countDownConnection();  
                    closeSocket(socket);  
                }  
            } catch (SocketTimeoutException sx) {  
                // Ignore: Normal condition  
            } catch (IOException x) {  
                if (running) {  
                    log.error(sm.getString("endpoint.accept.fail"), x);  
                }  
            } catch (OutOfMemoryError oom) {  
                try {  
                    oomParachuteData = null;  
                    releaseCaches();  
                    log.error("", oom);  
                }catch ( Throwable oomt ) {  
                    try {  
                        try {  
                            System.err.println(oomParachuteMsg);  
                            oomt.printStackTrace();  
                        }catch (Throwable letsHopeWeDontGetHere){  
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);  
                        }  
                    }catch (Throwable letsHopeWeDontGetHere){  
                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);  
                    }  
                }  
            } catch (Throwable t) {  
                ExceptionUtils.handleThrowable(t);  
                log.error(sm.getString("endpoint.accept.fail"), t);  
            }  
        }  
        state = AcceptorState.ENDED;  
    }  
}

3. Настройки параметров сокета

Получив соединение в Acceptor, вызовите метод setSocketOptions, чтобы установить некоторые параметры SocketChannel, а затем зарегистрируйте SocketChannel в Poller. Взгляните на реализацию setSocketOptions:

/** 
 * Process the specified connection. 
 */  
protected boolean setSocketOptions(SocketChannel socket) {  
    // Process the connection  
    try {  
        // 将SocketChannel配置为非阻塞模式  
        //disable blocking, APR style, we are gonna be polling it  
        socket.configureBlocking(false);  
        Socket sock = socket.socket();  
        // 设置Socket参数值(从server.xml的Connector节点上获取参数值)  
        // 比如Socket发送、接收的缓存大小、心跳检测等  
        socketProperties.setProperties(sock);  
  
        // 从NioChannel的缓存队列中取出一个NioChannel  
        // NioChannel是SocketChannel的一个的包装类  
        // 这里对上层屏蔽SSL和一般TCP连接的差异  
        NioChannel channel = nioChannels.poll();  
  
        // 缓存队列中没有则新建一个NioChannel  
        if ( channel == null ) {  
            // SSL setup  
            if (sslContext != null) {  
                SSLEngine engine = createSSLEngine();  
                int appbufsize = engine.getSession().getApplicationBufferSize();  
                NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),  
                                                                   Math.max(appbufsize,socketProperties.getAppWriteBufSize()),  
                                                                   socketProperties.getDirectBuffer());  
                channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);  
            } else {  
                // normal tcp setup  
                NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),  
                                                                   socketProperties.getAppWriteBufSize(),  
                                                                   socketProperties.getDirectBuffer());  
  
                channel = new NioChannel(socket, bufhandler);  
            }  
        } else {  
            // 将SocketChannel关联到从缓存队列中获取的NioChannel上来  
            channel.setIOChannel(socket);  
            if ( channel instanceof SecureNioChannel ) {  
                SSLEngine engine = createSSLEngine();  
                ((SecureNioChannel)channel).reset(engine);  
            } else {  
                channel.reset();  
            }  
        }  
        // 将新接收到的SocketChannel注册到Poller中  
        getPoller0().register(channel);  
    } catch (Throwable t) {  
        ExceptionUtils.handleThrowable(t);  
        try {  
            log.error("",t);  
        } catch (Throwable tt) {  
            ExceptionUtils.handleThrowable(t);  
        }  
        // Tell to close the socket  
        return false;  
    }  
    return true;  
}

основной вызов последнийgetPoller0().register(channel);Он заключает настроенный SocketChannel в PollerEvent и добавляет его в очередь буфера событий Poller.

4. Прочитайте регистрацию события

Метод getPoller0 будет опрашивать текущий массив Poller и возвращать из него Poller. (Для инициализации Poller см. Шаг 1 выше: Инициализация основных компонентов класса NioEndpoint)

/** 
 * Return an available poller in true round robin fashion 
 */  
public Poller getPoller0() {  
    // 最简单的轮询调度算法,poller的计数器不断加1再对poller数组取余数  
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;  
    return pollers[idx];  
}  

Затем вызовите метод register объекта Poller:

        public void register(final NioChannel socket) {
            // 设置socket的Poller引用,便于后续处理
            socket.setPoller(this);
            // 从NioEndpoint的keyCache缓存队列中取出一个KeyAttachment
            KeyAttachment key = keyCache.poll();
            // KeyAttachment实际是NioChannel的包装类
            final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
            // 重置KeyAttachment对象中Poller、NioChannel等成员变量的引用
            ka.reset(this,socket,getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());

            // 从Poller的事件对象缓存中取出一个PollerEvent,并用socket初始化事件对象
            PollerEvent r = eventCache.poll();
            // 设置读操作为感兴趣的操作
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);

            // 加入到Poller对象里的事件队列
            addEvent(r);
        }

Взгляните на код addEvent в классе Poller:

        /**
         * Only used in this class. Will be made private in Tomcat 8.0.x
         * @deprecated
         */
        @Deprecated
        public void addEvent(Runnable event) {
            events.offer(event);
            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
        }

Всего две строки, первая строка добавляется из объекта события в очередь кеша, а вторая строка пробуждает заблокированный селектор, если в текущей очереди событий нет события.

5.Поток обработки опроса

Вышеупомянутое состоит в том, что сокет, полученный от акцептора, обертывается в форме PollerEvent и добавляется в очередь кэша событий Poller.Далее давайте посмотрим на процесс обработки другого основного компонента, Poller:


    /**
     * Poller class.
     */
    public class Poller implements Runnable {

        // 这就是NIO中用到的选择器,可以看出每一个Poller都会关联一个Selector
        protected Selector selector;
        // 待处理的事件队列
        protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();

        // 唤醒多路复用器的条件阈值
        protected AtomicLong wakeupCounter = new AtomicLong(0l);

        public Poller() throws IOException {
            // 对Selector的同步访问,通过调用Selector.open()方法创建一个Selector
            synchronized (Selector.class) {
                // Selector.open() isn't thread safe
                // http://bugs.sun.com/view_bug.do?bug_id=6427854
                // Affects 1.6.0_29, fixed in 1.7.0_01
                this.selector = Selector.open();
            }
        }

        // 通过addEvent方法将事件添加到Poller的事件队列中
        /**
         * Only used in this class. Will be made private in Tomcat 8.0.x
         * @deprecated
         */
        @Deprecated
        public void addEvent(Runnable event) {
            events.offer(event);
            // 如果队列中没有待处理的事件则唤醒处于阻塞状态的selector
            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
        }

        // 处理事件队列中的所有事件,如果事件队列是空的则返回false
        /**
         * Processes events in the event queue of the Poller.
         *
         * @return true if some events were processed,
         *   falseскопировать код

6.Поток обработки событий опроса

Суть обработки Poller состоит в том, чтобы запустить и выполнить PollerEvent в очереди событий, а затем пройти ключ готовности из селектора.Как только происходит интересующее событие, оно передается методу processSocket для обработки. Роль PollerEvent заключается в регистрации или обновлении событий, представляющих интерес для сокета:

    /**
     *
     * PollerEvent, cacheable object for poller events to avoid GC
     */
    public static class PollerEvent implements Runnable {

	// 每个PollerEvent都会保存NioChannel的引用
        protected NioChannel socket;
        protected int interestOps;
        protected KeyAttachment key;
        public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
            reset(ch, k, intOps);
        }

        public void reset(NioChannel ch, KeyAttachment k, int intOps) {
            socket = ch;
            interestOps = intOps;
            key = k;
        }

        public void reset() {
            reset(null, null, 0);
        }

        @Override
        public void run() {
            //socket第一次注册到selector中,完成对socket读事件的注册
            if ( interestOps == OP_REGISTER ) {
                try {
                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                } catch (Exception x) {
                    log.error("", x);
                }
            } else {
                // socket之前已经注册到了selector中,更新socket所感兴趣的事件
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    boolean cancel = false;
                    if (key != null) {
                        final KeyAttachment att = (KeyAttachment) key.attachment();
                        if ( att!=null ) {
                            //handle callback flag
                            if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
                                att.setCometNotify(true);
                            } else {
                                att.setCometNotify(false);
                            }
                            interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
                            // 刷新事件的最后访问时间,防止事件超时 
                            att.access();//to prevent timeout
                            //we are registering the key to start with, reset the fairness counter.
                            int ops = key.interestOps() | interestOps;
                            att.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            cancel = true;
                        }
                    } else {
                        cancel = true;
                    }
                    if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
                }catch (CancelledKeyException ckx) {
                    try {
                        socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);
                    }catch (Exception ignore) {}
                }
            }//end if
        }//run

        @Override
        public String toString() {
            return super.toString()+"[intOps="+this.interestOps+"]";
        }
    }

7. Передать сокет в Worker для выполнения

При анализе потока обработки Poller на шаге 5 видно, что его метод run, наконец, вызовет processKey() для обработки события канала, обнаруженного селектором, и в конце этого метода будет вызван processSocket для вызова конкретная логика обработки канала, см. ProcessSocket Реализация метода:

    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
        try {
            KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
            if (attachment == null) {
                return false;
            }
            attachment.setCometNotify(false); //will get reset upon next reg
            // 从SocketProcessor的缓存队列中取出一个来处理socket
            SocketProcessor sc = processorCache.poll();
            if ( sc == null ) sc = new SocketProcessor(socket,status);
            else sc.reset(socket,status);
            // 将有事件发生的socket交给Worker处理 
            if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
            else sc.run();
        } catch (RejectedExecutionException rx) {
            log.warn("Socket processing request was rejected for:"+socket,rx);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

Посредством координации NioEndpoint Poller передает сокет события рабочему потоку Worker для дальнейшей обработки. На этом заканчивается работа всего фрейма события, а далее идет обработка Worker.

8. Обработать запрос из сокета

В реализации обработки NIO версии Tomcat 6 есть класс Worker, который был удален в Tomcat 7, но ответственность за worker осталась, но передана классу SocketProcessor, показанному выше. класс Код реализации:

    // ---------------------------------------------- SocketProcessor Inner Class
    // 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。
    /**
     * This class is the equivalent of the Worker, but will simply use in an
     * external Executor thread pool.
     */
    protected class SocketProcessor implements Runnable {

        // 每个SocketProcessor保存一个NioChannel的引用
        protected NioChannel socket = null;
        protected SocketStatus status = null;

        public SocketProcessor(NioChannel socket, SocketStatus status) {
            reset(socket,status);
        }

        public void reset(NioChannel socket, SocketStatus status) {
            this.socket = socket;
            this.status = status;
        }

        @Override
        public void run() {
            // 从socket中获取SelectionKey
            SelectionKey key = socket.getIOChannel().keyFor(
                    socket.getPoller().getSelector());
            KeyAttachment ka = null;

            if (key != null) {
                ka = (KeyAttachment)key.attachment();
            }

            // Upgraded connections need to allow multiple threads to access the
            // connection at the same time to enable blocking IO to be used when
            // NIO has been configured
            if (ka != null && ka.isUpgraded() &&
                    SocketStatus.OPEN_WRITE == status) {
                synchronized (ka.getWriteThreadLock()) {
                    doRun(key, ka);
                }
            } else {
                synchronized (socket) {
                    doRun(key, ka);
                }
            }
        }

        private void doRun(SelectionKey key, KeyAttachment ka) {
            try {
                int handshake = -1;

                try {
                    if (key != null) {
                        // For STOP there is no point trying to handshake as the
                        // Poller has been stopped.
                        if (socket.isHandshakeComplete() ||
                                status == SocketStatus.STOP) {
                            handshake = 0;
                        } else {
                            handshake = socket.handshake(
                                    key.isReadable(), key.isWritable());
                            // The handshake process reads/writes from/to the
                            // socket. status may therefore be OPEN_WRITE once
                            // the handshake completes. However, the handshake
                            // happens when the socket is opened so the status
                            // must always be OPEN_READ after it completes. It
                            // is OK to always set this as it is only used if
                            // the handshake completes.
                            status = SocketStatus.OPEN_READ;
                        }
                    }
                }catch ( IOException x ) {
                    handshake = -1;
                    if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
                }catch ( CancelledKeyException ckx ) {
                    handshake = -1;
                }
                if ( handshake == 0 ) {
                    SocketState state = SocketState.OPEN;
                    // Process the request from this socket
                    if (status == null) {
                        // 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求
                        state = handler.process(ka, SocketStatus.OPEN_READ);
                    } else {
                        state = handler.process(ka, status);
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket and pool
                        try {
                            close(ka, socket, key, SocketStatus.ERROR);
                        } catch ( Exception x ) {
                            log.error("",x);
                        }
                    }
                } else if (handshake == -1 ) {
                    close(ka, socket, key, SocketStatus.DISCONNECT);
                } else {
                    ka.getPoller().add(socket, handshake);
                }
            } catch (CancelledKeyException cx) {
                socket.getPoller().cancelledKey(key, null, false);
            } catch (OutOfMemoryError oom) {
                try {
                    oomParachuteData = null;
                    log.error("", oom);
                    if (socket != null) {
                        socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
                    }
                    releaseCaches();
                }catch ( Throwable oomt ) {
                    try {
                        System.err.println(oomParachuteMsg);
                        oomt.printStackTrace();
                    }catch (Throwable letsHopeWeDontGetHere){
                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                    }
                }
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            }catch ( Throwable t ) {
                log.error("",t);
                if (socket != null) {
                    socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
                }
            } finally {
                socket = null;
                status = null;
                //return to cache
                if (running && !paused) {
                    processorCache.offer(this);
                }
            }
        }

        private void close(KeyAttachment ka, NioChannel socket, SelectionKey key,
                SocketStatus socketStatus) {
		...
        }
    }

Видно, что SocketProcessor находит подходящий обработчик Handler для окончательной обработки преобразования сокета.

Основной процесс NioEndpoint можно обобщить следующей картинкой:

Acceptor и Poller — массивы потоков, Worker — пул потоков ( Executor )