Недавно я ел крабовый вебфлюкс и обнаружил, что, хотя документ будет корректно закрываться, он не ждет возврата всех запросов перед закрытием.Если есть незавершенные запросы (например, запросы на сон 10 с), он будет напрямуюEmpty reply(Среда: Spring Boot 2.1.5 с Reactor-Netty 0.8.8)
2020.02.11 Весна 2.2.4 Обновление
Обновление до Spring 2.2.4 показало, что предыдущая ошибка была исправлена вreactorResourceFactoryПри выключении блока закрывает пул ресурсов netty.
Но проверено, и обнаружено, что проблема появилась снова, и она появится снова.Emtpy reply. Ожидается, что бизнес-ответ не будет завершен до отключения всех ссылок.
Поскольку я мало что знаю о netty, я проверил много информации и обнаружил, что корректное отключение netty гарантирует только отсутствие пакетов, которые не могут быть завершены в баффе сокетов, и не гарантирует отсутствие канала по-прежнему. выполняется на рабочем, и его тихое время только закрыто.Затем подождите некоторое время, прежде чем на самом деле убить основную программу.
Что же тогда делать? Я нашел хорошую идею из Интернета.Можно добавить текущий счетчик запросов в WebFilter, а затем заблокировать и дождаться закрытия перед закрытием. Компонент WebFilter блокирует выключение ResourceFactory из-за проблем с порядком компонентов
Если сначала необходимо закрыть порт (например, верхний уровень — это обнаружение сердцебиения TCP, и вводить новый трафик не требуется), вызовитеreactiveWebServerApplicationContextКонтекст может быть закрыт веб-сервером
Конкретный код выглядит следующим образом
//inspired by https://github.com/making/demo-graceful-shutdown-webflux/blob/master/src/main/java/com/example/demogracefulshutdownwebflux/DrainFilter.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import javax.annotation.PreDestroy;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@Slf4j
public class DrainDownFilter implements WebFilter {
private final AtomicInteger inFlight = new AtomicInteger(0);
private final AtomicBoolean inDraining = new AtomicBoolean(false);
@Autowired
ReactiveWebServerApplicationContext reactiveWebServerApplicationContext;
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
return webFilterChain.filter(serverWebExchange).doFirst(inFlight::incrementAndGet)
.doFinally((x) -> inFlight.decrementAndGet());
}
@PreDestroy
void preDestroy() {
log.warn("Start draining. ({} in-flight requests)", this.inFlight);
this.inDraining.set(true);
// 先关闭服务器
reactiveWebServerApplicationContext.getWebServer().stop();
// 轮询 30s
for (int i = 0; i < 30; i++) {
int current = this.inFlight.get();
if (current <= 0) {
break;
}
try {
log.warn("Draining... ({} in-flight requests)", current);
Thread.sleep(1000);
}
catch (InterruptedException e) {
// Thread.currentThread().interrupt();
}
}
log.warn("Good bye. ({} in-flight requests)", this.inFlight);
}
}
Основная причина и решение(истекший)
Пропустить анализ, а не таблицу, и перейти сразу к выводу
Хотя у самой netty есть изящное завершение работы, и она действительно вызывается при выключении, способ, которым netty вызывается реактором, выглядит следующим образом.
Вызовите подписку напрямую, и нет блокировки для завершения, поэтому, если последующее весеннее завершение работы, этот процесс будет прерван напрямую, что приведет к указанной выше проблеме.
Итак, как решить эту проблему?
Поскольку я не знаком с набором webflux, я не знаю, как дождаться завершения всех подписок, поэтому я применил относительно хакерский метод: после запуска я получил внутренние HttpResources через отражение, а затем зарегистрировал хук отключения , и напрямую вызывается метод блокировки, блокирующий Release LoopResources, так что все будет в порядке
/**
* @author Lambda.J
* @version $Id: GracefulShutdown.java, v 0.1 2019-05-27
*/
@Component
public class GracefulShutdown {
@Autowired
ReactorResourceFactory reactorResourceFactory;
LoopResources loopResources;
// SpringBoot 2.1.5 reactor.netty.resources.LoopResources#dispose 只 subscribe 没有 block 造成没有等待关闭,这边手工调用,后面如果修复了直接删除就好
@PostConstruct
void init() throws Exception {
Field field = TcpResources.class.getDeclaredField("defaultLoops");
field.setAccessible(true);
loopResources = (LoopResources) field.get(reactorResourceFactory.getLoopResources());
field.setAccessible(false);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Graceful: block long to 20s before real shutdown!");
loopResources.disposeLater().block(Duration.ofSeconds(20));
}));
}
}
Связанные знания
Процесс весеннего закрытия
Перехватчик выключения регистрируется при запускеorg.springframework.context.support.AbstractApplicationContext#registerShutdownHook
Истинный процесс закрытияorg.springframework.context.support.AbstractApplicationContext#doClose
Удалите прослушиватель и установите неактивное состояние.
Тест показал, что во время выключения бина, когдаreactorServerResourceFactoryПосле закрытия (org.springframework.http.client.reactive.ReactorResourceFactory#destroy), порт закрыт, но запросы, на которые еще не ответили, могут продолжать отвечать.
Следовательно, есть также хакерский способ использовать переопределение класса, чтобы переписать здесь логику выключения и подождать некоторое время после выключения реактораServer. Но слишком хакер, не используйте его в качестве крайней меры