Разбор исходного кода Spring WebFlux

Spring Boot

1 Обзор

Я до сих пор помню, когда в прошлом году я смотрел на исходный код hystrix, меня смутил rxjava, и я все еще был сбит с толку, когда увидел слово «отзывчивый». Некоторое время назад я изучил Vert.x, а затем осознал важность реактивного программирования, когда основные фреймворки до крайности использовали CompletableFuture java8. В конце концов, для языка, который изначально не поддерживает сопрограммы, реактивность имеет больше смысла. Но есть и плюсы и минусы.Для сцены доброжелатель видит разницу. Сегодня мы представим отзывчивый веб-фреймворк WebFlux, запущенный Spring5.

2. Отзывчивый, асинхронный, синхронный

  • Синхронный вызов: Использование традиционного программирования с блокировкой приведет к трате слишком большого количества ресурсов потока при высоком уровне параллелизма, поскольку каждый запрос занимает поток. Если io заблокирован, поток будет заблокирован, и ЦП не будет использоваться полностью. А высокий уровень параллелизма приведет к созданию слишком большого количества ресурсов потоков.
  • Асинхронный неблокирующий: Как правило, результат выполнения обрабатывается через callback, но дело сложное, вы столкнетесь с callback hell, и вернуться в код сложно. При использовании режима Future в Java операция get по-прежнему заблокирована, а не асинхронна.
  • Отзывчивый: исходя из удобочитаемости кода, он поддерживает асинхронность задач, что может повысить пропускную способность системы. Для некоторых отличных адаптивных фреймворков, таких как Reactor, также поддерживается обратное давление. Но отзывчивость не уменьшает время обработки запроса. Это только улучшит загрузку ЦП и позволит избежать ненужного переключения потоков. Отзывчивый похож на форму нажатия наблюдателя. Когда задача будет завершена, наблюдатель будет активно передавать ее потребителю для обработки.

обратное давление: поскольку он основан на режиме push, когда скорость производства реактивных потоковых данных слишком высока, а обработка потребителя слишком медленна, происходит накопление сообщений. Чтобы избежать этой проблемы, необходимо контролировать скорость производства сообщений.

Чтобы позаимствовать предложение из весенней документации:

If a publisher cannot slow down, it has to decide whether to buffer, drop, or fail.

3. Отслеживание источника

Создать сервер

Веб-модуль springboot обеспечивает поддержку веб-сервисов. WebFlux по умолчанию использует netty в качестве веб-сервера, поэтому мы в основном рассматриваем реализацию этого аспекта.

Короче говоря, Springboot запускает службу с помощью метода run. В этом методе вызывается метод createApplicationContext для создания ApplicationContext Spring. Шаблон метода шаблона Spring может предоставлять множество реализаций контекста. Выбирайте разные реализации в зависимости от среды.

protected ConfigurableApplicationContext createApplicationContext() {
   Class<?> contextClass = this.applicationContextClass;
   if (contextClass == null) {
      try {
         switch (this.webApplicationType) {
         case SERVLET:
            contextClass = Class.forName(DEFAULT_WEB_CONTEXT_CLASS);
            break;
         case REACTIVE:
            contextClass = Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS);
            break;
         default:
            contextClass = Class.forName(DEFAULT_CONTEXT_CLASS);
         }
      }
      catch (ClassNotFoundException ex) {
         throw new IllegalStateException(
               "Unable create a default ApplicationContext, "
                     + "please specify an ApplicationContextClass",
               ex);
      }
   }
   return (ConfigurableApplicationContext) BeanUtils.instantiateClass(contextClass);
}

webFlux принимает режим REACTIVE, то есть инициализированным контекстом является AnnotationConfigReactiveWebServerApplicationContext. После завершения создания сначала вызывается подготовка. Затем вызовите refreshContext, чтобы обновить контекст. Этот процесс представляет собой серию операций, таких как инициализация и отслеживание регистрации.

Основное внимание уделяется методу обновления onRefresh.

onRefresh метод

@Override
protected void onRefresh() {
   super.onRefresh();
   try {
      createWebServer();
   }
   catch (Throwable ex) {
      throw new ApplicationContextException("Unable to start reactive web server",
            ex);
   }
}
private void createWebServer() {
   WebServer localServer = this.webServer;
   if (localServer == null) {
      this.webServer = getWebServerFactory().getWebServer(getHttpHandler());
   }
   initPropertySources();
}

Это вызывает функцию createWebServer для создания веб-сервера. Если используется netty, будет вызван метод getWebServer NettyReactiveWebServerFactory. Создал сервер через билдер.

