Изучение HttpHandler в Spring WebFlux

Java

Это отрывок из книги «Методология программирования на Java: Reactive Reactor3, Reactor-Netty и Spring WebFlux», которую я пишу, это«Методология программирования на Java: реактивный RxJava и разработка кода на практике»Продолжение книги, которую можно читать и как отдельную книгу.

Вот отрывок из первой половины этого раздела

Исследование HttpHandler

Через предыдущие главы мы уже коснулисьReactor-NettyДетали проектирования и реализации всего процесса также включаютreactor.netty.http.server.HttpServer#handle, если быть точным, этоSPI(Service Provider Interface)интерфейс, предоставляемый извнеBiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler, то что мы можемhandlerРеализуйте его в соответствии с вашей собственной средой.Spring WebFluxиReactor-NettyКаждый имеет свой собственный набор реализаций, но первый является для адаптацииSpring WebДля некоторых привычек было сделано много адаптационного дизайна, и весь процесс более сложный, последний предоставляет набор простых и гибких реализаций. Затем в этой главе мыReactor-NettyВнутреннее его осуществление начинается, формальноSpring WebFluxпереход.

Настройки HttpServerRoutes

Мы отправляем на внутренний серверHTTPПри запросе часто используетсяget,head,post,putЭти типы также будут включать адрес запроса.Сервер будет предоставлять соответствующую услугу в соответствии с типом запроса и адресом запроса, а затем будет выполняться конкретная обработка.Можем ли мы извлечь процесс поиска услуг для формирования поиска маршрутизации услуги .

Итак, вReactor-Netty, разработалHttpServerRoutesинтерфейс, который наследуетBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>, используется для маршрутизации запроса.Когда приходит запрос, просматриваются разработанные нами правила маршрутизации по порядку, пока не совпадет первое, а затем вызывается соответствующая обработка.handler.HttpServerRoutesВ интерфейсе нашего общегоget,head,post,put,deleteДождитесь запроса на разработку соответствующих правил маршрутизации (подробности см. в исходном коде ниже).

Когда мы его используем, мы сначала вызовемHttpServerRoutes#newRoutesПолучить одинDefaultHttpServerRoutesПример, затем присоединяйтесь к нашему правилам маршрутизации дизайна, дизайн правил маршрутизации, фактически управляет правилом через коллекцию, а затем многосекретные совпадения, здесь это метод основной организацииreactor.netty.http.server.HttpServerRoutes#route, после того, как правила разработаны, мы можем разработать соответствующие каждому правилуBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>Функциональная реализация, и, наконец, когда маршрут запроса успешно совпадает, мы можем вызвать нашBiFunctionРеализовать, обработать запрос.

//reactor.netty.http.server.HttpServerRoutes
public interface HttpServerRoutes extends
                                  BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {

	static HttpServerRoutes newRoutes() {
		return new DefaultHttpServerRoutes();
	}


	default HttpServerRoutes delete(String path,
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.delete(path), handler);
	}

    ...

	default HttpServerRoutes get(String path,
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.get(path), handler);
	}

	default HttpServerRoutes head(String path,
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.head(path), handler);
	}

	default HttpServerRoutes index(final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(INDEX_PREDICATE, handler);
	}

	default HttpServerRoutes options(String path,
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.options(path), handler);
	}

	default HttpServerRoutes post(String path,
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.post(path), handler);
	}

	default HttpServerRoutes put(String path,
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.put(path), handler);
	}

	HttpServerRoutes route(Predicate<? super HttpServerRequest> condition,
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler);

	...

}

Что касается разработки правил маршрутизации, в сочетании с вышеизложенным, мы можемHttpServerRoutesСоздайте один в классе реализацииListИспользуется для хранения раздела правила, следующим шагом является формулировка правил, которые можно поместить в раздел, поскольку это процесс добавления и не требуется возвращать значение, мы можем использоватьConsumer<? super HttpServerRoutes>представлять этот процесс. Для сопоставления запроса часто используется условное суждение о запросе, тогда мы можем использоватьPredicate<? super HttpServerRequest>для представления этой логики суждения, потому что единственное правило маршрутизации соответствует соответствующемуBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>обработки, то можем ли мы соединить их вместе, так чтоreactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandlerОн был разработан:

//reactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandler
static final class HttpRouteHandler
        implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>,
                    Predicate<HttpServerRequest> {

    final Predicate<? super HttpServerRequest>          condition;
    final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>>
                                                        handler;
    final Function<? super String, Map<String, String>> resolver;

    HttpRouteHandler(Predicate<? super HttpServerRequest> condition,
            BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler,
            @Nullable Function<? super String, Map<String, String>> resolver) {
        this.condition = Objects.requireNonNull(condition, "condition");
        this.handler = Objects.requireNonNull(handler, "handler");
        this.resolver = resolver;
    }

    @Override
    public Publisher<Void> apply(HttpServerRequest request,
            HttpServerResponse response) {
        return handler.apply(request.paramsResolver(resolver), response);
    }

    @Override
    public boolean test(HttpServerRequest o) {
        return condition.test(o);
    }
}

Может быть необходимоrequestПарсинг параметров он предоставляет альтернативу за пределами нашего пользовательского анализатора, который реализует параметры интерфейса:Function<? super String, Map<String, String>>,остатокconditionиresolverПросто следуйте логике, которую мы сказали ранее.

