Применение NIO в Jetty

Java задняя часть

Введение

Будучи ветераном любовной сцены на протяжении многих лет, Сяньчэн обычно использует следующие стратегии после знакомства с девушкой. (если предположить, что конституция影流之主1024 поколение потомка и осталась только способность клона)

  • Сохраните девушку в очереди и время от времени отправляйте WeChat, чтобы дразнить ее.Если у вас есть намерение, Сяньчэн будет использовать способность клонирования, чтобы создать еще одну.宪程подобрать

  • Чэн Сиань собственной сестры выполняет операцию, и если во время своей фантазии новая сестра, мы должны делать, то сестра на собственный аватар宪程Перейдите к процессу голосования, и Сяньчэн попытается сделать клон после того, как добьет девушку.宪程Задача опроса берется на себя.Ведь инициатива всегда у главного органа.А что если его не взять на себя?Я могу только стать клоном.Ведь клон в это время.宪程Взял на себя работу онтологии, в некотором смысле стал本体.

Модель причала NIO

Перед чтением рекомендуется понять следующую модель Tomcat NIO, без сравнения нет вреда, вы найдете интересный момент модели Jetty NIO.

Обзор

Если у вас есть достаточно времени, я предлагаю вам прочитать приложение непосредственно, чтобы узнать, как отлаживать функции Jetty NIO.

Поскольку вы хотите понять модель NIO Jetty, с точки зрения потоков ее можно разделить на следующие категории.

  • 空闲线程Эта роль превратится в поток ввода-вывода или поток опроса в соответствии с задачами, отправленными в пул потоков.

  • Acceptor线程Эта роль в основном отвечает за получение подключений от клиентов и их инкапсуляцию. Выберите селектор, чтобы отправить эту задачу.

  • 轮询线程Эта роль в основном несет ответственность за опрашивающиеся события и обработку задач, представленных этой роли другими ролями. Кроме того, эта роль может передавать задачи опроса к другим потокам в соответствии с установленной политикой и возвращает их в пул резьбы после выполнения ввода-вывода. Задачи. Станьте空闲线程

Основные задействованные классы:

  • ConnectorЭта роль в основном отвечает за запуск и координацию различных компонентов модели JettyNIO.

  • SelectorManagerЭта роль в основномManagedSelectorДля управления вы можете использовать этот класс для взаимодействия с Selector

  • ManagedSelectorИнкапсулирует собственный JDKselector, и обеспечить внешнийselectorВнутренние классы, интерфейсы и методы, выполняющие операции

фокусВсе потоки совместно используют пул потоков

Connector

ключевой классorg.eclipse.jetty.server.ServerConnector

Connector — это коннектор, представляющий собой абстракцию Jetty для модели сетевого ввода-вывода и в основном отвечающий за сборку и запуск компонентов, необходимых в модели Jetty NIO. Поэтому основное внимание мы уделяем его реализации, т.ServerConnectorначальство.

Для инициализации коннектора нам необходимо предоставить ему следующие ключевые параметры (параметры, не относящиеся к данной статье, скрыты, кому интересно, разберутся сами)

  • Получение нового соединения для выполнения, пул резьбы I / O, событие опроса задач
  • Пул объектов ByteBuffer, пул может быть объектом回收а также提供ByteBuffer используется потоками ввода/вывода
  • ответственный за исполнениеacceptколичество потоков операций
  • Ответственный за выполнение задач опросаselectorколичество потоков

Однако большая часть работы по инициализации неServerConnector, но в его родительском классе, поэтому обратим внимание наorg.eclipse.jetty.server.AbstractConnector

Код инициализации этого класса выглядит следующим образом, который в основном выполняет следующую работу.

  • Проверьте, указан ли пул потоков, если нет, поделитесь пулом потоков с сервером.
  • Проверьте, указан ли ByteBufferPool, если не используйте ArrayByteBuffer
  • Проверьте, установлено ли количество акцепторов, если нет, следуйтеmax(1,min(4,CPU核心数÷8))Расчет, то есть номер по умолчанию Acceptor по крайней мере один из четырех

