Лавина Spring Cloud Gateway, я тупой

Java задняя часть Spring Cloud
Лавина Spring Cloud Gateway, я тупой

Мало знаний, большой вызов! Эта статья участвует в "Необходимые знания для программистов«Творческая деятельность.

Эта серияя тупойШестой выпуск серии [закрой лицо], замечательный обзор предыдущих выпусков:

image

Всем привет, опять я тупой. Этот опыт говорит нам о том, что если вы выходите и пишете код для кражи, вам рано или поздно придется отплатить за это.

Явление проблемы и предыстория

Наша шлюзовая лавина на какое-то время прошлой ночью, явление такое:

1. Постоянно разные микросервисы сообщают об исключениях: при записи HTTP-ответа соединение было закрыто:

reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response 

2. При этом есть исключение, что запрос не прочитан и соединение закрыто.:

org.springframework.http.converter.HttpMessageNotReadableException: I/O error while reading input message; nested exception is java.io.IOException: UT000128: Remote peer closed connection before all data could be read 

3. Внешний интерфейс постоянно имеет тревогу тайм-аута запроса, 504 Gateway Time-out.

4. Процесс шлюза не проходит проверку работоспособности и перезапускается.

5. После перезапуска процесса шлюза количество запросов сразу резко возросло.Пиковое значение каждого экземпляра было 2000 qps, а каждый экземпляр был 500 qps, когда он простаивал.Когда он был занят, он мог удерживать каждый экземпляр в пределах 1000 qps из-за расширения, а затем интерфейс проверки работоспособности очень долго не отвечал, что приводило к постоянному перезапуску экземпляра.

Среди них проблема 1 и 2 должна заключаться в том, что шлюз должен перезапускаться постоянно, и по некоторым причинам изящное завершение работы не привело к принудительному отключению, а принудительное завершение привело к принудительному отключению соединения, поэтому есть 1 и 2 связанные исключения.

Наш шлюз реализован на основе Spring Cloud Gateway и имеет механизм автоматического масштабирования в зависимости от загрузки процессора. Странно то, что при увеличении количества запросов загрузка ЦП сильно не увеличивалась, и оставалась на уровне около 60%, так как загрузка ЦП не достигала предела расширения, то и автоматического расширения не было. Чтобы решить проблему быстро, мы вручную расширили несколько экземпляров шлюза и контролировали нагрузку одного экземпляра шлюза до уровня менее 1000, что временно решило проблему.

анализ проблемы

Чтобы полностью решить эту проблему, мы используем анализ JFR. Сначала проанализируйте известные подсказки:

  1. Spring Cloud Gateway — это асинхронный отзывчивый шлюз, реализованный на основе Spring-WebFlux.Поток службы http ограничен (по умолчанию 2 * количество процессоров, которые можно использовать, здесь у нас 4).
  2. Процесс шлюза постоянно не проходит проверку работоспособности.Проверка работоспособности вызывает интерфейс /actuator/health, время ожидания которого истекает.

Обычно существует две причины тайм-аута интерфейса проверки работоспособности:

  1. Когда интерфейс проверки работоспособности проверяет компонент, он блокируется. Например, если база данных зависла, проверка работоспособности базы данных может никогда не вернуться.
  2. Пул DiTh HTTP не был обработан и имеет запрос на проверку здоровья, а запрос - это время ожидания.

Сначала мы можем посмотреть на стек синхронизации в JFR, чтобы увидеть, не застряли ли какие-либо потоки http при проверке работоспособности. Проверьте стек потоков после возникновения проблемы, сосредоточьтесь на 4 потоках http и обнаружите, что стеки этих 4 потоков в основном одинаковы, и все они выполняют команды Redis:

