Серия Dead Tomcat (2) — Анализ исходного кода EndPoint

Java

Серия Dead Tomcat (2) — Анализ исходного кода EndPoint

В предыдущем разделе мы описали общую архитектуру Tomcat, мы знаем, что Tomcat разделен на два основных компонента: коннектор и контейнер. И о чем мы поговорим на этот разEndPointКомпоненты принадлежат разъему. Это конечная точка связи, которая отвечает за внешнюю реализацию протокола TCP/IP.EndPointЭто интерфейс, и его конкретный класс реализацииAbstractEndpointAbstractEndpointКонкретный класс реализации имеетAprEndpoint,Nio2Endpoint,NioEndpoint.

  • AprEndpoint: В соответствии с режимом APR простое понимание заключается в том, чтобы решить проблему асинхронного ввода-вывода на уровне операционной системы и значительно улучшить производительность обработки и ответа сервера. Но включение этого режима требует установки некоторых других зависимостей.
  • Nio2Endpoint: Используйте код для реализации асинхронного ввода-вывода.
  • NioEndpoint: Используя JAVA NIO для достижения неблокирующего ввода-вывода, Tomcat запускается по умолчанию с этим, и это также является предметом нашего разговора.

Важные компоненты в NioEndpoint

мы знаемNioEndpointПринцип все еще для использования мультиплексора Linux, и в мультиплексоре это просто два шага.

  1. Создайте селектор, зарегистрируйте в нем различные каналы, а затем вызовите метод select, чтобы дождаться появления интересных событий в канале.
  2. Если происходит что-то интересное, например событие чтения, то информация считывается из канала.

а такжеNioEndpointДля достижения вышеуказанных двух шагов используются пять компонентов. Эти пять компонентов являютсяLimitLatch,Acceptor,Poller,SocketProcessor,Executor

/**
 * Threads used to accept new connections and pass them to worker threads.
 */
protected List<Acceptor<U>> acceptors;

/**
 * counter for nr of connections handled by an endpoint
 */
private volatile LimitLatch connectionLimitLatch = null;
/**
 * The socket pollers. 
 */
private Poller[] pollers = null;

内部类

SocketProcessor

/**
 * External Executor based thread pool.
 */
private Executor executor = null;

Мы можем видеть эти пять компонентов, определенных в коде. Что именно делают эти пять компонентов?

  • LimitLatch: Контроллер соединений, отвечающий за управление максимальным количеством соединений.
  • Acceptor: отвечает за получение новых соединений и последующий возвратChannelВозражатьPoller
  • Poller: Вы можете думать об этом как о NIOSelector, ответственный за контрольChannelстатус
  • SocketProcessor: можно рассматривать как инкапсулированный класс задач
  • Executor: собственный расширенный пул потоков Tomcat для выполнения классов задач.

Простым представлением диаграммы является следующее соотношение

Далее рассмотрим код ключа в каждом компоненте отдельно

LimitLatch

Мы сказали вышеLimitLatchОн в основном используется для управления максимальным количеством соединений, которые может получить Tomcat.Если это соединение превышено, Tomcat заблокирует поток соединения и будет ждать, пока другие соединения не будут освобождены, прежде чем использовать это соединение. ТакLimitLatchКак это делается? мы можем видетьLimitLatchэтот класс


public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (!released && newCount > limit) {
                // Limit exceeded
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    //当前连接数
    private final AtomicLong count;
    //最大连接数
    private volatile long limit;
    private volatile boolean released = false;
}

Мы видим, что это реализовано внутриAbstractQueuedSynchronizer, AQS на самом деле является платформой, и классы, которые ее реализуют, могут настраивать, когда поток управления приостанавливается и освобождается.limitПараметр представляет собой максимальное количество подключений для управления. Мы видим, чтоAbstractEndpointперечислитьLimitLatchизcountUpOrAwaitметод, чтобы определить, может ли соединение быть получено.

    public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }

Как AQS узнает, когда нужно заблокировать поток? То есть не получается подключиться? Пользователь должен реализовать этоAbstractQueuedSynchronizerВы сами определяете, когда соединение установлено и когда соединение разорвано. Вы можете видеть, что класс Sync был переписан.tryAcquireSharedа такжеtryReleaseSharedметод. существуетtryAcquireSharedМетод определяет, что как только текущее количество подключений превысит установленное максимальное количество подключений, он вернет-1Указывает, что этот поток помещается в очередь AQS для ожидания.

Acceptor

Acceptorполучает соединения, мы можем видетьAcceptorДостигнутоRunnableинтерфейс, то где будет открыт новый поток для выполненияAcceptorА как насчет метода запуска? существуетAbstractEndpointизstartAcceptorThreadsметод.

protected void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);

    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

Вы можете видеть здесь, что вы можете настроить несколькоAcceptor, по умолчанию один. И порт может соответствовать только одномуServerSocketChannel, то этоServerSocketChannelГде его инициализировать? мы можем видеть вAcceptor<U> acceptor = new Acceptor<>(this);В этом предложении это передано, тогда это должно быть сделаноEndpointИнициализированное компонентом соединение. существуетNioEndpointизinitServerSocketСоединение инициализируется в методе.

// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}

Здесь мы можем видеть две вещи

  1. Второй параметр в методе привязки указывает длину очереди ожидания операционной системы, то есть когда Tomcat больше не принимает соединения (достигнуто максимальное установленное количество соединений), но все еще может принимать соединения на уровне операционной системы. Информация о соединении помещается в очередь ожидания, затем этим параметром задается размер очереди.
  2. ServerSocketChannelОн установлен в режим блокировки, что означает, что соединения принимаются в режиме блокировки. Могут быть сомнения. Разве канал не установлен в неблокирующий режим в обычном программировании NIO? Объясните здесь, если он установлен в неблокирующий режим, то вы должны установитьSelectorПостоянно опрашивая, но принимая соединения, нужно заблокировать только один канал.

Здесь следует отметить одну вещь: каждыйAcceptorв созданииPollerEventобъект вставленPollerСлучайно взято из очередиPollerобъект, конкретный код можно увидеть следующим образом, поэтомуPollerсерединаQueueобъект настроен наSynchronizedQueue<PollerEvent>, потому что может быть больше одногоAcceptorк этому одновременноPollerпоставить в очередьPollerEventобъект.

public Poller getPoller0() {
    if (pollerThreadCount == 1) {
        return pollers[0];
    } else {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }
}

Что такое подключение на уровне ОС? При трехстороннем рукопожатии TCP система обычно поддерживает две очереди для каждого сокета в состоянии LISTEN, одна из которых представляет собой полусвязанную очередь (SYN): эти соединения получили SYN от клиента, а другая представляет собой полностью подключенную очередь. (ACCEPT): ссылка получила ACK от клиента, завершила трехстороннее рукопожатие и ожидает, пока приложение вызовет метод accept, чтобы забрать его.