В конечном итоге будет создан TcpBridgeServer, а TcpBridgeServer соединится с TcpServer. Переопределите метод doHander. Роль этого метода заключается в предоставлении ChannelInitializer для netty. Потому что для разных протоколов передачи прикладного уровня логика обработки определенно несовместима. То же самое верно, если вы переключитесь на веб-сокет. Следовательно, мы можем ссылаться на этот метод для протокола прикладного уровня, основанного на TCP.

@Override
protected ContextHandler<Channel> doHandler(
      BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
      MonoSink<NettyContext> sink) {
   BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate =
         compressPredicate(options);
   boolean alwaysCompress = compressPredicate == null && options.minCompressionResponseSize() == 0;
   return ContextHandler.newServerContext(sink,
         options,
         loggingHandler,
         (ch, c, msg) -> {
            HttpServerOperations ops = HttpServerOperations.bindHttp(ch,
                  handler,
                  c,
                  compressPredicate,
                  msg);

            if (alwaysCompress) {
               ops.compression(true);
            }
            return ops;
         })
                        .onPipeline(this)
                        .autoCreateOperations(false);
}

хорошо, вот собственно фабрика, когда придет http запрос, будет построен HttpServerOperations. Подробности будут представлены позже. ps: функциональное программирование такое отвратительное, но оно позволяет указать метод в качестве параметра в соответствующем месте.

ContextHandler

Выше сказано, что это ChannelInitializer, поэтому мы отслеживаем его метод initChannel.

@Override
protected void initChannel(CHANNEL ch) throws Exception {
   accept(ch);
}
accpet:
try {
   if (pipelineConfigurator != null) {
      pipelineConfigurator.accept(channel.pipeline(),
            (ContextHandler<Channel>) this);
   }
   channel.pipeline()
          .addLast(NettyPipeline.ReactiveBridge,
                new ChannelOperationsHandler(this));
}

Он вызовет соответствующий метод accept. Основная логика — вышеприведенные строки, ядро ​​— то, что он вызовет метод pipeConfigurator.accept. pipeConfigurator на самом деле является TcpBridgeServer. Он передается в методе onPipeline doHandler.

Метод accept TcpBridgeServer фактически предназначен для регистрации кодека http и обработчика запросов http.

@Override
public void accept(ChannelPipeline p, ContextHandler<Channel> c) {
   p.addLast(NettyPipeline.HttpCodec, new HttpServerCodec(
           options.httpCodecMaxInitialLineLength(),
           options.httpCodecMaxHeaderSize(),
           options.httpCodecMaxChunkSize(),
           options.httpCodecValidateHeaders(),
           options.httpCodecInitialBufferSize()));

   if (ACCESS_LOG_ENABLED) {
      p.addLast(NettyPipeline.AccessLogHandler, new AccessLogHandler());
   }

   p.addLast(NettyPipeline.HttpServerHandler, new HttpServerHandler(c));
}

HttpServerHandler

Увидя здесь, мы будем знать. Все HTTP-запросы будут направляться методу channelRead HttpServerHandler.

if (persistentConnection) {
   pendingResponses += 1;
   if (HttpServerOperations.log.isDebugEnabled()) {
      HttpServerOperations.log.debug(format(ctx.channel(), "Increasing pending responses, now {}"),
            pendingResponses);
   }
   persistentConnection = isKeepAlive(request);
}
else {
   if (HttpServerOperations.log.isDebugEnabled()) {
      HttpServerOperations.log.debug(format(ctx.channel(), "Dropping pipelined HTTP request, " +
                  "previous response requested connection close"));
   }
   ReferenceCountUtil.release(msg);
   return;
}
if (pendingResponses > 1) {
   if (HttpServerOperations.log.isDebugEnabled()) {
      HttpServerOperations.log.debug(format(ctx.channel(), "Buffering pipelined HTTP request, " +
                  "pending response count: {}, queue: {}"),
            pendingResponses,
            pipelined != null ? pipelined.size() : 0);
   }
   overflow = true;
   doPipeline(ctx, msg);
   return;
}
else {
   overflow = false;
   parentContext.createOperations(ctx.channel(), msg);
   if (!(msg instanceof FullHttpRequest)) {
      return;
   }
}

Этот метод относительно длинный, а логика для обработки httprequest перехвачена выше:

1 определяет, успешно ли разрешено сообщение, и напрямую возвращает ответ об ошибке в случае сбоя.

2. Определите, поддерживается ли текущее соединение. Если соединение разрывается, сообщение просто отбрасывается. В противном случае pendingResponses увеличивается на 1, а pendingResponses — это количество ожидающих ответов.