"reactor-http-nio-1" #68 daemon prio=5 os_prio=0 cpu=70832.99ms elapsed=199.98s tid=0x0000ffffb2f8a740 nid=0x69 waiting on condition  [0x0000fffe8adfc000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@11.0.8/Native Method)
	- parking to wait for  <0x00000007d50eddf8> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.8/LockSupport.java:234)
	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.8/CompletableFuture.java:1798)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.8/ForkJoinPool.java:3128)
	at java.util.concurrent.CompletableFuture.timedGet(java.base@11.0.8/CompletableFuture.java:1868)
	at java.util.concurrent.CompletableFuture.get(java.base@11.0.8/CompletableFuture.java:2021)
	at io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:83)
	at io.lettuce.core.internal.Futures.awaitOrCancel(Futures.java:244)
	at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75)
	at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
	at com.sun.proxy.$Proxy245.get(Unknown Source)
	at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.get(LettuceStringCommands.java:68)
	at org.springframework.data.redis.connection.DefaultedRedisConnection.get(DefaultedRedisConnection.java:267)
	at org.springframework.data.redis.connection.DefaultStringRedisConnection.get(DefaultStringRedisConnection.java:406)
	at org.springframework.data.redis.core.DefaultValueOperations$1.inRedis(DefaultValueOperations.java:57)
	at org.springframework.data.redis.core.AbstractOperations$ValueDeserializingRedisCallback.doInRedis(AbstractOperations.java:60)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:222)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:189)
	at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
	at org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:53)
	at com.jojotech.apigateway.filter.AccessCheckFilter.traced(AccessCheckFilter.java:196)
	at com.jojotech.apigateway.filter.AbstractTracedFilter.filter(AbstractTracedFilter.java:39)
	at org.springframework.cloud.gateway.handler.FilteringWebHandler$GatewayFilterAdapter.filter(FilteringWebHandler.java:137)
	at org.springframework.cloud.gateway.filter.OrderedGatewayFilter.filter(OrderedGatewayFilter.java:44)
	at org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain.lambda$filter$0(FilteringWebHandler.java:117)
	at org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain$$Lambda$1478/0x0000000800b84c40.get(Unknown Source)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
	at reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onNext(MonoFilterWhen.java:149)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
	at reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onSubscribe(MonoFilterWhen.java:112)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:250)
	at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:98)
	at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:44)
	at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
	at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
	at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.request(FluxDematerialize.java:127)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:235)
	at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onSubscribe(FluxDematerialize.java:77)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at org.springframework.cloud.sleuth.instrument.web.TraceWebFilter$MonoWebFilterTrace.subscribe(TraceWebFilter.java:184)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:915)
	at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:654)
	at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:526)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:209)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at reactor.netty.http.server.logging.AccessLogHandlerH1.channelRead(AccessLogHandlerH1.java:59)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)

Обнаружено, что поток http не застревает в проверке работоспособности, а другие потоки не имеют стека, связанного с проверкой работоспособности (в асинхронной среде проверка работоспособности также является асинхронной, и часть процессов может быть передана в другие темы). Поэтому перед выполнением запрос на проверку работоспособности должен быть отменен по тайм-ауту.

Так почему же это так? В то же время я также обнаружил, что здесь используется RedisTemplate, который является синхронным API Redis для spring-data-redis. Я вдруг вспомнил, когда писал код здесь раньше, потому что он был просто для проверки существования ключа и изменения времени истечения срока действия ключа, поэтому я был ленив и не использовал асинхронный API. Является ли это лавиной, вызванной блокировкой потока http с помощью синхронного API?

Давайте проверим эту гипотезу: операция redis в нашем проекте осуществляется через пул соединений spring-data-redis + Lettuce, включая и добавляя мониторинг JFR в команде Lettuce, вы можете обратиться к этой моей статье:Этот новый метод мониторинга пула соединений Redis не подвергается проверке ~ я добавлю еще немного приправы., на данный момент мой запрос на включение был объединен, эта функция будет выпущена в версии 6.2.x. Давайте взглянем на набор команд Redis во время возникновения проблемы, как показано на следующем рисунке:

image

Давайте просто посчитаем время блокировки, вызванное выполнением команды Redis (наше получение происходит раз в 10 секунд, count — это количество команд, а единица времени — микросекунды): используйте здесь количество команд, чтобы умножить медиану на 50%, и разделите на 10 (поскольку это 10 с), получите время блокировки, вызванное выполнением команд Redis в секунду:

32*152=4864
1*860=860
5*163=815
32*176=5632
1*178=178
16959*168=2849112
774*176=136224
3144*166=521904
17343*179=3104397
702*166=116532
总和 6740518
6740518 / 10 = 674051.8 us = 0.67s

Это только время блокировки, рассчитанное по медиане.Из распределения на графике видно, что реальное значение должно быть больше этого, поэтому весьма вероятно, что время блокировки на интерфейсе синхронизации Redis в секунду превышает 1с , непрерывно Запрос не уменьшался, что приводило к накоплению запросов, а в итоге лавине.

И поскольку это блокирующий интерфейс, поток тратит много времени на ожидание io,Таким образом, процессор не может увеличиваться, что приводит к отсутствию автоматического расширения.. Когда бизнес находится на пике из-за предустановленного расширения, один экземпляр шлюза не достигает давления проблемы, поэтому проблемы нет.

Решать проблему

Давайте перепишем исходный код и используем синхронный API Spring-Data-Redis, Оригинальный код (фактически, основной метод интерфейса Filter Spring-Cloud-Gatewaypublic Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain)тело метода):