Представьте, если сервесокархнал был установлен на блокировку, чтобы несколько ниток могли одновременно выполнять принимать операции, затемБольшинство потоков будут заблокированы в большинстве случаев, а восстановление потока из состояния блокировки связано с переключением контекста потока, поэтому чем больше потоков Acceptor, тем лучше.

    public AbstractConnector(
        Server server,
        Executor executor,
        Scheduler scheduler,
        ByteBufferPool pool,
        int acceptors,
        ConnectionFactory... factories)
    {
        _server = server;
        //检查是否设置线程池,如果没有则使用Server的
        _executor = executor != null ? executor : _server.getThreadPool();
        if (scheduler == null)
            scheduler = _server.getBean(Scheduler.class);
        _scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false);
        
        // 检查是否指定ByteBufferPool,如果没有则自己创建一个
        if (pool == null)
            pool = _server.getBean(ByteBufferPool.class);
        _byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
        // 将这些对象交给Jetty统一管理(不在本文讨论范围内,不展开)
        addBean(_server, false);
        addBean(_executor);
        if (executor == null)
            unmanage(_executor); // inherited from server
        addBean(_scheduler);
        addBean(_byteBufferPool);
        // ConnectionFactory主要使用来处理对应的HTTP协议
        for (ConnectionFactory factory : factories)
        {
            addConnectionFactory(factory);
        }
        // 如果未指定Acceptor的数量则根据CPU核数执行计算
        int cores = ProcessorUtils.availableProcessors();
        if (acceptors < 0)
           //根据此式可以推出Acceptor数量最大是4最小是1
            acceptors = Math.max(1, Math.min(4, cores / 8));
        // Acceptor数量大于CPU核心数
        // 将会引起大量的线程陷入阻塞状态
        // 没有东西可以accept不就阻塞了吗
        // 而要激活阻塞的线程则需要切换线程上下文会引起性能的浪费
        if (acceptors > cores)
            LOG.warn("Acceptors should be <= availableProcessors: " + this);
        _acceptors = new Thread[acceptors];
    }

Как показано ниже, мой компьютер4核心i5CPU, то поток Acceptor по умолчанию должен иметь только один

4核心CPU
После запуска вашего Jetty мы можем использовать JConsole для проверки
Как видите, потоки, начинающиеся с qtp, используются для пула потоков NIO, а один из потоков Acceptor блокируется в методе accept().

Acceptor

Акцептор определяется вAbstractConnectorВо внутреннем классе его основная работа непрерывно вызывает метод accept, реализованный в подклассе, то есть реализация приема соединения откладывается до подкласса.

Код выглядит следующим образом, вы можете узнать много трюков, если вы не хотите читать код, краткое изложение выглядит следующим образом.

  • Получить поток, выполняющий текущий код, дать ему имя, см. скриншот JConsole в предыдущем разделе.
  • Настройте приоритет потока Acceptor на самый высокий (конечно, это может не сработать, это зависит от того, будет ли операционная система вас игнорировать)
  • Дождитесь сигнала освобождения от других потоков, прежде чем выполнять операцию Accept.
  • Непрерывный цикл через операцию принятия
      public void run()
        {
           // 给线程起给名字
            final Thread thread = Thread.currentThread();
            String name = thread.getName();
            _name = String.format("%s-acceptor-%d@%x-%s", name, _id, hashCode(), AbstractConnector.this.toString());
            thread.setName(_name);
            // 设置优先级
            int priority = thread.getPriority();
            if (_acceptorPriorityDelta != 0)
                thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _acceptorPriorityDelta)));
            // 保存对此线程的引用
            _acceptors[_id] = thread;
            
            try
            {
                while (isRunning())
                {
                    // 加锁,等待来自其他线程的信号说可以开始干活了
                    try (Locker.Lock lock = _locker.lock())
                    {
                        if (!_accepting && isRunning())
                        {
                            _setAccepting.await();
                            continue;
                        }
                    }
                    catch (InterruptedException e)
                    {
                        continue;
                    }

                    try
                    {
                       //调用子类的accept方法
                        accept(_id);
                    }
                    catch (Throwable x)
                    {
                        if (!handleAcceptFailure(x))
                            break;
                    }
                }
            }
            finally
            {
               // 发生异常了,则将线程的名称以及优先级调回原来的值
                thread.setName(name);
                if (_acceptorPriorityDelta != 0)
                    thread.setPriority(priority);
                
                //释放引用
                synchronized (AbstractConnector.this)
                {
                    _acceptors[_id] = null;
                }
                CountDownLatch stopping = _stopping;
                if (stopping != null)
                    stopping.countDown();
            }
        }