В настоящее время,HttpRouteHandlerОн принадлежит реальному верификатору запросов и бизнес-процессору запросов. Теперь нам нужно соединить их функции, чтобы сформировать поток обработки через ряд логики. Затем мы можем использовать здесь режим прокси.HttpServerRoutesв классе реализации черезListКоллекция управляет переменным количествомHttpRouteHandlerЭкземпляр, внешне мы используемreactor.netty.http.server.HttpServer#handleвидеть только одинBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>Реализация, следовательно, вся обработка логического потока должна быть в этомBiFunctionизapply(...)реализации, поэтому имеем следующееreactor.netty.http.server.DefaultHttpServerRoutesвыполнить:

//reactor.netty.http.server.DefaultHttpServerRoutes
final class DefaultHttpServerRoutes implements HttpServerRoutes {


	private final CopyOnWriteArrayList<HttpRouteHandler> handlers =
			new CopyOnWriteArrayList<>();
	...
	@Override
	public HttpServerRoutes route(Predicate<? super HttpServerRequest> condition,
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		Objects.requireNonNull(condition, "condition");
		Objects.requireNonNull(handler, "handler");

		if (condition instanceof HttpPredicate) {
			handlers.add(new HttpRouteHandler(condition,
					handler,
					(HttpPredicate) condition));
		}
		else {
			handlers.add(new HttpRouteHandler(condition, handler, null));
		}
		return this;
	}

	@Override
	public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) {
		final Iterator<HttpRouteHandler> iterator = handlers.iterator();
		HttpRouteHandler cursor;

		try {
			while (iterator.hasNext()) {
				cursor = iterator.next();
				if (cursor.test(request)) {
					return cursor.apply(request, response);
				}
			}
		}
		catch (Throwable t) {
			Exceptions.throwIfJvmFatal(t);
			return Mono.error(t); //500
		}

		return response.sendNotFound();
	}
    ...
}

можно увидетьroute(...)Метод только что сделалHttpRouteHandlerЭкземпляры построены и переданыhandlersэтоlistУправляется вышеуказаннымapplyРеализация объединяет предыдущий контент в логике процесса. Поэтому мы можемreactor.netty.http.server.HttpServerдизайн один вrouteметод, обеспечивающийSPIИнтерфейс, определите весь процесс, который мы упомянули в этом методе (получите одинHttpServerRoutesЭкземпляр, затем передать егоrouteПравила построения метода, процесс построения указан вышеConsumer<? super HttpServerRoutes>в процессе, и, наконец, объединит успешныеHttpServerRoutesотBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>Роль передается как параметрHttpServer#handle).

Кроме того, мы должны уделять особое внимание здесь, в приведенном вышеDefaultHttpServerRoutesосуществленныйapplyВ методе видно, что как только запрос совпадет, результат будет возвращен сразу после обработки, и совпадение не будет продолжаться.

//reactor.netty.http.server.HttpServer#route
public final HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) {
    Objects.requireNonNull(routesBuilder, "routeBuilder");
    HttpServerRoutes routes = HttpServerRoutes.newRoutes();
    routesBuilder.accept(routes);
    return handle(routes);
}

Таким образом, мы можем использовать следующиеDemoЧтобы применить приведенный выше дизайн:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .route(routes ->
                              routes.get("/hello",        <1>
                                         (request, response) -> response.sendString(Mono.just("Hello World!")))
                                    .post("/echo",        <2>
                                         (request, response) -> response.send(request.receive().retain()))
                                    .get("/path/{param}", <3>
                                         (request, response) -> response.sendString(Mono.just(request.param("param")))))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

существует<1>, когда мы выдаемGETЗапрос на посещение/helloКогда вы получаете строкуHello World!.

существует<2>, когда мы выдаемPOSTпросьба посетить/echoТело запроса возвращается как содержимое ответа.

существует<3>, когда мы выдаемGETпросьба посетить/path/{param}Когда вы получаете параметр пути запросаparamзначение .

оSSEДля использования здесь мы можем посмотреть следующую демонстрацию, конкретные детали кода не будут подробно описаны, просто посмотрите соответствующие комментарии:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.function.BiFunction;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .route(routes -> routes.get("/sse", serveSse()))
                          .bindNow();

        server.onDispose()
              .block();
    }

    /**
     * 准备 SSE response
     * 参考 reactor.netty.http.server.HttpServerResponse#sse可以知道它的"Content-Type" 
     * 是"text/event-stream"
     * flush策略为通过所提供的Publisher来每下发一个元素就flush一次
     */
    private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() {
        Flux<Long> flux = Flux.interval(Duration.ofSeconds(10));
        return (request, response) ->
            response.sse()
                    .send(flux.map(Application::toByteBuf), b -> true);
    }

    /**
     * 将发元素按照按照给定的格式由Object转换为ByteBuf。
     */
    private static ByteBuf toByteBuf(Object any) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            out.write("data: ".getBytes(Charset.defaultCharset()));
            MAPPER.writeValue(out, any);
            out.write("\n\n".getBytes(Charset.defaultCharset()));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return ByteBufAllocator.DEFAULT
                               .buffer()
                               .writeBytes(out.toByteArray());
    }

    private static final ObjectMapper MAPPER = new ObjectMapper();
}