3. Судья ожидает ответов

  • Больше 1, что указывает на то, что текущее соединение имеет ожидающий ответ. Добавьте запрос в очередь (метод doPipeline), дождитесь последующей обработки и установите для переполнения значение true.
  • Равно 1, обработать запрос. Таким образом, соединение не может обрабатывать два запроса одновременно и должно ждать обработки предыдущего ответа.

Почему есть ожидающие ответы? На самом деле это предел поддержки активности http1.1. Потому что у меня сложилось впечатление, что tomcat тоже так делает. Должна быть последовательная сессия. http2.0 поддерживает параллельные сеансы

метод doPipeline

void doPipeline(ChannelHandlerContext ctx, Object msg) {
   if (pipelined == null) {
      pipelined = Queues.unbounded()
                        .get();
   }
   if (!pipelined.offer(msg)) {
      ctx.fireExceptionCaught(Exceptions.failWithOverflow());
   }
}

Этот метод состоит в том, чтобы присоединиться к очереди, то есть верхней стороне, для соединения, если последний запрос не ответил. Здесь следующий запрос будет добавлен в очередь.

@Override
public void run() {
   Object next;
   boolean nextRequest = false;
   while ((next = pipelined.peek()) != null) {
      if (next instanceof HttpRequest) {
         if (nextRequest || !persistentConnection) {
            return;
         }
         nextRequest = true;
         parentContext.createOperations(ctx.channel(), next);
         if (!(next instanceof FullHttpRequest)) {
            pipelined.poll();
            continue;
         }
      }
      ctx.fireChannelRead(pipelined.poll());
   }
   overflow = false;
}

Этот метод запуска фактически обрабатывает запрос на присоединение к очереди выше. Управляемый nextRequest, одновременно будет обрабатываться только один запрос. Всем должно быть любопытно, где выполняется этот метод запуска, на самом деле я давно его искал. Наконец нашел его в методе записи HttpServerHandler.

if (pipelined != null && !pipelined.isEmpty()) {
   if (HttpServerOperations.log.isDebugEnabled()) {
      HttpServerOperations.log.debug(format(ctx.channel(), "Draining next pipelined " +
                  "request, pending response count: {}, queued: {}"),
            pendingResponses, pipelined.size());
   }
   ctx.executor()
      .execute(this);
}
else {
   ctx.read();
}

Почему вы хотите выполнить его в записи. Потому что запись означает, что последний запрос для этого соединения уже ответил. Мы можем перейти к следующему запросу для этого соединения.

Хорошо, из приведенного выше анализа мы можем понять, как netty принимает и обрабатывает http-запросы. И как поддерживать последовательную сессию поддержки активности http1.1. Тогда давайте рассмотрим конкретную логику обработки http-запросов.

обработать запрос

Из приведенного выше анализа мы можем в основном знать, что после получения запроса вызывается следующий метод.

parentContext.createOperations(ctx.channel(), msg);

parentContext на самом деле является ContextHandler, созданным в TcpBridgeServer.

Основная логика метода createOperations ContextHandler — это метод channel.eventLoop().execute(op::onHandlerStart).

protected void onHandlerStart() {
   applyHandler();
}

Здесь отметим: он использует для выполнения текущий eventLoop. Эта операция на самом деле является HttpServerOperations, которая в конечном итоге вызовет метод applyHandler базового класса.

метод applyHandler

    protected final void applyHandler() {
//    channel.pipeline()
//           .fireUserEventTriggered(NettyPipeline.handlerStartedEvent());
      if (log.isDebugEnabled()) {
         log.debug(format(channel(), "[{}] Handler is being applied: {}"), formatName(), handler);
      }
      try {
         Mono.fromDirect(handler.apply((INBOUND) this, (OUTBOUND) this))
             .subscribe(this);
      }
      catch (Throwable t) {
         log.error(format(channel(), ""), t);
         channel.close();
      }
   }

Основная логика здесь заключается в вызове метода handler.apply. Соответствующий класс этого обработчика. Он передается при создании веб-сервера.

ReactorHttpHandlerAdapter#apply

Этот метод создает ReactorServerHttpRequest и ReactorServerHttpResponse. Затем вызовите метод handle метода httpHandler для обработки логики. На самом деле это упаковка исходного запроса и ответа. Этот обработчик предоставляется HttpWebHandlerAdapter, и, наконец, вызывается метод обработчика DispatcherHandler.

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
   if (logger.isDebugEnabled()) {
      ServerHttpRequest request = exchange.getRequest();
      logger.debug("Processing " + request.getMethodValue() + " request for [" + request.getURI() + "]");
   }
   if (this.handlerMappings == null) {
      return Mono.error(HANDLER_NOT_FOUND_EXCEPTION);
   }
   return Flux.fromIterable(this.handlerMappings)
         .concatMap(mapping -> mapping.getHandler(exchange))
         .next()
         .switchIfEmpty(Mono.error(HANDLER_NOT_FOUND_EXCEPTION))
         .flatMap(handler -> invokeHandler(exchange, handler))
         .flatMap(result -> handleResult(exchange, result));
}

