Модель многопоточности Dubbo и стратегия планирования

Dubbo

1. Сервисный вызов

Сначала потребитель услуг инициирует удаленный вызов через прокси-объект Proxy, а затем отправляет закодированный запрос на сетевой уровень поставщика услуг через клиента сети Клиент, то есть Сервер. Первое, что делает сервер после получения запроса — декодирует пакет. Затем декодированный запрос отправляется диспетчеру Dispatcher, а затем диспетчер отправляет запрос в указанный пул потоков, и, наконец, пул потоков вызывает конкретную службу. Это процесс отправки и получения запроса на удаленный вызов.

Итак, как запрос отправляется в dubbo? И как выглядит модель потоков?

2. Разделение потоков ввода-вывода и бизнес-потоков

  • Если логика обработки событий может быть завершена быстро и не инициируются новые запросы ввода-вывода, например просто запись идентификатора в память, быстрее обрабатывать непосредственно в потоке ввода-вывода, поскольку планирование пула потоков сокращается.

  • Однако, если логика обработки событий медленная или если необходимо инициировать новый запрос ввода-вывода, например запрос к базе данных, он должен быть отправлен в пул потоков, иначе поток ввода-вывода будет заблокирован, а другие запросы не будут получены. .

  • Если для обработки события используется поток ввода-вывода, и в процессе обработки события инициируется новый запрос ввода-вывода, например запрос входа в систему в событии соединения, будет сообщено об исключении «может вызвать взаимоблокировку», но не будет действительно тупик.

Поэтому в реальных бизнес-сценариях необходимо разделять бизнес-потоки и потоки ввода-вывода. В качестве структуры управления службами dubbo использует Netty как компонент сетевого взаимодействия в нижней части и поддерживает различные стратегии распространения при запросе распространения.

Справочная статья:блог woo woo woo.cn на.com/no_life/art…

3. Стратегия распределения запросов

установление соединения

派发策略

Судя по официальному описанию, duboo поддерживает пять стратегий распространения, посмотрим, как это реализовано. отNtty4.x为пример:

  1. NettyServer
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
         super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
     }
    
  2. ChannelHandlers#wrapInternal
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
         // 选择调度策略 默认是all
         return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                 .getAdaptiveExtension().dispatch(handler, url))); 
     }
    
    Прошел метод построения NettyServerChannelHandlers#wrapнастройки методаMultiMessageHandler,HeartbeatHandlerИ выберите стратегию планирования через расширение SPI.
  3. NettyServer#doOpen
protected void doOpen() throws Throwable {
      bootstrap = new ServerBootstrap();
      // 多线程模型
      // boss线程池,负责和消费者建立新的连接
      bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
      // worker线程池,负责连接的数据交换
      workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
              new DefaultThreadFactory("NettyServerWorker", true));

      final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
      channels = nettyServerHandler.getChannels();

      bootstrap.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // nagele 算法
              .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// TIME_WAIT
              .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //内存池
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {
                      NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                      ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                              .addLast("decoder", adapter.getDecoder()) //设置编解码器
                              .addLast("encoder", adapter.getEncoder())
                              .addLast("handler", nettyServerHandler);
                  }
              });
      // bind 端口
      ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
      channelFuture.syncUninterruptibly();
      channel = channelFuture.channel();

  }

Установите количество пулов потоков-боссов Netty на 1, пул рабочих потоков (то есть потоков ввода-вывода) на количество ядер процессора + 1 и внедрите обработчик в Netty для кодирования, декодирования и обработки сообщений.

Если мы предоставляем только один сервисный порт Dubbo в процессе JVM, то процесс JVM будет иметь только один экземпляр NettyServer и только один экземпляр NettyHandler. И настройте три обработчика для обработки кодека, создания соединения, чтения и записи сообщений и т. д. Внутри даббо определяетChannelHandlerдля использования с НеттиChannelСвязанный, через приведенный выше код найдетNettyServerсама также являетсяChannelHandler. пройти черезNettyServer#doOpenПосле раскрытия сервисного порта клиент может установить соединение с сервером, а провайдер позвонит после инициализации соединения.NettyHandler#channelActiveметод созданияNettyChannel

  1. NettyChannel