всеAcceptorподелиться этим одним соединением, вAcceptorизrunметод, поместите какой-нибудь важный код.

 @Override
    public void run() {
        // Loop until we receive a shutdown command
        while (endpoint.isRunning()) {
            try {
                //如果到了最大连接数,线程等待
                endpoint.countUpOrAwaitConnection();
                U socket = null;
                try {
                    //调用accept方法获得一个连接
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // 出异常以后当前连接数减掉1
                    endpoint.countDownConnection();
                }
                // 配置Socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
    }

Внутри мы можем получить две точки

  1. Среда выполнения сначала определит, достигнуто ли максимальное количество соединений, и если да, то она заблокирует поток для ожидания, и вызов будет завершен.LimitLatchКомпонент оценивается.
  2. Самое главное настроить шаг сокета, даendpoint.setSocketOptions(socket)этот код
 protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            // 设置Socket为非阻塞模式,供Poller调用
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);

            NioChannel channel = null;
            if (nioChannels != null) {
                channel = nioChannels.pop();
            }
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
            //注册ChannelEvent,其实是将ChannelEvent放入到队列中,然后Poller从队列中取
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error(sm.getString("endpoint.socketOptionsError"), t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

На самом деле, главное, чтобыAcceptorсPollerBinding, затем два компонента взаимодействуют через очередь, каждый опросчик поддерживаетSynchronizedQueueочередь,ChannelEventпоставить в очередь, тоPollerУдалить события из очереди для потребления.

Poller

Мы видим, чтоPollerдаNioEndpoint, который также реализует внутренний классRunnableInterface вы можете видеть, что Quene и Selector поддерживаются в своем классе, определенном следующим образом. Так что по существуPollerто естьSelector.

private Selector selector;
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

Основное внимание уделяется его методу запуска, здесь был удален некоторый код, и показаны только важные.

  @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
                boolean hasEvents = false;
                try {
                    if (!close) {
                        //查看是否有连接进来,如果有就将Channel注册进Selector中
                        hasEvents = events();
                    }
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }
                if (keyCount == 0) {
                    hasEvents = (hasEvents | events());
                }
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        processKey(sk, socketWrapper);
                    }
                }

                // Process timeouts
                timeout(keyCount,hasEvents);
            }

            getStopLatch().countDown();
        }

главное позвонитьevents()Метод заключается в том, чтобы постоянно проверять, есть ли в очередиPollereventсобытие, если оно есть, выньте его и положите внутрьChannelВозьмите и зарегистрируйтесь вSelector, а затем непрерывно опрашивает все зарегистрированныеChannelПроверьте, происходит ли событие.

SocketProcessor

мы знаемPollerголосованиеChannelКогда происходит событие, оно вызывается для инкапсуляции события, а затем передает его пулу потоков для выполнения. Тогда класс-оболочкаSocketProcessor. А мы открываем этот класс и видим, что он тоже реализованRunnableИнтерфейс, используемый для определения пула потоковExecutorЗадача, выполняемая потоком. Так вот какChannelПреобразуйте поток байтов в Tomcat в то, что нужно TomcatServletRequestЧто с объектом? На самом деле звонитHttp11Processorдля преобразования потоков байтов в объекты.

Executor

ExecutorПо сути, это пул потоков кастомизированной версии Tomcat. Мы можем посмотреть на определение его класса и обнаружить, что он фактически расширяет пул потоков Java.

public interface Executor extends java.util.concurrent.Executor, Lifecycle

Двумя наиболее важными параметрами в пуле потоков являются количество основных потоков и максимальное количество потоков.Поток выполнения обычного пула потоков Java выглядит следующим образом.

  1. Если текущий поток меньше числа основных потоков, то задача создаст поток.
  2. Если текущий поток больше, чем количество основных потоков, задача снова помещается в очередь задач. Все потоки захватывают задачи.
  3. Если очередь заполнена, начните создавать временные потоки.
  4. Если общее количество потоков достигает максимального количества потоков и очередь заполнена, возникает исключение.

Но в пользовательском пуле потоков Tomcat все по-другому, переписавexecuteМетод реализует собственную логику обработки задач.

  1. Если текущий поток меньше числа основных потоков, то задача создаст поток.
  2. Если текущий поток больше, чем количество основных потоков, задача снова помещается в очередь задач. Все потоки захватывают задачи.
  3. Если очередь заполнена, начните создавать временные потоки.
  4. Если общее количество потоков достигает максимального числа потоков, снова получите очередь задач и попробуйте снова добавить задачу в очередь.
  5. Если в это время он все еще заполнен, создается исключение.