в подклассеServerConnectorсередина,acceptВ основном делать следующее

  • от阻塞Получено в виде подключения от клиента
  • Установите клиентSocketChannelза非阻塞模式禁用nagle算法
  • Передайте его SelectorManager для обработки, этот классSocketChannelупакован вAcceptмероприятие, передать轮询线程иметь дело сServerConnectorкод в
    @Override
    public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept();
            accepted(channel);
        }
    }

    private void accepted(SocketChannel channel) throws IOException
    {
        channel.configureBlocking(false);
        Socket socket = channel.socket();
        configure(socket); // socket.setTcpNoDelay(true);
        _manager.accept(channel);
    }

Код, который, наконец, вызывается в SelectorManager

    public void accept(SelectableChannel channel, Object attachment)
    {
        final ManagedSelector selector = chooseSelector();
        selector.submit(selector.new Accept(channel, attachment));
    }

ветка опроса

轮询线程В основном отвечает за опрос событий ввода-вывода и обработку задач, отправленных другими потоками в этот поток. и мы можем轮询线程Укажите стратегию исполнения, позже мы сможем увидеть, как повлияет стратегия исполнения轮询线程поведение.

В первую очередь нам нужно уточнить, какие классы будут участвовать в работе потока опроса, то есть мы должны сначала уточнить цепочку вызовов потока опроса.

Поскольку часть трассировки стека FIG отмечена красным прямоугольником, поток опроса должен участвовать в структуре основного стека, как показано ниже.

  • ManagedSelectorЭтот класс в основном инкапсулирует JDK.selectorкласс и предоставить методы и классы для работы с этим селектором
  • EatWhatYouKillЭтот класс является стратегией выполнения потока опроса.Этот класс будет постоянно вызывать метод SelectorProducer.produce для создания инкапсулированной задачи ввода-вывода и определять способ выполнения задачи ввода-вывода в соответствии с ее стратегией.
  • SelectorProducerЭтот классManagedSelectorВнутренний класс, реализующий стратегию выполнения потокаExecutionStrategy.ProducerИнтерфейс, предназначенный для создания задач ввода-вывода для обработки потоков опроса.

ManagedSelector

Родной JDK JettySelectorинкапсуляция класса становитсяManagedSelector, основная функция этого класса — раскрыть и инкапсулировать егоselectorИнтерфейсы и внутренние классы, выполняющие операции. Его ключевые методы и внутренние классы следующие:

Интерфейс SelectorUpdateесли хотитеManagedSelectorудалосьselectorОбновления (например, выполнение регистрации интересующих событий ввода-вывода) могут реализовывать этот интерфейс, который определяется следующим образом.

    public interface SelectorUpdate
    {
        void update(Selector selector);
    }