1. Метод mapping.getHandler будет искать соответствующий метод HandlerMethod в соответствии с запрошенной информацией.

Позвольте мне рассказать об этом здесь, начальное расположение обработчика, написанного нашим бизнесом: Он инициализируется в методе initHandlerMethods класса AbstractHandlerMethodMapping. Вся информация хранится в MappingRegistry.

2.InvokeHandler будет выполнять бизнес-логику.

3. Метод handleResult обрабатывает возвращенный результат. Для другой логики требуются разные процессоры для обработки возврата (например, логика обработки ResponseEntity и ResponseBody отличается)

RequestMappingHandlerAdapter#handler

@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
   HandlerMethod handlerMethod = (HandlerMethod) handler;
   Assert.state(this.methodResolver != null && this.modelInitializer != null, "Not initialized")
   InitBinderBindingContext bindingContext = new InitBinderBindingContext(
         getWebBindingInitializer(), this.methodResolver.getInitBinderMethods(handlerMethod));
   InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
   Function<Throwable, Mono<HandlerResult>> exceptionHandler =
         ex -> handleException(ex, handlerMethod, bindingContext, exchange);
   return this.modelInitializer
         .initModel(handlerMethod, bindingContext, exchange)
         .then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
         .doOnNext(result -> result.setExceptionHandler(exceptionHandler))
         .doOnNext(result -> bindingContext.saveModel())
         .onErrorResume(exceptionHandler);
}

Этот метод является методом, который обрабатывает бизнес-логику.

initModel предназначен в основном для инициализации некоторых данных сеанса. Хранится в привязкеContext.

Затем будет вызван метод invocableMethod.invoke для выполнения бизнес-логики.

Затем для результата устанавливается обработчик исключений. Если исключение обработано, будет выполнена логика исключения, в противном случае HandlerResult будет возвращен непосредственно на верхний уровень для обработки.

InvocableHandlerMethod#invoke

public Mono<HandlerResult> invoke(
      ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) {
   return resolveArguments(exchange, bindingContext, providedArgs).flatMap(args -> {
      try {
         Object value = doInvoke(args);
         HttpStatus status = getResponseStatus();
         if (status != null) {
            exchange.getResponse().setStatusCode(status);
         }
         MethodParameter returnType = getReturnType();
         ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(returnType.getParameterType());
         boolean asyncVoid = isAsyncVoidReturnType(returnType, adapter);
         if ((value == null || asyncVoid) && isResponseHandled(args, exchange)) {
            logger.debug("Response fully handled in controller method");
            return asyncVoid ? Mono.from(adapter.toPublisher(value)) : Mono.empty();
         }


         HandlerResult result = new HandlerResult(this, value, returnType, bindingContext);
         return Mono.just(result);
      }
      catch (InvocationTargetException ex) {
         return Mono.error(ex.getTargetException());
      }
      catch (Throwable ex) {
         return Mono.error(new IllegalStateException(getInvocationErrorMessage(args)));
      }
   });
}

1.resolveArguments в основном основан на конфигурации метода и параметрах метода десериализации данных запроса.

2.doInvoke вызывает метод через отражение.

3. Постройте возврат результатов

После возврата Spring выберет соответствующий процессор для обработки возвращенного результата через конфигурацию нашего метода. Обычно используется ResponseBody. Его логика обработки заключается в сериализации результата в json. Наконец, напишите ответ клиенту.

Конкретная операция записи реализована в подписке в DeferredWriteMono. Существует много слоев инкапсуляции. Но этот метод будет вызван в конце.

5. Резюме

На самом деле реализация webflux заключается в том, чтобы использовать отзывчивость, чтобы переписать исходную реализацию. Код легче читать и понимать. Но это настоящая боль для отладки. Простым пониманием является Stud Stream, который повторяется и, наконец, передается в NioEventLoop для потребления. Что касается того, как связать вместе. На самом деле все еще зависит от отзывчивости.

Использование webflux может лучше использовать процессор. Уменьшите ненужную блокировку потоков. В ежедневной разработке, если она требует больших вычислительных ресурсов. Вполне возможно запустить прямо в цикле событий. А вот для сетевого типа io необходимо построить пул потоков для асинхронной обработки. Еще лучше, если соответствующий сервис поддерживает реактивную асинхронность.