public void channelActive(ChannelHandlerContext ctx) throws Exception {
      logger.debug("channelActive <" + NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channle <" + ctx.channel());
      //获取或者创建一个NettyChannel
      NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
      try {
          if (channel != null) {
              // <ip:port, channel>
              channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
          }
          // 这里的 handler就是NettyServer
          handler.connected(channel);
      } finally {
          NettyChannel.removeChannelIfDisconnected(ctx.channel());
      }
  }

Точно так же, как у Нетти и Даббо есть свой собственный ChannelHandler, у Нетти и Даббо также есть свой собственный канал. Этот метод в конечном итоге будет называтьсяNettyServer#connectedметод, чтобы проверить, будет ли новый добавленный канал превышать допустимую конфигурацию, настроенную провайдером. Если он превышает, журнал ошибок будет распечатан напрямую, и канал будет закрыт. Таким образом, потребитель, естественно, получит ненормальную информацию о соединении прерывание.Подробнее см.AbstractServer#connectedметод.

  1. AbstractServer#connected
public void connected(Channel ch) throws RemotingException {
     // If the server has entered the shutdown process, reject any new connection
     if (this.isClosing() || this.isClosed()) {
         logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
         ch.close();
         return;
     }

     Collection<Channel> channels = getChannels();
     //大于accepts的tcp连接直接关闭
     if (accepts > 0 && channels.size() > accepts) { 
         logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
         ch.close();
         return;
     }
     super.connected(ch);
 }
  • В dubbo потребители и провайдеры по умолчанию устанавливают только длинное TCP-соединение (подробный код см. во введении к исходному коду официального веб-сайта, в разделе справки по службам). добавить на стороне потребителя:
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" connections="20"/>
  • Провайдеры могут использовать accepts для управления количеством длинных подключений, чтобы предотвратить слишком много подключений.Конфигурация выглядит следующим образом:
<dubbo:protocol name="dubbo" port="20880" accepts="10"/>

запрос на получение

Когда соединение установлено, потребитель может запросить услуги провайдера, а когда запрос придёт, провайдер по очереди пройдёт следующую обработку Handler:

--->NettyServerHandler#channelRead: получить сообщение с запросом.

--->AbstractPeer#received: Если служба была закрыта, вернитесь, в противном случае вызовите следующий обработчик для обработки.

--->MultiMessageHandler#received: Если это пакетный запрос, для обработки запроса будет вызван следующий обработчик.

--->HeartbeatHandler#received: обработка сообщений пульса.

--->AllChannelHandler#received: Обработчик Dubbo очень важен, потому что отсюда происходит изоляция пула потоков ввода-вывода и пула бизнес-потоков.

--->DecodeHandler#received: Расшифровка сообщений.

--->HeaderExchangeHandler#received: обработка сообщений.

--->DubboProtocol: позвонить в сервис.

  1. AllChannelHandler#received:
public void received(Channel channel, Object message) throws RemotingException {
       // 获取业务线程池
       ExecutorService cexecutor = getExecutorService();
       try {
           // 使用线程池处理消息
           cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
       } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
       }
   }

Исключение перехватывается здесь для выполнения, поскольку пул потоков ввода-вывода не ограничен, но пул бизнес-потоков может быть ограничен, поэтому отправка выполнения может столкнуться с RejectedExecutionException.

Итак, как получить пул бизнес-потоков? ФактическиWrappedChannelHandlerдаxxxChannelHandlerdКласс украшения, который можно узнать по даббо спи, получаетAllChannelHandlerбудет создан первымWrappedChannelHandler.

  1. WrappedChannelHandler
public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // 获取业务线程池
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

резьбовая модель

  1. FixedThreadPool
public class FixedThreadPool implements ThreadPool {

  @Override
  public Executor getExecutor(URL url) {
      // 线程池名称DubboServerHanler-server:port
      String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
      // 缺省线程数量200
      int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
      // 任务队列类型
      int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

      return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
              queues == 0 ? new SynchronousQueue<Runnable>() :
                      (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                              : new LinkedBlockingQueue<Runnable>(queues)),
              new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
  }

}

200 потоков используются по умолчанию иSynchronousQueueЭто означает, что если все потоки в пуле потоков работают, новые задачи будут отклонены напрямую.

  1. CachedThreadPool
public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 核心线程数量 缺省为0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 最大线程数量 缺省为Integer.MAX_VALUE
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // queue 缺省为0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 空闲线程存活时间
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

Кэшированный пул потоков, видно, что если скорость отправки задач больше, чем maxThreads, потоки будут создаваться непрерывно, а ресурсы ЦП и памяти будут исчерпаны в экстремальных условиях. Он не подходит для использования при внезапном появлении большого количества трафика.

  1. LimitedThreadPool
public class  LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 缺省核心线程数量为0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 缺省最大线程数量200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 任务队列缺省0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

Если он не настроен, он ничем не отличается от FixedThreadPool.

  1. EagerThreadPool
public class EagerThreadPool implements ThreadPool {