Отправить методЭтот метод используется в основном для наружногоSelectorUpdateОтправить в поток опроса для выполненияSelectorОперация обновления, простыми словами, этот метод будет выполнять следующие действия.

  • Поставить в очередь событие обновления
  • Проверьте, делает ли Selector выбор, если да, то разбудите его, чтобы он вернулся из блокировки, чтобы мы могли его обновить.
    public void submit(SelectorUpdate update)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("Queued change {} on {}", update, this);

        Selector selector = null;
        synchronized (ManagedSelector.this)
        {
            //加事件加入处理队列
            _updates.offer(update);
            //检查是否正在轮询,如果正在轮询,则会执行唤醒操作
            //因此在此处需要将selecting置为false
            if (_selecting)
            {
                selector = _selector;
                // To avoid the extra select wakeup.
                _selecting = false;
            }
        }

        if (selector != null)
        {
           //执行唤醒操作,以便对selector执行更新操作
            if (LOG.isDebugEnabled())
                LOG.debug("Wakeup on submit {}", this);
            selector.wakeup();
        }
    }

SelectorProducer

SelectorProducerдаManagedSelectorВнутренний класс, реализующий стратегию выполнения потока опроса.ExecutionStrategy.Producerинтерфейс

    interface Producer
    {
        // 返回一个Runnable任务供轮询线程执行
        Runnable produce();
    }

следовательноSelectorProducerнужно продолжать звонитьselectorЧтобы опросить, чтобы увидеть, есть ли новые события ввода-вывода для обработки, в дополнение к этому необходимо обрабатывать внешние классы дляManagedSelectorпозвонивsubmitметод представленSelectorUpdateЗадача

Предоставляется классу стратегии выполнения потока.produceКод метода выглядит следующим образом, в основном он выполняет следующие задачи

  • Выполнять цикл до тех пор, пока интересующая задача не будет опрошена (за раз возвращается только одна задача, событие опроса будет сохранено для следующего использования)
  • Обрабатывать задачи, представленные ему внешними классами (вызовprocessUpdates)
  • Обновите интересующие события для клиента SocketChannel
        @Override
        public Runnable produce()
        {
            while (true)
            {
                //处理之前查询到事件
                Runnable task = processSelected();
                if (task != null)
                    return task;
                //处理外部类所提交的update任务
                //该方法最终会导致提交的SelectorUpdate.update被调用
                processUpdates();
                //此方法的调用可能会
                //导致客户端SocketChannel感兴趣的事件发生变更
                updateKeys();
                //执行select操作,并将查询到事件保存起来
                if (!select())
                    return null;
            }
        }

processUpdatesЭтот метод в основном обрабатывает представление внешних классовSelectorUpdateМиссия, приведенная копированием очень умным, чтобы избежать вопросов параллелизма

        private void processUpdates()
        {
            synchronized (ManagedSelector.this)
            {
                //倒腾数据,将要处理队列的引用保存
                //到另一个变量上,原有的引用可以继续对外提供服务
                //整个数据倒腾过程非常短,性能影响较小
                Deque<SelectorUpdate> updates = _updates;
                _updates = _updateable;
                _updateable = updates;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updateable {}", _updateable.size());
            //遍历事件队列,处理update方法
            for (SelectorUpdate update : _updateable)
            {
                if (_selector == null)
                    break;
                try
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("update {}", update);
                    //调用事件的update方法,并传入selector
                    update.update(_selector);
                }
                catch (Throwable ex)
                {
                    LOG.warn(ex);
                }
            }
            _updateable.clear();

            Selector selector;
            int updates;
            //再次检查是否有新的事件被提交,如果有则执行唤醒操作
            synchronized (ManagedSelector.this)
            {
               //外部类提交的任务会保存到updates中
                updates = _updates.size();
                _selecting = updates == 0;
                selector = _selecting ? null : _selector;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updates {}", updates);

            if (selector != null)
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("wakeup on updates {}", this);
                selector.wakeup();
            }
        }

