Spring GateWay: детали пересылки шлюза

Java задняя часть
Spring GateWay: детали пересылки шлюза

Это 4-й день моего участия в августовском испытании обновлений.Подробности о событии:Испытание августовского обновления

Прежде всего, поделитесь всеми предыдущими статьями, ставьте лайки, добавляйте в избранное и пересылайте три раза подряд. >>>>😜😜😜
Сборник статей:🎁nuggets.capable/post/694164…
Github :👉github.com/black-ant
Резервное копирование CASE:👉git ee.com/ant black/wipe…

Введение

цель документа

  • Упорядочить детали переадресации запросов в Gateway production
  • Разберитесь с пунктами настройки переадресации

дополнение к знаниям

Переадресация запросов является одной из основных функций шлюза и включает в себя три основных понятия:

Маршрут:Маршрут — это базовая единица шлюза, состоящая из идентификатора, URI, набора предикатов и набора фильтров.Если предикат соответствует True, он будет перенаправлен.
Предикат (предикат, утверждение):Условием оценки для переадресации маршрутизации является утверждение функции Java 8. Тип ввода — Spring Framework ServerWebExchange. В настоящее время SpringCloud Gateway поддерживает различные методы, такие как: путь, запрос, метод, заголовок и т. д. Метод записи должен соответствовать форма ключ = значение
Фильтр:Фильтр — это логика фильтрации, которая передается при маршрутизации и пересылке запросов с использованием экземпляра GatewayFilter, созданного определенной фабрикой, который можно использовать для изменения содержимого запросов и ответов.

2. Простота использования

2.1 Краткое изложение предикатов

// 
- After=2017-01-20T17:42:47.789-07:00[America/Denver]

// 
- Before=2017-01-20T17:42:47.789-07:00[America/Denver]

//
- Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]

// 
- Cookie=chocolate, ch.p

2.2 Моно и поток

Mono и Flux являются основными объектами, которые проходят через весь процесс.Согласно спецификации реактивных потоков,Издатель предоставляет потенциально неограниченное количество упорядоченных элементов и публикует эти элементы по запросу, полученному от его подписчиков.. Reactor-core имеет набор реализаций этого интерфейса Publisher. Две важные реализации последовательностей, которые мы будем создавать, — это Mono и Flux.

  • Flux представляет собой асинхронную последовательность от 0 до N элементов.
  • Моно представляет собой асинхронную последовательность из 0 или 1 элементов.

> SpringGateway использует webflux в качестве базового фреймворка для вызовов, который включает объекты mono и Flux.

> Есть 3 типа уведомлений, которые могут быть включены в последовательность:

  • обычное сообщение, содержащее элементы
  • сообщение об окончании последовательности
  • Сообщение об ошибке последовательности

Flux

  • Flux — это стандартный издатель, представляющий асинхронную последовательность от 0 до N испускаемых элементов,Опционально завершается сигналом готовности или ошибки. Как и в спецификации Reactive Streams, эти три типа сигналов преобразуются в вызовы методов onNext, onComplete или onError нижестоящих подписчиков.

image.png

Mono

  • Mono — это еще одна реализация Publisher. Он выдает не более одной записи, а затем (необязательно) завершается сигналом onComplete или сигналом onError,Моно также асинхронно по своей природе
  • Он предоставляет только подмножество операторов, доступных для Flux, и некоторые операторы (особенно те, которые объединяют Mono с другим издателем) переключаются на Flux.
    • Например, Mono#concatWith(Publisher) возвращает Flux, а Mono#then(Mono) возвращает другой Mono.

image.png

Общие методы следующие:

  • create: программно создать Flux с возможностью многократного испускания,
  • empty: испускать 0 элементов или возвращать пустой Flux
  • just: создать базу
  • error: Создает поток, который завершается с указанной ошибкой сразу после подписки.

PS: я не буду читать эту часть подробно, сначала прочитайте основной процесс Gateway

3. Глубокий перехват

3.1 Схема

Первый взгляд на принципиальную схему SpringGateway

GateWayAll.jpg

4. Ввод вызова

4.1 Процесс вызова

  • Шаг 1: HttpWebHandlerAdapter # handle: сборка ServerWebExchange и запуск обработки обработчика
  • Шаг 2: DispatcherHandler # handle: инициировать обработку запроса
  • Шаг 3: RoutePredicateHandlerMapping # getHandlerInternal: обработка оценки маршрута

4.2 Логика getHandlerInternal

protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
   // don't handle requests on management port if set and different than server port
   if (this.managementPortType == DIFFERENT && this.managementPort != null
         && exchange.getRequest().getURI().getPort() == this.managementPort) {
      return Mono.empty();
   }
   exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

   return lookupRoute(exchange)
         // .log("route-predicate-handler-mapping", Level.FINER) //name this
         .flatMap((Function<Route, Mono<?>>) r -> {
            exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
            if (logger.isDebugEnabled()) {
               logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
            }

            exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
            return Mono.just(webHandler);
         }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
            exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
            if (logger.isTraceEnabled()) {
               logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
            }
         })));
}

3.2. lookupRoute

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
   return this.routeLocator.getRoutes()
         // individually filter routes so that filterWhen error delaying is not a
         // problem
         .concatMap(route -> Mono.just(route).filterWhen(r -> {
            // add the current route we are testing
            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
            return r.getPredicate().apply(exchange);
         })
               // instead of immediately stopping main flux due to error, log and
               // swallow it
               .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
               .onErrorResume(e -> Mono.empty()))
         // .defaultIfEmpty() put a static Route not found
         // or .switchIfEmpty()
         // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
         .next()
         // TODO: error handling
         .map(route -> {
            validateRoute(route, exchange);
            return route;
         });


}

пройдет по всем маршрутам route_001.png

5. Процесс отправки

5.1 Система фильтрации веб-обработчиков

Здесь webHandler — это объект FilteringWebHandler, давайте посмотрим на функцию этого объекта.

image.png

Здесь задействованы следующие фильтры:

  • C- ForwardPathFilter :
  • C- ForwardRoutingFilter : Используется для локальной переадресации
  • C- GatewayMetricsFilter : Интеграция с Прометеем, таким образом создав панель инструментов Grafana
  • C- LoadBalancerClientFilter : Используется для интеграции ленты, сначала получите имя микросервиса, а затем фактический адрес вызова через ленту.
  • C- NettyRoutingFilter : http или https , используя NettyHttpClientОтправлять прокси-запросы нижестоящим службам
  • C- NettyWriteResponseFilter : Клиентская сторона для записи ответов прокси обратно на шлюз, поэтому фильтр не будет выполняться, пока все остальные фильтры не закончат выполнение.
  • C- OrderedGatewayFilter :
  • C- RouteToRequestUrlFilter : получено из запросаИсходный URL-адрес преобразуется в URL-адрес, используемый шлюзом для переадресации запросов.
  • C- WebClientHttpRoutingFilter :
  • C- WebClientWriteResponseFilter :
  • C- WebsocketRoutingFilter : вс или всс, тогда фильтр будет использовать Spring Web Socket для пересылки запроса Websocket нижестоящему серверу.
  • C- WeightCalculatorWebFilter :

Вы можете обратиться к ->Встроенный глобальный фильтр Spring Cloud Gateway

Логика вызова 1: управление FilteringWebHandler

В этом объекте есть внутренний класс DefaultGatewayFilterChain, который представляет собой цепочку фильтров Filter.

private static class DefaultGatewayFilterChain implements GatewayFilterChain {
    
   // 当前 Filter 链索引 
   private final int index;
   // Filter 集合 
   private final List<GatewayFilter> filters;

   DefaultGatewayFilterChain(List<GatewayFilter> filters) {
      this.filters = filters;
      this.index = 0;
   }

   private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
      this.filters = parent.getFilters();
      this.index = index;
   }

   public List<GatewayFilter> getFilters() {
      return filters;
   }

   @Override
   public Mono<Void> filter(ServerWebExchange exchange) {
      return Mono.defer(() -> {
         if (this.index < filters.size()) {
            // 逐个 Filter 过滤调用 
            GatewayFilter filter = filters.get(this.index);
            DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
                  this.index + 1);
            return filter.filter(exchange, chain);
         }
         else {
            return Mono.empty(); // complete
         }
      });
   }

}

Поток звонков 3: фильтрация

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

    // 通常判断部分条件  , 如果该 Filter 不符合 , 则跳过该 Filter
   if (isAlreadyRouted(exchange)
         || (!"http".equals(scheme) && !"https".equals(scheme))) {
      return chain.filter(exchange);
   }

5.2 Тело отправлено

Основным отправляющим фильтром является NettyRoutingFilter, нижеследующее фокусируется только на соответствующей логике этого фильтра:

C- NettyRoutingFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
   // 请求 URL : http://httpbin.org:80/get
   URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    // 协议类型 : http
   String scheme = requestUrl.getScheme();
   
   // Step 1 : filter 链处理 ,如果不符合 http 协议 , 就通过下一个 Filter 处理
   if (isAlreadyRouted(exchange)
         || (!"http".equals(scheme) && !"https".equals(scheme))) {
      return chain.filter(exchange);
   }
   // Step 2 : 标识 Routed 已处理
   setAlreadyRouted(exchange);

   // Step 3 : 获取 Request 请求对象 , 这个是外部请求的对象
   ServerHttpRequest request = exchange.getRequest();
    
   // Step 4 : 获取 Method 类型 (get/post...)
   final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
   final String url = requestUrl.toString();
   
   // Step 5 : 对 Header 进行处理 , 需要转发过去
   HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
   final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
   filtered.forEach(httpHeaders::set);
    
   // -> Transfer-Encoding
   String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
   boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
   // -> preserveHostHeader
   boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

   // 通过 netty httpClient 发起转发请求 , PS !!! 这里是异步的
   Flux<HttpClientResponse> responseFlux = this.httpClient
         .chunkedTransfer(chunkedTransfer).request(method).uri(url)
         .send((req, nettyOutbound) -> {
            // Step 6 : 转发 Header 
            req.headers(httpHeaders);
            
            // => 是否需要记录之前的 host
            if (preserveHost) {
               String host = request.getHeaders().getFirst(HttpHeaders.HOST);
               req.header(HttpHeaders.HOST, host);
            }
            
            // Step 7 : 真正发起请求
            return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach)
                  .send(request.getBody()
                        .map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
                              .getNativeBuffer()));
         }).responseConnection((res, connection) -> {
            // Step 8 : 请求完成 , 获取 response
            ServerHttpResponse response = exchange.getResponse();
            
            // Step 9 : 转发headers 和 status 等属性
            HttpHeaders headers = new HttpHeaders();
            res.responseHeaders().forEach(
                  entry -> headers.add(entry.getKey(), entry.getValue()));
            
            // => String CONTENT_TYPE = "Content-Type" 
            // => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type";
            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
               exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
                     contentTypeValue);
            }
            
            // 转发状态 , 存在往 GatewayResponse 设置状态
            HttpStatus status = HttpStatus.resolve(res.status().code());
            if (status != null) {
               response.setStatusCode(status);
            }
            else if (response instanceof AbstractServerHttpResponse) {
               ((AbstractServerHttpResponse) response)
                     .setStatusCodeValue(res.status().code());
            }
            else {
               throw new IllegalStateException(
                     "Unable to set status code on response: "
                           + res.status().code() + ", "
                           + response.getClass());
            }

            // 确保 Header filter 在设置状态后运行, 校验 header 中 filter 正常
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                  getHeadersFilters(), headers, exchange, Type.RESPONSE);
            
            //  String TRANSFER_ENCODING = "Transfer-Encoding"
            //  String CONTENT_LENGTH = "Content-Length"
            if (!filteredResponseHeaders
                  .containsKey(HttpHeaders.TRANSFER_ENCODING)
                  && filteredResponseHeaders
                        .containsKey(HttpHeaders.CONTENT_LENGTH)) {
               // content-length 存在需要去掉 Transfer-Encoding
               response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }

            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
                  filteredResponseHeaders.keySet());

            response.getHeaders().putAll(filteredResponseHeaders);

            // 延迟提交响应,直到所有路由过滤器都运行
            // 将客户端响应作为ServerWebExchange属性,稍后写入响应NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

            return Mono.just(res);
         });

   if (properties.getResponseTimeout() != null) {
      // 超时异常处理
      responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
            Mono.error(new TimeoutException("Response took longer than timeout: "
                  + properties.getResponseTimeout())))
            .onErrorMap(TimeoutException.class,
                  // GATEWAY_TIMEOUT(504, "Gateway Timeout")
                  th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
                        th.getMessage(), th));
   }

   return responseFlux.then(chain.filter(exchange));
}

5.3 Возврат ответа

C- NettyWriteResponseFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
   return chain.filter(exchange).then(Mono.defer(() -> {
      // Step 1 : 获取 GatewayRequest
      Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
      // 连接不存在直接返回空 
      if (connection == null) {
         return Mono.empty();
      }
      
      // Step 2 : 获取 GatewayResponse
      ServerHttpResponse response = exchange.getResponse();
      NettyDataBufferFactory factory = (NettyDataBufferFactory) response
            .bufferFactory();

      // 此处主要包含一个 byteBufflux
      final Flux<NettyDataBuffer> body = connection.inbound().receive().retain()
            .map(factory::wrap);

      // 媒体类型  
      MediaType contentType = null;
      try {
         contentType = response.getHeaders().getContentType();
      }
      catch (Exception e) {
         log.trace("invalid media type", e);
      }
      return (isStreamingMediaType(contentType)
            ? response.writeAndFlushWith(body.map(Flux::just))
            : response.writeWith(body));
   }));
}

Суммировать

Так как нижний слой netty не очень ясен, нет возможности выводить данные для некоторых вызывающих процессов.Эта статья не очень восходящая.Я добавлю подробности позже в будущем.