【Серия NIO】—Модель реактора

Java задняя часть сервер Netty


Прежде чем начать, мы не будем слишком много рассказывать об использовании JavaNIO.Способов внедрения этого API в Интернете слишком много, и нет необходимости представлять его подробно.Мы предполагаем, что вы можете использовать использование NIO. умело. Это третья статья из серии NIO:

【Серия NIO】—— Секрет TCP

【Серия NIO】—Модель IO


Из предыдущего знакомства с моделью ввода-вывода в Unix я, должно быть, узнал о пяти моделях ввода-вывода. NIO в Java — это синхронный неблокирующий ввод-вывод.Что касается мультиплексирования ввода-вывода, в Java нет соответствующей модели ввода-вывода, но есть соответствующий режим программирования.Reactor — это режим, основанный на NIO для мультиплексирования. Эта статья объяснит шаблон Reactor со следующих точек зрения:

  1. что такое реактор

  2. Зачем его использовать, какую проблему он может решить

  3. Как его использовать, лучший способ

  4. Другие шаблоны обработки событий


1. Что такое реактор

Относительно того, что такое реактор, давайте сначала заглянем в вики:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

Из приведенного выше текста мы можем увидеть следующие ключевые моменты:

  1. обработка событий

  2. Может обрабатывать один или несколько входов

  3. Входное событие (событие) синхронно распространяется на соответствующий обработчик запросов (множественный) обработки путем мультиплексирования через обработчик служб.

Из введения в шаблон Reactor в POSA2 мы узнали, как обрабатываются Reactors:

  1. Синхронное ожидание прибытия нескольких источников событий (реализовано с помощью select())

  2. Демультиплексирование событий и назначение соответствующих сервисов событий для обработки.Эта диспетчеризация использует централизованную серверную обработку (диспетчеризацию)

  3. Разложенное событие и соответствующее приложение службы событий отделены от диспетчерской службы (обработчика).


О конструкции диаграммы классов OMT шаблона Reactor:


2. Зачем использовать Reactor

В общих сетевых службах, если каждый клиент поддерживает соединение с сервером входа. Затем сервер будет поддерживать несколько соединений с клиентом для связи с клиентом, чтения, записи, особенно для длинных сервисов, сколько есть c-концов, вам нужно поддерживать одно и то же соединение ввода-вывода на s-конце. Это большая нагрузка на сервер.

1. БИО

Например, мы используем BIO для поддержания соединения с клиентом:

// 主线程维护连接
  public void run() {
      try {
          while (true) {
              Socket socket = serverSocket.accept();
              //提交线程池处理
              executorService.submit(new Handler(socket));
          }
      } catch (Exception e) {
          e.printStackTrace();
      }
  }
​
  // 处理读写服务
  class Handler implements Runnable {
      public void run() {
          try {
              //获取Socket的输入流,接收数据
              BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream()));
              String readData = buf.readLine();
              while (readData != null) {
                  readData = buf.readLine();
                  System.out.println(readData);
              }
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  }


Очевидно, чтобы избежать исчерпания ресурсов, мы используем пул потоков для обработки служб чтения и записи. Но есть еще очевидные недостатки этого:

  1. Синхронная блокировка ввода-вывода, блокировка чтения и записи, слишком долгое время ожидания потока

  2. При формулировании стратегии потока доступные ресурсы потока могут быть ограничены только в соответствии с количеством ЦП и не могут быть сформулированы в соответствии с количеством одновременных соединений, то есть соединения ограничены. В противном случае сложно гарантировать эффективность и справедливость клиентских запросов.

  3. Переключение контекста между несколькими потоками делает использование потоков неэффективным и непростым для расширения.

  4. Данные о состоянии и другие данные, которые должны быть согласованными, должны использовать управление параллельной синхронизацией.


2. НИО

Таким образом, есть другие способы справиться с этим лучше, мы можем использовать NIO для обработки этого базового механизма, поддерживаемого в NIO:

  1. Неблокирующий ввод-вывод для чтения и записи

  2. Распределение задач на основе событий ввода-вывода и поддержка мониторинга нескольких файловых систем одновременно.


Давайте посмотрим на реализацию связанных методов в NIO:

public NIOServer(int port) throws Exception {
      selector = Selector.open();
      serverSocket = ServerSocketChannel.open();
      serverSocket.socket().bind(new InetSocketAddress(port));
      serverSocket.configureBlocking(false);
      serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  }