select()Этот метод в основном выполняет операцию опроса и сохраняет опрашиваемые события для возврата в следующем цикле.В этом методе он показывает, как обрабатывает причал空轮询событие(空轮询Это означает, что когда селектор выполняет операцию выбора, он не запрашивает никаких событий, а возвращает результат.100%использование, сбой системы)

        private boolean select()
        {
            try
            {
                Selector selector = _selector;
                if (selector != null && selector.isOpen())
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size());
                    int selected = selector.select();
                    //没查询到事件, 空轮询事件处理
                    if (selected == 0)
                    {
                        if (LOG.isDebugEnabled())
                            LOG.debug("Selector {} woken with none selected", selector);
                        //如果线程被中断,并且标志位被设置了不在运行则执行推出逻辑
                        if (Thread.interrupted() && !isRunning())
                            throw new ClosedSelectorException();
                        //开启了此参数则立即执行一次select操作
                        if (FORCE_SELECT_NOW)
                            selected = selector.selectNow();
                    }
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size());

                    int updates;
                    synchronized (ManagedSelector.this)
                    {
                        // 完成了select操作则设置标志位
                        _selecting = false;
                        updates = _updates.size();
                    }

                    _keys = selector.selectedKeys();
                    _cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator();
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates);

                    return true;
                }
            }
            catch (Throwable x)
            {
                _selector = null;
                if (isRunning())
                    LOG.warn(x);
                else
                {
                    LOG.warn(x.toString());
                    LOG.debug(x);
                }
                closeNoExceptions(_selector);
            }
            return false;
        }

В отличие от стратегии обработки пустого опроса Netty, стратегия обработки Jetty заключается в повторном выборе и немедленном возврате, но это, похоже, не решает ошибку пустого опроса.Детали вопроса

EatWhatYouKill

EatWhatYouKillВыполнение потока является стратегией, но также относится к стратегии Jetty по умолчанию, идея исходит из如果猎人杀死一只猎物,那么猎人就应该吃掉它(Если вы съели свежие креветки, вы哲学глубокий опыт), иначе говоря,轮询线程如果查询到一次I/O事件就应该直接处理它(помните вступление?)

P.S код ключаorg.eclipse.jetty.util.thread.strategy.EatWhatYouKill

Причина этого в том, что переключение потоков является трудоемкой операцией (относительно говоря), поэтому при этой стратегии опрашивающий поток A будет иметь следующую стратегию, если он получит событие

  • Если эта задача标志为非阻塞任务, то нить A будет立即执行эта задача

если задачаТип блокировки неизвестенили бытьотмечен как заблокированный

  • Если все потоки в пуле потоков находятся в繁忙состояние, отправьте его в пул потоков, чтобы дождаться выполнения

  • Если в пуле потоков есть бездействующий поток B, попробуйте назначить поток A ответственным轮询功может быть передан потоку B, если立即获取到线程BВ случае успеха поток А выполнит полученную задачу напрямую.После выполнения задачи поток А попытается夺回Задача опроса передается потоку B, если повторный захват не удается, он становится бездействующим потоком, ожидающим назначения задач. (Помните введение?)

  • Кроме того, поток A попытается выполнить задачу напрямую и не будет передавать работу опроса. (Код слишком длинный, извлекается только код ключа)

    case BLOCKING:
        synchronized (this)
        {
            if (_pending)
            {
                //轮询工作陷入了停滞,因此是IDLE状态
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }
            //tryExecute 如果立即分配到了线程则返回true
            //this的run方法也就是实现轮询线程核心的方法
            //因此此行代码相当于将轮询的工作转移给了其他线程
            else if (_tryExecutor.tryExecute(this))
            {
                _pending = true;
                //由于轮询工作的转移
                //因此当前轮询工作相当于陷入空闲状态
                //所以需要将此对象的状态至为IDLE
                //(轮询线程和当前线程使用同一个对象)
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }else
            {
               //前两者均不满足则将任务提交到线程池
                mode = Mode.PRODUCE_EXECUTE_CONSUME;
            }
        }
        break;