if (StringUtils.isBlank(token)) {
	//如果 token 不存在,则根据路径决定继续请求还是返回需要登录的状态码
	return continueOrUnauthorized(path, exchange, chain, headers);
} else {
	try {
		String accessTokenValue = redisTemplate.opsForValue().get(token);
		if (StringUtils.isNotBlank(accessTokenValue)) {
			//如果 accessTokenValue 不为空,则续期 4 小时,保证登录用户只要有操作就不会让 token 过期
			Long expire = redisTemplate.getExpire(token);
			log.info("accessTokenValue = {}, expire = {}", accessTokenValue, expire);
			if (expire != null && expire < 4 * 60 * 60) {
				redisTemplate.expire(token, 4, TimeUnit.HOURS);
			}
			
			//解析,获取 userId
			JSONObject accessToken = JSON.parseObject(accessTokenValue);
			String userId = accessToken.getString("userId");
			//如果 userId 不为空才合法
			if (StringUtils.isNotBlank(userId)) {
				//解析 Token 
				HttpHeaders newHeaders = parse(accessToken);
				//继续请求
				return FilterUtil.changeRequestHeader(exchange, chain, newHeaders);
			}
		}
	} catch (Exception e) {
		log.error("read accessToken error: {}", e.getMessage(), e);
	}
	//如果 token 不合法,则根据路径决定继续请求还是返回需要登录的状态码
	return continueOrUnauthorized(path, exchange, chain, headers);
}

Измените использование асинхронного режима:

if (StringUtils.isBlank(token)) {
	return continueOrUnauthorized(path, exchange, chain, headers);
} else {
	HttpHeaders finalHeaders = headers;
	//必须使用 tracedPublisherFactory 包裹,否则链路信息会丢失,这里参考我的另一篇文章:Spring Cloud Gateway 没有链路信息,我 TM 人傻了
	return tracedPublisherFactory.getTracedMono(
			redisTemplate.opsForValue().get(token)
					//必须切换线程,否则后续线程使用的还是 Redisson 的线程,如果耗时长则会影响其他使用 Redis 的业务,并且这个耗时也算在 Redis 连接命令超时中
					.publishOn(Schedulers.parallel()),
			exchange
	).doOnSuccess(accessTokenValue -> {
		if (accessTokenValue != null) {
			//accessToken续期,4小时
			tracedPublisherFactory.getTracedMono(redisTemplate.getExpire(token).publishOn(Schedulers.parallel()), exchange).doOnSuccess(expire -> {
				log.info("accessTokenValue = {}, expire = {}", accessTokenValue, expire);
				if (expire != null && expire.toHours() < 4) {
					redisTemplate.expire(token, Duration.ofHours(4)).subscribe();
				}
			}).subscribe();
		}
	})
	//必须转换成非 null,否则 flatmap 不会执行;也不能在末尾用 switchIfEmpty,因为整体返回的是 Mono<Void> 本来里面承载的就是空的,会导致每个请求发送两遍。
	.defaultIfEmpty("")
	.flatMap(accessTokenValue -> {
		try {
			if (StringUtils.isNotBlank(accessTokenValue)) {
				JSONObject accessToken = JSON.parseObject(accessTokenValue);
				String userId = accessToken.getString("userId");
				if (StringUtils.isNotBlank(userId)) {
					//解析 Token 
					HttpHeaders newHeaders = parse(accessToken);
					//继续请求
					return FilterUtil.changeRequestHeader(exchange, chain, newHeaders);
				}
			}
			return continueOrUnauthorized(path, exchange, chain, finalHeaders);
		} catch (Exception e) {
			log.error("read accessToken error: {}", e.getMessage(), e);
			return continueOrUnauthorized(path, exchange, chain, finalHeaders);
		}
	});
}

Вот несколько заметок:

  1. Spring-Cloud-Sleuth имеет приоритет для отслеживания ссылок в Spring-WebFlux.Если мы создадим новый Flux или Mono в Filter, информация о ссылке отсутствует, и нам нужно добавить ее вручную. Это может относиться к моей другой статье: Spring Cloud Gateway не имеет информации о ссылке, я глуп
  2. Комбинация spring-data-redis + пул соединений Lettuce, для асинхронного интерфейса нам лучше переключиться на другой пул потоков для выполнения после получения ответа, иначе последующие потоки все равно будут использовать потоки Redisson.Если это займет много времени , это повлияет на других людей, использующих бизнес Redis, и это время также учитывается в тайм-ауте команды подключения к Redis.
  3. Project Reactor Если промежуточный результат имеет нулевое значение, последующие потоковые операции, такие как плоская карта и карта, выполняться не будут. Если завершено здесь, ответ, полученный внешним интерфейсом, проблематичен. Таким образом, для промежуточных результатов мы должны рассматривать нулевую проблему на каждом шаге.
  4. Основной интерфейс GatewayFilter spring-cloud-gateway, основной метод возвращаетMono<Void>. Исходное содержимое Mono пусто, поэтому мы не можем использовать switchIfEmpty в конце, чтобы упростить null на среднем этапе.Если он используется, это приведет к тому, что каждый запрос будет отправлен дважды.

После этой модификации шлюз прошел стресс-тестирование, и при одноэкземплярном запросе 2w qps такой проблемы не было.

Ищите «My Programming Meow» в WeChat, подписывайтесь на официальный аккаунт, чистите каждый день, легко улучшайте свои технологии и получайте различные предложения.