[Наступление на яму] Грациозное завершение работы Spring Webflux

Spring

задний план

  • Недавно я ел крабовый вебфлюкс и обнаружил, что, хотя документ будет корректно закрываться, он не ждет возврата всех запросов перед закрытием.Если есть незавершенные запросы (например, запросы на сон 10 с), он будет напрямуюEmpty reply(Среда: Spring Boot 2.1.5 с Reactor-Netty 0.8.8)

2020.02.11 Весна 2.2.4 Обновление

  • Обновление до Spring 2.2.4 показало, что предыдущая ошибка была исправлена ​​вreactorResourceFactoryПри выключении блока закрывает пул ресурсов netty.
  • Но проверено, и обнаружено, что проблема появилась снова, и она появится снова.Emtpy reply. Ожидается, что бизнес-ответ не будет завершен до отключения всех ссылок.
  • Поскольку я мало что знаю о netty, я проверил много информации и обнаружил, что корректное отключение netty гарантирует только отсутствие пакетов, которые не могут быть завершены в баффе сокетов, и не гарантирует отсутствие канала по-прежнему. выполняется на рабочем, и его тихое время только закрыто.Затем подождите некоторое время, прежде чем на самом деле убить основную программу.
  • Что же тогда делать? Я нашел хорошую идею из Интернета.Можно добавить текущий счетчик запросов в WebFilter, а затем заблокировать и дождаться закрытия перед закрытием. Компонент WebFilter блокирует выключение ResourceFactory из-за проблем с порядком компонентов
  • Если сначала необходимо закрыть порт (например, верхний уровень — это обнаружение сердцебиения TCP, и вводить новый трафик не требуется), вызовитеreactiveWebServerApplicationContextКонтекст может быть закрыт веб-сервером
  • Конкретный код выглядит следующим образом
//inspired by https://github.com/making/demo-graceful-shutdown-webflux/blob/master/src/main/java/com/example/demogracefulshutdownwebflux/DrainFilter.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

import javax.annotation.PreDestroy;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@Slf4j
public class DrainDownFilter implements WebFilter {

	private final AtomicInteger inFlight = new AtomicInteger(0);

	private final AtomicBoolean inDraining = new AtomicBoolean(false);

	@Autowired
	ReactiveWebServerApplicationContext reactiveWebServerApplicationContext;

	@Override
	public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {

		return webFilterChain.filter(serverWebExchange).doFirst(inFlight::incrementAndGet)
				.doFinally((x) -> inFlight.decrementAndGet());
	}

	@PreDestroy
	void preDestroy() {
		log.warn("Start draining. ({} in-flight requests)", this.inFlight);
		this.inDraining.set(true);
		// 先关闭服务器
		reactiveWebServerApplicationContext.getWebServer().stop();
		// 轮询 30s
		for (int i = 0; i < 30; i++) {
			int current = this.inFlight.get();
			if (current <= 0) {
				break;
			}
			try {
				log.warn("Draining... ({} in-flight requests)", current);
				Thread.sleep(1000);
			}
			catch (InterruptedException e) {
				// Thread.currentThread().interrupt();
			}
		}
		log.warn("Good bye. ({} in-flight requests)", this.inFlight);
	}

}

Основная причина и решение(истекший)

  • Пропустить анализ, а не таблицу, и перейти сразу к выводу
  • Хотя у самой netty есть изящное завершение работы, и она действительно вызывается при выключении, способ, которым netty вызывается реактором, выглядит следующим образом.
//reactor.netty.resources.LoopResources#dispose
@Override
default void dispose() {
	//noop default
	disposeLater().subscribe();
}
  • Вызовите подписку напрямую, и нет блокировки для завершения, поэтому, если последующее весеннее завершение работы, этот процесс будет прерван напрямую, что приведет к указанной выше проблеме.
  • Итак, как решить эту проблему?
    • Поскольку я не знаком с набором webflux, я не знаю, как дождаться завершения всех подписок, поэтому я применил относительно хакерский метод: после запуска я получил внутренние HttpResources через отражение, а затем зарегистрировал хук отключения , и напрямую вызывается метод блокировки, блокирующий Release LoopResources, так что все будет в порядке
/**
 * @author Lambda.J
 * @version $Id: GracefulShutdown.java, v 0.1 2019-05-27 
 */
@Component
public class GracefulShutdown {
    @Autowired
    ReactorResourceFactory reactorResourceFactory;

    LoopResources loopResources;

    // SpringBoot 2.1.5 reactor.netty.resources.LoopResources#dispose 只 subscribe 没有 block 造成没有等待关闭,这边手工调用,后面如果修复了直接删除就好
    @PostConstruct
    void init() throws Exception {
        Field field = TcpResources.class.getDeclaredField("defaultLoops");
        field.setAccessible(true);
        loopResources = (LoopResources) field.get(reactorResourceFactory.getLoopResources());
        field.setAccessible(false);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Graceful: block long to 20s before real shutdown!");
            loopResources.disposeLater().block(Duration.ofSeconds(20));
        }));
    }
}

Связанные знания

Процесс весеннего закрытия

  1. Перехватчик выключения регистрируется при запускеorg.springframework.context.support.AbstractApplicationContext#registerShutdownHook
  2. Истинный процесс закрытияorg.springframework.context.support.AbstractApplicationContext#doClose
    1. близкий контекст
    2. закрыть компонент жизненного цикла
    3. Закройте одноэлементный компонент, сгенерированный beanFactory (здесь закрывается NettyServer)
    4. закрыть BeanFactory
    5. Закрыть DisposableServer
    6. Удалите прослушиватель и установите неактивное состояние.
  3. Тест показал, что во время выключения бина, когдаreactorServerResourceFactoryПосле закрытия (org.springframework.http.client.reactive.ReactorResourceFactory#destroy), порт закрыт, но запросы, на которые еще не ответили, могут продолжать отвечать.
    • Следовательно, есть также хакерский способ использовать переопределение класса, чтобы переписать здесь логику выключения и подождать некоторое время после выключения реактораServer. Но слишком хакер, не используйте его в качестве крайней меры

использованная литература