стратегия выполнения задачи

            case EXECUTE_PRODUCE_CONSUME:
                _epcMode.increment();
                //直接在当前线程调用
                runTask(task);

                // 尝试夺回轮询任务
                synchronized (this)
                {
                   // 如果State还处于空闲状态
                   // 说明
                   // 线程B还未开始执行轮询任务,可以直接夺回
                   // 如果线程B已经开始轮询
                   // 则选择离开
                    if (_state == State.IDLE)
                    {
                        // 返回true则继续轮询
                        return true;
                    }
                }
                //返回false则结束轮询任务,变为空闲线程
                return false;

Суммировать

По сравнению с Tomcat, дизайн Jetty более радикальный, более авантюристический, больше похож на дизайн Jetty с личной точки зрения, но с точки зрения бизнеса он все же более стабилен, в конце концов, это бизнес.Основной спрос и производительность Tomcat будут не будь слишком плох.

Модель Jetty NIO, разделенная по категориям потоков, показана на следующем рисунке.

  • AcceptorПоток отвечает за получение новых соединений от клиента, инкапсулирует их как событие и отправляет в поток опроса для обработки.
  • 轮询线程Обработка потока опроса отвечает за опрос событий ввода-вывода, а также должна обрабатывать данные, отправленные внешними потоками.selectorОбновите задачи, и в соответствии с установленной стратегией выполнения поток опроса может напрямую выполнять задачи ввода-вывода в этом потоке и передавать задачи опроса другим бездействующим потокам или выбирать незанятый поток для выполнения операций ввода-вывода.
  • I/O线程В основном отвечает за обработку операций ввода-вывода.

С точки зрения категории нити модель NiO-модели NIO относительно проста, но ее введенная речь опрос выполняет политику для изменения между идентичностью нити, благодаря этому присягу может напрямую опросить нить непосредственно для выполнения задач ввода-вывода. Расход производительности, вызванные переключателями контекста потоков и повышает производительность.

передача мысли

Переключение потоков требует затратJetty повышает производительность, напрямую выполняя задачи ввода-вывода в потоке опроса, чтобы уменьшить переключение контекста потока.Кроме того, мы также можем реализовать механизм сопрограммы для снижения стоимости переключения контекста потока (см. язык Go).

Резьба акцептора должна быть соответствующейЕсли ServerSocket установлен в режим блокировки, операция принятия приведет к блокировке потока, и поток будет переключаться вверх и вниз при возврате из метода принятия, поэтому чем больше, тем лучше.

Как отлаживать Jetty

Мы используем SpringBoot для отладки Jetty, поэтому нам нужноpom.xmlВвел Jetty, так как SpringBoot по умолчанию использует Tomcat, нам нужно заменить его, зависимости следующие.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>

Используемая версия SpringBoot — 2.2.0, а номер версии Jetty, от которой она зависит, — 9.4.20.

  • Если вы хотите понять, как работает Connector, обратите внимание на следующие классы.org.eclipse.jetty.server.ServerConnector

  • Если вы хотите понять, как Jetty NIO опрашивает и обрабатывает события, обратите внимание на следующие классыorg.eclipse.jetty.io.ManagedSelectorи в его внутреннем классеSelectorProducerизproduceПоставьте точку останова на методе, как показано на следующем рисунке, вы поймете, что происходило во время всего процесса опроса.

Щелкните правой кнопкой мыши красную точку и выберите Thread, чтобы избежать ситуации, когда точка останова не может быть введена, ведь мы отлаживаем многопоточную программу.

  • Если вы хотите понять стратегию выполнения потока, то обратите внимание на следующие классы (такой механизм выполнения более сложен, если вы хотите отладить все ситуации, лучше всего комбинировать определенные стратегии, например блокировку потока в коде контроллера и т. д.)org.eclipse.jetty.util.thread.strategy.EatWhatYouKill