Это 4-й день моего участия в августовском испытании обновлений.Подробности о событии:Испытание августовского обновления
Прежде всего, поделитесь всеми предыдущими статьями, ставьте лайки, добавляйте в избранное и пересылайте три раза подряд. >>>>😜😜😜
Сборник статей:🎁nuggets.capable/post/694164…
Github :👉github.com/black-ant
Резервное копирование CASE:👉git ee.com/ant black/wipe…
Введение
цель документа
- Упорядочить детали переадресации запросов в Gateway production
- Разберитесь с пунктами настройки переадресации
дополнение к знаниям
Переадресация запросов является одной из основных функций шлюза и включает в себя три основных понятия:
Маршрут:Маршрут — это базовая единица шлюза, состоящая из идентификатора, URI, набора предикатов и набора фильтров.Если предикат соответствует True, он будет перенаправлен.
Предикат (предикат, утверждение):Условием оценки для переадресации маршрутизации является утверждение функции Java 8. Тип ввода — Spring Framework ServerWebExchange. В настоящее время SpringCloud Gateway поддерживает различные методы, такие как: путь, запрос, метод, заголовок и т. д. Метод записи должен соответствовать форма ключ = значение
Фильтр:Фильтр — это логика фильтрации, которая передается при маршрутизации и пересылке запросов с использованием экземпляра GatewayFilter, созданного определенной фабрикой, который можно использовать для изменения содержимого запросов и ответов.
2. Простота использования
2.1 Краткое изложение предикатов
//
- After=2017-01-20T17:42:47.789-07:00[America/Denver]
//
- Before=2017-01-20T17:42:47.789-07:00[America/Denver]
//
- Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]
//
- Cookie=chocolate, ch.p
2.2 Моно и поток
Mono и Flux являются основными объектами, которые проходят через весь процесс.Согласно спецификации реактивных потоков,Издатель предоставляет потенциально неограниченное количество упорядоченных элементов и публикует эти элементы по запросу, полученному от его подписчиков.. Reactor-core имеет набор реализаций этого интерфейса Publisher. Две важные реализации последовательностей, которые мы будем создавать, — это Mono и Flux.
- Flux представляет собой асинхронную последовательность от 0 до N элементов.
- Моно представляет собой асинхронную последовательность из 0 или 1 элементов.
> SpringGateway использует webflux в качестве базового фреймворка для вызовов, который включает объекты mono и Flux.
> Есть 3 типа уведомлений, которые могут быть включены в последовательность:
- обычное сообщение, содержащее элементы
- сообщение об окончании последовательности
- Сообщение об ошибке последовательности
Flux
- Flux — это стандартный издатель, представляющий асинхронную последовательность от 0 до N испускаемых элементов,Опционально завершается сигналом готовности или ошибки. Как и в спецификации Reactive Streams, эти три типа сигналов преобразуются в вызовы методов onNext, onComplete или onError нижестоящих подписчиков.
Mono
- Mono — это еще одна реализация Publisher. Он выдает не более одной записи, а затем (необязательно) завершается сигналом onComplete или сигналом onError,Моно также асинхронно по своей природе
- Он предоставляет только подмножество операторов, доступных для Flux, и некоторые операторы (особенно те, которые объединяют Mono с другим издателем) переключаются на Flux.
- Например, Mono#concatWith(Publisher) возвращает Flux, а Mono#then(Mono) возвращает другой Mono.
Общие методы следующие:
- create: программно создать Flux с возможностью многократного испускания,
- empty: испускать 0 элементов или возвращать пустой Flux
- just: создать базу
- error: Создает поток, который завершается с указанной ошибкой сразу после подписки.
PS: я не буду читать эту часть подробно, сначала прочитайте основной процесс Gateway
3. Глубокий перехват
3.1 Схема
Первый взгляд на принципиальную схему SpringGateway
4. Ввод вызова
4.1 Процесс вызова
- Шаг 1: HttpWebHandlerAdapter # handle: сборка ServerWebExchange и запуск обработки обработчика
- Шаг 2: DispatcherHandler # handle: инициировать обработку запроса
- Шаг 3: RoutePredicateHandlerMapping # getHandlerInternal: обработка оценки маршрута
4.2 Логика getHandlerInternal
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
}
3.2. lookupRoute
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
validateRoute(route, exchange);
return route;
});
}
пройдет по всем маршрутам
5. Процесс отправки
5.1 Система фильтрации веб-обработчиков
Здесь webHandler — это объект FilteringWebHandler, давайте посмотрим на функцию этого объекта.
Здесь задействованы следующие фильтры:
- C- ForwardPathFilter :
- C- ForwardRoutingFilter : Используется для локальной переадресации
- C- GatewayMetricsFilter : Интеграция с Прометеем, таким образом создав панель инструментов Grafana
- C- LoadBalancerClientFilter : Используется для интеграции ленты, сначала получите имя микросервиса, а затем фактический адрес вызова через ленту.
- C- NettyRoutingFilter : http или https , используя Netty
HttpClient
Отправлять прокси-запросы нижестоящим службам - C- NettyWriteResponseFilter : Клиентская сторона для записи ответов прокси обратно на шлюз, поэтому фильтр не будет выполняться, пока все остальные фильтры не закончат выполнение.
- C- OrderedGatewayFilter :
- C- RouteToRequestUrlFilter : получено из запросаИсходный URL-адрес преобразуется в URL-адрес, используемый шлюзом для переадресации запросов.
- C- WebClientHttpRoutingFilter :
- C- WebClientWriteResponseFilter :
- C- WebsocketRoutingFilter : вс или всс, тогда фильтр будет использовать Spring Web Socket для пересылки запроса Websocket нижестоящему серверу.
- C- WeightCalculatorWebFilter :
Вы можете обратиться к ->Встроенный глобальный фильтр Spring Cloud Gateway
Логика вызова 1: управление FilteringWebHandler
В этом объекте есть внутренний класс DefaultGatewayFilterChain, который представляет собой цепочку фильтров Filter.
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
// 当前 Filter 链索引
private final int index;
// Filter 集合
private final List<GatewayFilter> filters;
DefaultGatewayFilterChain(List<GatewayFilter> filters) {
this.filters = filters;
this.index = 0;
}
private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
this.filters = parent.getFilters();
this.index = index;
}
public List<GatewayFilter> getFilters() {
return filters;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
// 逐个 Filter 过滤调用
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
Поток звонков 3: фильтрация
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 通常判断部分条件 , 如果该 Filter 不符合 , 则跳过该 Filter
if (isAlreadyRouted(exchange)
|| (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
5.2 Тело отправлено
Основным отправляющим фильтром является NettyRoutingFilter, нижеследующее фокусируется только на соответствующей логике этого фильтра:
C- NettyRoutingFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 请求 URL : http://httpbin.org:80/get
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
// 协议类型 : http
String scheme = requestUrl.getScheme();
// Step 1 : filter 链处理 ,如果不符合 http 协议 , 就通过下一个 Filter 处理
if (isAlreadyRouted(exchange)
|| (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
// Step 2 : 标识 Routed 已处理
setAlreadyRouted(exchange);
// Step 3 : 获取 Request 请求对象 , 这个是外部请求的对象
ServerHttpRequest request = exchange.getRequest();
// Step 4 : 获取 Method 类型 (get/post...)
final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
final String url = requestUrl.toString();
// Step 5 : 对 Header 进行处理 , 需要转发过去
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
// -> Transfer-Encoding
String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
// -> preserveHostHeader
boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
// 通过 netty httpClient 发起转发请求 , PS !!! 这里是异步的
Flux<HttpClientResponse> responseFlux = this.httpClient
.chunkedTransfer(chunkedTransfer).request(method).uri(url)
.send((req, nettyOutbound) -> {
// Step 6 : 转发 Header
req.headers(httpHeaders);
// => 是否需要记录之前的 host
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
req.header(HttpHeaders.HOST, host);
}
// Step 7 : 真正发起请求
return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach)
.send(request.getBody()
.map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
.getNativeBuffer()));
}).responseConnection((res, connection) -> {
// Step 8 : 请求完成 , 获取 response
ServerHttpResponse response = exchange.getResponse();
// Step 9 : 转发headers 和 status 等属性
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(
entry -> headers.add(entry.getKey(), entry.getValue()));
// => String CONTENT_TYPE = "Content-Type"
// => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type";
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
contentTypeValue);
}
// 转发状态 , 存在往 GatewayResponse 设置状态
HttpStatus status = HttpStatus.resolve(res.status().code());
if (status != null) {
response.setStatusCode(status);
}
else if (response instanceof AbstractServerHttpResponse) {
((AbstractServerHttpResponse) response)
.setStatusCodeValue(res.status().code());
}
else {
throw new IllegalStateException(
"Unable to set status code on response: "
+ res.status().code() + ", "
+ response.getClass());
}
// 确保 Header filter 在设置状态后运行, 校验 header 中 filter 正常
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
getHeadersFilters(), headers, exchange, Type.RESPONSE);
// String TRANSFER_ENCODING = "Transfer-Encoding"
// String CONTENT_LENGTH = "Content-Length"
if (!filteredResponseHeaders
.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders
.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// content-length 存在需要去掉 Transfer-Encoding
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
// 延迟提交响应,直到所有路由过滤器都运行
// 将客户端响应作为ServerWebExchange属性,稍后写入响应NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
return Mono.just(res);
});
if (properties.getResponseTimeout() != null) {
// 超时异常处理
responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
Mono.error(new TimeoutException("Response took longer than timeout: "
+ properties.getResponseTimeout())))
.onErrorMap(TimeoutException.class,
// GATEWAY_TIMEOUT(504, "Gateway Timeout")
th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
th.getMessage(), th));
}
return responseFlux.then(chain.filter(exchange));
}
5.3 Возврат ответа
C- NettyWriteResponseFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).then(Mono.defer(() -> {
// Step 1 : 获取 GatewayRequest
Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
// 连接不存在直接返回空
if (connection == null) {
return Mono.empty();
}
// Step 2 : 获取 GatewayResponse
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response
.bufferFactory();
// 此处主要包含一个 byteBufflux
final Flux<NettyDataBuffer> body = connection.inbound().receive().retain()
.map(factory::wrap);
// 媒体类型
MediaType contentType = null;
try {
contentType = response.getHeaders().getContentType();
}
catch (Exception e) {
log.trace("invalid media type", e);
}
return (isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body));
}));
}
Суммировать
Так как нижний слой netty не очень ясен, нет возможности выводить данные для некоторых вызывающих процессов.Эта статья не очень восходящая.Я добавлю подробности позже в будущем.