​
  @Override
  public void run() {
      while (!Thread.interrupted()) {
          try {
              //阻塞等待事件
              selector.select();
              // 事件列表
              Set selected = selector.selectedKeys();
              Iterator it = selected.iterator();
              while (it.hasNext()) {
                  it.remove();
                  //分发事件
                  dispatch((SelectionKey) (it.next()));
              }
          } catch (Exception e) {
​
          }
      }
  }
​
  private void dispatch(SelectionKey key) throws Exception {
      if (key.isAcceptable()) {
          register(key);//新链接建立,注册
      } else if (key.isReadable()) {
          read(key);//读事件处理
      } else if (key.isWritable()) {
          wirete(key);//写事件处理
      }
  }
​
  private void register(SelectionKey key) throws Exception {
      ServerSocketChannel server = (ServerSocketChannel) key
              .channel();
      // 获得和客户端连接的通道
      SocketChannel channel = server.accept();
      channel.configureBlocking(false);
      //客户端通道注册到selector 上
      channel.register(this.selector, SelectionKey.OP_READ);
  }


Мы видим, что приведенный выше пример NIO имеет почти тень реактора.

  1. На основе селектора, управляемого событиями -> (поддерживает мониторинг нескольких socketChannels)

  2. Единый диспетчерский центр событий -> диспетчеризация

  3. Служба обработки событий -> чтение и запись


Фактически, NIO решил проблемы 1 и 2, обнаруженные в вышеупомянутом BIO, и количество одновременных клиентов сервера было улучшено.Он больше не ограничен одним клиентом и одним потоком, но один поток может поддерживать несколько клиентов ( selector Поддерживает прослушивание нескольких socketChannels).

Но это все еще не идеальный шаблон Reactor. Прежде всего, Reactor — это шаблон проектирования. Хороший шаблон должен поддерживать лучшую масштабируемость. Очевидно, вышеизложенное не поддерживает его. Кроме того, хороший шаблон Reactor должен обладать следующими характеристиками:

  1. Меньшее использование ресурсов, обычно не требуется один поток на клиента.

  2. Меньше накладных расходов, меньше переключения контекста и блокировки

  3. Возможность отслеживать статус сервера

  4. Возможность управления привязками обработчиков к событиям

Так как же должен выглядеть хороший шаблон Reactor?

3. Реактор

В приложении Java NIO для создания Reactor Pattern великий бог Дуг Ли (бесконечно почитаемый бог Java) в «Scalable IO in Java" дал хорошее объяснение. Мы используем три вида реакторов, введенных богами, чтобы представить их по отдельности.

Во-первых, мы определяем следующие три роли на основе режима обработки Reactor Pattern:

  • ReactorОтправка событий ввода/вывода соответствующему обработчику

  • AcceptorОбработка новых клиентских подключений и отправка запросов в цепочку обработчиков

  • HandlersВыполнять неблокирующие задачи чтения/записи


1. Однопоточная модель с одним реактором

Посмотрим, как реализован код:

/**
    * 等待事件到来,分发事件处理
    */
  class Reactor implements Runnable {
​
      private Reactor() throws Exception {
​
          SelectionKey sk =
                  serverSocket.register(selector,
                          SelectionKey.OP_ACCEPT);
          // attach Acceptor 处理新连接
          sk.attach(new Acceptor());
      }
​
      public void run() {
          try {
              while (!Thread.interrupted()) {
                  selector.select();
                  Set selected = selector.selectedKeys();
                  Iterator it = selected.iterator();
                  while (it.hasNext()) {
                      it.remove();
                      //分发事件处理
                      dispatch((SelectionKey) (it.next()));
                  }
              }
          } catch (IOException ex) {
              //do something
          }
      }
​
      void dispatch(SelectionKey k) {
          // 若是连接事件获取是acceptor
          // 若是IO读写事件获取是handler
          Runnable runnable = (Runnable) (k.attachment());
          if (runnable != null) {
              runnable.run();
          }
      }
​
  }
  /**
    * 连接事件就绪,处理连接事件
    */
  class Acceptor implements Runnable {
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 注册读写
                  new Handler(c, selector);
              }
          } catch (Exception e) {
​
          }
      }
  }
  /**
    * 处理读写业务逻辑
    */
  class Handler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      public Handler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          process();
          //下一步处理写事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          process();
          //下一步处理读事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 业务处理
        */
      public void process() {
          //do something
      }
  }


Это самая базовая однопоточная модель Single-Reactor. Поток Reactor отвечает за демультиплексирование сокетов.После поступления нового соединения и запуска события соединения оно передается на обработку Acceptor, а события чтения и записи ввода-вывода передаются обработчику для обработки.