Разница заключается в четвертом шаге.Стратегия обработки собственного пула потоков состоит в том, чтобы генерировать исключение до тех пор, пока текущее число потоков больше, чем максимальное число потоков, в то время как Tomcat пытается повторить попытку, если текущее число потоков превышает максимальное количество потоков. Если он по-прежнему заполнен, будет выдано исключение. Ниже приведен настраиваемый пул потоков.executeлогика исполнения.

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        if (super.getQueue() instanceof TaskQueue) {
            //获得任务队列
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try {
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }

    }
}

В коде мы видим, что есть такое предложениеsubmittedCount.incrementAndGet();, а зачем эта фраза? Мы можем посмотреть на определение этого параметра. Проще говоря, этот параметр определяет количество задач, отправленных в пул потоков, но еще не выполненных.

/**
 * The number of tasks submitted but not yet finished. This includes tasks
 * in the queue and tasks that have been handed to a worker thread but the
 * latter did not start executing the task yet.
 * This number is always greater or equal to {@link #getActiveCount()}.
 */
private final AtomicInteger submittedCount = new AtomicInteger(0);

Почему такой параметр? Мы знаем, что пользовательская очередь наследуетсяLinkedBlockingQueueLinkedBlockingQueueПо умолчанию очереди безграничны. Итак, мы передаем параметр,maxQueueSizeк построенной очереди. Однако очередь задач Tomcat по умолчанию не ограничена, поэтому будет проблема: если текущий поток достигнет числа основных потоков, он начнет добавлять задачи в очередь, и всегда будет добавляться успешно. Тогда новые темы создаваться не будут. Итак, при каких обстоятельствах следует создавать новый поток?

Существует два места для создания нового потока в пуле потоков: во-первых, когда поток меньше основного потока, задача создает поток. Другой перегружен основными потоками, а очередь задач заполнена, создаются временные потоки.

Итак, как указать, заполнена ли очередь задач? Если задана максимальная длина очереди, то это, конечно, хорошо, но Tomcat не ставит ее по умолчанию, поэтому по умолчанию она не ограничена. Итак, ТомкэтTaskQueueнаследоватьLinkedBlockingQueue, переписаноofferметод, который определяет, когда возвращать false.

@Override
public boolean offer(Runnable o) {
    if (parent==null) return super.offer(o);
    //如果当前线程数等于最大线程数,此时不能创建新线程,只能添加进任务队列中
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    //如果已提交但是未完成的任务数小于等于当前线程数,说明能处理过来,就放入队列中
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
    //到这一步说明,已提交但是未完成的任务数大于当前线程数,如果当前线程数小于最大线程数,就返回false新建线程
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    return super.offer(o);
}

ЭтоsubmittedCountЦель состоит в том, чтобы дать пулу потоков возможность создавать новые потоки, когда длина очереди задач бесконечна.

Суммировать

Часть вышеприведенных знаний подытожена просмотром подробного разбора Tomcat г-ном Ли Хаошуаном, а затем объединена с исходным кодом, чтобы понять его.Когда я впервые прочитал статью, я почувствовал, что понял ее, но когда я пошел поглубже в исходники,нашел опять.не понимаю. Итак, если знание только видно, но не используется, тогда знание никогда не будет вашим. Изучение небольшого исходного кода соединителя Tomcat в дополнение к практическому применению некоторых общих знаний, таких как AQS, применение блокировок, точки, которые необходимо учитывать в пользовательских пулах потоков, применение NIO и т. д. Есть также общее обучение дизайн-мышлению, модульный дизайн, который очень похож на современные микросервисы.Функциональная точка разделена на несколько модулей, чтобы ее можно было легко заменить или обновить в будущем.

Прошлые статьи

Как поставить точку останова при отладке исходного кода Tomcat

Серия Dead Tomcat (1) - общая архитектура

Странная поездка в поисках вопросов StackOverflowError

Простая структура RPC голыми руками

Простой RPC-фреймворк голыми руками (2) — трансформация проекта

Простой IOC голыми руками