   @Override
   public Executor getExecutor(URL url) {
       String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
       // 0
       int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
       // Integer.MAX_VALUE
       int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
       // 0
       int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
       // 60s
       int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

       // init queue and executor
       // 初始任务队列为1
       TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
       EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
               threads,
               alive,
               TimeUnit.MILLISECONDS,
               taskQueue,
               new NamedInternalThreadFactory(name, true),
               new AbortPolicyWithReport(name, url));
       taskQueue.setExecutor(executor);
       return executor;
   }
}

EagerThreadPoolExecutor

public void execute(Runnable command) {
       if (command == null) {
           throw new NullPointerException();
       }
       // do not increment in method beforeExecute!
       //已提交任务数量
       submittedTaskCount.incrementAndGet();
       try {
           super.execute(command);
       } catch (RejectedExecutionException rx) { //大于最大线程数被拒绝任务 重新添加到任务队列
           // retry to offer the task into queue.
           final TaskQueue queue = (TaskQueue) super.getQueue();
           try {
               if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                   submittedTaskCount.decrementAndGet();
                   throw new RejectedExecutionException("Queue capacity is full.", rx);
               }
           } catch (InterruptedException x) {
               submittedTaskCount.decrementAndGet();
               throw new RejectedExecutionException(x);
           }
       } catch (Throwable t) {
           // decrease any way
           submittedTaskCount.decrementAndGet();
           throw t;
       }
   }

TaskQueue

public boolean offer(Runnable runnable) {
       if (executor == null) {
           throw new RejectedExecutionException("The task queue does not have executor!");
       }
       // 获取当前线程池中的线程数量
       int currentPoolThreadSize = executor.getPoolSize();
       // have free worker. put task into queue to let the worker deal with task.
       // 如果已经提交的任务数量小于当前线程池中线程数量(不是很理解这里的操作)
       if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
           return super.offer(runnable);
       }

       // return false to let executor create new worker.
       //当前线程数小于最大线程程数直接创建新worker
       if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
           return false;
       }

       // currentPoolThreadSize >= max
       return super.offer(runnable);
   }

Создать первымWorkerПул потоков. когда количество задач превышаетcorePoolSizeно меньше чемmaximumPoolSize, приоритетом является созданиеWorkerсправиться с задачей. Когда количество задач превышаетmaximumPoolSizeКогда задача ставится в очередь на блокировку. Брошен, когда очередь блокировки заполненаRejectedExecutionException. (по сравнению с cached: в cached больше задач, чемmaximumPoolSizeнапрямую генерировать исключение вместо того, чтобы ставить задачу в очередь блокировки).

Согласно приведенному выше анализу кода, если запрос потребителя слишком быстрый, это может привести к тому, что пул бизнес-потоков поставщика услуг выдастRejectedExecutionExceptionаномальный. Это исключение является политикой отклонения треда, принятой duboo.AbortPolicyWithReport#rejectedExecutionОн выбрасывается и будет возвращен потребителю.Простым решением в настоящее время является увеличение количества пула потоков, вызывающего службу поставщика, например, в следующей конфигурации:

<dubbo:provider threads="500"/>
或
<dubbo:protocol name="dubbo" port="20882" accepts="10" threads="500"/>

Чтобы убедиться, что основная служба в модуле имеет доступные потоки (чтобы дополнительная служба не вытесняла слишком много потоков, вызывающих службу), вы можете ограничить параллелизм вторичной службы, например:

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" executes="100"/>

Стратегия диспетчера dubbo используется по умолчанию.На самом деле, лучший способ справиться с этим — отделить поток ввода-вывода от бизнес-потока, поэтому лучше использовать сообщение. И если вы используете все, если используемая версия dubo относительно низкая, это, вероятно, вызовет ошибку dubbo. Как только пул бизнес-потоков заполнится, будет выдано исключение отклонения выполнения, и перехваченный метод будет введен для обработки, и этот метод по-прежнему использует пул бизнес-потоков, поэтому весьма вероятно, что пул бизнес-потоков все еще полон в момент на этот раз, что приводит к нисходящему HeaderExchangeHandler. Нет возможности вызвать, и ответное сообщение после обработки исключения точноHeaderExchangeHandler#caughtзакончить, так наконецNettyHandler#writeRequestedЕсли он не вызывается, потребитель может только дождаться тайм-аута и не может получить исключение о том, что пул потоков провайдера заполнен (2.6.x исправил эту проблему).

  • Рекомендуемая конфигурация
<dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" />

Справочная статья:медленный указатель. ITeye.com/blog/239117…