Основная задача Акцептора - построить обработчик.После получения SocketChannel, относящегося к клиенту, привязать его к соответствующему обработчику.После того, как соответствующий SocketChannel прочитает и запишет события, на основе распределения racotor, обработчик может его обработать ( все события IO привязаны к селектору Set, там Reactor-раздача).

Эта модель подходит для сценариев, в которых компоненты бизнес-процессов в цепочке процессоров могут быть завершены быстро. Однако эта однопоточная модель не может в полной мере использовать многоядерные ресурсы, поэтому на практике она мало используется.


2. Многопоточная модель Single Reactor

По сравнению с первым однопоточным режимом, после обработки бизнес-логики, то есть после получения событий чтения и записи ввода-вывода, он передается в пул потоков для обработки, что может снизить накладные расходы на производительность основного реактора, поэтому что основной реактор может быть более целенаправленным.Распределение событий работает, тем самым улучшая пропускную способность всего приложения.

Посмотрим на реализацию:

/**
    * 多线程处理读写业务逻辑
    */
  class MultiThreadHandler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      //多线程处理业务逻辑
      ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
​
​
      public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          //任务异步处理
          executorService.submit(() -> process());
​
          //下一步处理写事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          //任务异步处理
          executorService.submit(() -> process());
​
          //下一步处理读事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 业务处理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }


3. Многопоточная модель Multi-Reactor


По сравнению со второй моделью, третья модель делит Reactor на две части.

  1. mainReactor отвечает за мониторинг сокета сервера, который используется для обработки установления новых соединений, и регистрирует установленный socketChannel в subReactor.

  2. SubReactor поддерживает свой собственный селектор, разделяет события чтения и записи ввода-вывода на основе socketChannel, зарегистрированного mainReactor, считывает и записывает сетевые данные, обрабатывает бизнес-функции и отправляет их в пул рабочих потоков для завершения.


Посмотрим на реализацию:

/**
    * 多work 连接事件Acceptor,处理连接事件
    */
  class MultiWorkThreadAcceptor implements Runnable {
​
      // cpu线程数相同多work线程
      int workCount =Runtime.getRuntime().availableProcessors();
      SubReactor[] workThreadHandlers = new SubReactor[workCount];
      volatile int nextHandler = 0;
​
      public MultiWorkThreadAcceptor() {
          this.init();
      }
​
      public void init() {
          nextHandler = 0;
          for (int i = 0; i < workThreadHandlers.length; i++) {
              try {
                  workThreadHandlers[i] = new SubReactor();
              } catch (Exception e) {
              }
​
          }
      }
​
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 注册读写
                  synchronized (c) {
                      // 顺序获取SubReactor,然后注册channel 
                      SubReactor work = workThreadHandlers[nextHandler];
                      work.registerChannel(c);
                      nextHandler++;
                      if (nextHandler >= workThreadHandlers.length) {
                          nextHandler = 0;
                      }
                  }
              }
          } catch (Exception e) {
          }
      }
  }
  /**
    * 多work线程处理读写业务逻辑
    */
  class SubReactor implements Runnable {
      final Selector mySelector;
​
      //多线程处理业务逻辑
      int workCount =Runtime.getRuntime().availableProcessors();
      ExecutorService executorService = Executors.newFixedThreadPool(workCount);
​
​
      public SubReactor() throws Exception {
          // 每个SubReactor 一个selector 
          this.mySelector = SelectorProvider.provider().openSelector();
      }
​
      /**
        * 注册chanel
        *
        * @param sc
        * @throws Exception
        */
      public void registerChannel(SocketChannel sc) throws Exception {
          sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
      }
​
      @Override
      public void run() {
          while (true) {
              try {
              //每个SubReactor 自己做事件分派处理读写事件
                  selector.select();
                  Set<SelectionKey> keys = selector.selectedKeys();
                  Iterator<SelectionKey> iterator = keys.iterator();
                  while (iterator.hasNext()) {
                      SelectionKey key = iterator.next();
                      iterator.remove();
                      if (key.isReadable()) {
                          read();
                      } else if (key.isWritable()) {
                          write();
                      }
                  }
​
              } catch (Exception e) {
​
              }
          }
      }
​
      private void read() {
          //任务异步处理
          executorService.submit(() -> process());
      }
​
      private void write() {
          //任务异步处理
          executorService.submit(() -> process());
      }
​
      /**
        * task 业务处理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }
​


В третьей модели мы видим, что mainReactor в основном используется для обработки операции установления сетевого соединения ввода-вывода, которая обычно может обрабатываться одним потоком, в то время как subReactor в основном используется для взаимодействия данных и операций бизнес-обработки событий с установленным сокетом. , Число обычно равно количеству ЦП, и каждый subReactor обрабатывается округом.

В этой модели работа каждого модуля более специфична, связанность ниже, производительность и стабильность также значительно улучшены, а количество поддерживаемых одновременных клиентов может достигать миллионов.

Что касается применения этой модели, то уже используется много отличных мин, таких как мина и нетти. Третья форма вышеуказанного варианта, которая удаляет пул потоков, также является режимом по умолчанию для Netty NIO. В следующем разделе мы сосредоточимся на архитектурных шаблонах netty.


В-четвертых, режим обработки событий

существуетDouglas SchmidtВ его шедевре «POSA2» есть введение в режим обработки событий, в котором есть четыре режима обработки событий:

  1. Reactor

  2. Proactor

  3. Asynchronous Completion Token

  4. Acceptor-Connector

1.Proactor

Представлено в этой статьеReactorявляется одним из них, иProactorОбщая структура реактора аналогична способу обработки реактора, разница в том, чтоProactorОн реализован в виде асинхронного неблокирующего ввода-вывода.Чтение и запись данных обрабатываются асинхронно без пользовательских потоков.Сервисная программа больше ориентирована на обработку бизнес-событий, а не на блокировку ввода-вывода.

2.Asynchronous Completion Token

Проще говоря,ACTЭто ответ на то, что приложение асинхронно вызывает операцию службы и обрабатывает соответствующее событие завершения службы. Из буквального значения токена мы, вероятно, можем понять, что это своего рода сохранение и передача состояния.

Например, приложениям обычно необходимо вызывать сторонние службы. Как правило, поступают запросы бизнес-потоков. Когда требуются сторонние ресурсы, сторонние запросы инициируются синхронно. Чтобы повысить производительность приложений, необходимо инициировать запросы. асинхронно, но для асинхронных запросов после поступления данных контекст и контекстная информация нашего приложения изменились, и вы не можете с этим справиться.

ACTРешение заключается в решении этой проблемы.Токен используется для записи информации перед асинхронной отправкой, и отправки ее получателю.Когда получатель ответит, он принесет этот токен, после чего сценарий вызова сервиса может быть восстановлен.


На рисунке выше мы видим, что на этапе обработки клиента клиент может продолжать обрабатывать другую бизнес-логику, не находясь в состоянии блокировки. Когда служба вернется, информация о маркере будет доставлена.​


3.Acceptor-Connector

Acceptor-ConnectorЭто комбинация Reactor, и его также можно рассматривать как вариант.Он очень похож на третью реализацию Reactor, представленную выше, но принципиально отличается.

Acceptor-ConnectorШаблон заключается в разделении соединения и инициализации одноранговой службы в сети, чтобы установление соединения и служба в системе были разделены и разъединены после инициализации службы. Соединитель

Инициатива
Устанавливает соединение с компонентом удаленного получателя и инициализирует обработчик службы для обработки данных, которыми обмениваются при соединении. Так же и получатель
Пассивно
Ожидает запроса на подключение от удаленного соединителя, устанавливает подключение при поступлении такого запроса и инициализирует обработчик службы для обработки данных, которыми обмениваются при подключении. Затем инициализированный служебный процессор выполняет специфичную для приложения обработку и обменивается данными через соединения, установленные компонентами соединителя и приемника.

Преимущества этого:

  1. В целом, политики установления соединений и инициализации служб меняются гораздо реже, чем реализации служб приложений и протоколы связи.

  2. Легко добавляйте новые типы служб, новые реализации служб и новые протоколы связи, не затрагивая существующее программное обеспечение для установления соединений и инициализации служб. Например, принимается протокол связи IPX/SPX или протокол TCP.

  3. Разделение роли соединения и роли связи, роль соединения только инициирует соединение, а не принимает соединение. Коммуникационная роль касается только взаимодействия с данными.

  4. Защищает программистов от отсутствия безопасности типов низкоуровневых API-интерфейсов сетевого программирования (таких как сокеты или TLI). Отношения развития бизнеса, лежащие в основе коммуникации


Цитировать:

woohoo.kuqin.com/ace-2002-12…

Woohoo. Электрическое тепло. Вандербильт. Квота/%7E Шмидт/…

гы. В это время. Освего. Квота/Вход/Введение продукта…


Для получения дополнительных знаний об архитектуре, пожалуйста, обратите внимание на мой публичный аккаунт, большой код ожидания (cool_wier)