Это отрывок из книги «Методология программирования на Java: Reactive Reactor3, Reactor-Netty и Spring WebFlux», которую я пишу, это«Методология программирования на Java: реактивный RxJava и разработка кода на практике»Продолжение книги, которую можно читать и как отдельную книгу.
Вот отрывок из первой половины этого раздела
Исследование HttpHandler
Через предыдущие главы мы уже коснулисьReactor-Netty
Детали проектирования и реализации всего процесса также включаютreactor.netty.http.server.HttpServer#handle
, если быть точным, этоSPI(Service Provider Interface)
интерфейс, предоставляемый извнеBiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler
, то что мы можемhandler
Реализуйте его в соответствии с вашей собственной средой.Spring WebFlux
иReactor-Netty
Каждый имеет свой собственный набор реализаций, но первый является для адаптацииSpring Web
Для некоторых привычек было сделано много адаптационного дизайна, и весь процесс более сложный, последний предоставляет набор простых и гибких реализаций. Затем в этой главе мыReactor-Netty
Внутреннее его осуществление начинается, формальноSpring WebFlux
переход.
Настройки HttpServerRoutes
Мы отправляем на внутренний серверHTTP
При запросе часто используетсяget
,head
,post
,put
Эти типы также будут включать адрес запроса.Сервер будет предоставлять соответствующую услугу в соответствии с типом запроса и адресом запроса, а затем будет выполняться конкретная обработка.Можем ли мы извлечь процесс поиска услуг для формирования поиска маршрутизации услуги .
Итак, вReactor-Netty
, разработалHttpServerRoutes
интерфейс, который наследуетBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
, используется для маршрутизации запроса.Когда приходит запрос, просматриваются разработанные нами правила маршрутизации по порядку, пока не совпадет первое, а затем вызывается соответствующая обработка.handler
.HttpServerRoutes
В интерфейсе нашего общегоget
,head
,post
,put
,delete
Дождитесь запроса на разработку соответствующих правил маршрутизации (подробности см. в исходном коде ниже).
Когда мы его используем, мы сначала вызовемHttpServerRoutes#newRoutes
Получить одинDefaultHttpServerRoutes
Пример, затем присоединяйтесь к нашему правилам маршрутизации дизайна, дизайн правил маршрутизации, фактически управляет правилом через коллекцию, а затем многосекретные совпадения, здесь это метод основной организацииreactor.netty.http.server.HttpServerRoutes#route
, после того, как правила разработаны, мы можем разработать соответствующие каждому правилуBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
Функциональная реализация, и, наконец, когда маршрут запроса успешно совпадает, мы можем вызвать нашBiFunction
Реализовать, обработать запрос.
//reactor.netty.http.server.HttpServerRoutes
public interface HttpServerRoutes extends
BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {
static HttpServerRoutes newRoutes() {
return new DefaultHttpServerRoutes();
}
default HttpServerRoutes delete(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.delete(path), handler);
}
...
default HttpServerRoutes get(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.get(path), handler);
}
default HttpServerRoutes head(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.head(path), handler);
}
default HttpServerRoutes index(final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(INDEX_PREDICATE, handler);
}
default HttpServerRoutes options(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.options(path), handler);
}
default HttpServerRoutes post(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.post(path), handler);
}
default HttpServerRoutes put(String path,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
return route(HttpPredicate.put(path), handler);
}
HttpServerRoutes route(Predicate<? super HttpServerRequest> condition,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler);
...
}
Что касается разработки правил маршрутизации, в сочетании с вышеизложенным, мы можемHttpServerRoutes
Создайте один в классе реализацииList
Используется для хранения раздела правила, следующим шагом является формулировка правил, которые можно поместить в раздел, поскольку это процесс добавления и не требуется возвращать значение, мы можем использоватьConsumer<? super HttpServerRoutes>
представлять этот процесс. Для сопоставления запроса часто используется условное суждение о запросе, тогда мы можем использоватьPredicate<? super HttpServerRequest>
для представления этой логики суждения, потому что единственное правило маршрутизации соответствует соответствующемуBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
обработки, то можем ли мы соединить их вместе, так чтоreactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandler
Он был разработан:
//reactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandler
static final class HttpRouteHandler
implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>,
Predicate<HttpServerRequest> {
final Predicate<? super HttpServerRequest> condition;
final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>>
handler;
final Function<? super String, Map<String, String>> resolver;
HttpRouteHandler(Predicate<? super HttpServerRequest> condition,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler,
@Nullable Function<? super String, Map<String, String>> resolver) {
this.condition = Objects.requireNonNull(condition, "condition");
this.handler = Objects.requireNonNull(handler, "handler");
this.resolver = resolver;
}
@Override
public Publisher<Void> apply(HttpServerRequest request,
HttpServerResponse response) {
return handler.apply(request.paramsResolver(resolver), response);
}
@Override
public boolean test(HttpServerRequest o) {
return condition.test(o);
}
}
Может быть необходимоrequest
Парсинг параметров он предоставляет альтернативу за пределами нашего пользовательского анализатора, который реализует параметры интерфейса:Function<? super String, Map<String, String>>
,остатокcondition
иresolver
Просто следуйте логике, которую мы сказали ранее.
В настоящее время,HttpRouteHandler
Он принадлежит реальному верификатору запросов и бизнес-процессору запросов. Теперь нам нужно соединить их функции, чтобы сформировать поток обработки через ряд логики. Затем мы можем использовать здесь режим прокси.HttpServerRoutes
в классе реализации черезList
Коллекция управляет переменным количествомHttpRouteHandler
Экземпляр, внешне мы используемreactor.netty.http.server.HttpServer#handle
видеть только одинBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
Реализация, следовательно, вся обработка логического потока должна быть в этомBiFunction
изapply(...)
реализации, поэтому имеем следующееreactor.netty.http.server.DefaultHttpServerRoutes
выполнить:
//reactor.netty.http.server.DefaultHttpServerRoutes
final class DefaultHttpServerRoutes implements HttpServerRoutes {
private final CopyOnWriteArrayList<HttpRouteHandler> handlers =
new CopyOnWriteArrayList<>();
...
@Override
public HttpServerRoutes route(Predicate<? super HttpServerRequest> condition,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
Objects.requireNonNull(condition, "condition");
Objects.requireNonNull(handler, "handler");
if (condition instanceof HttpPredicate) {
handlers.add(new HttpRouteHandler(condition,
handler,
(HttpPredicate) condition));
}
else {
handlers.add(new HttpRouteHandler(condition, handler, null));
}
return this;
}
@Override
public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) {
final Iterator<HttpRouteHandler> iterator = handlers.iterator();
HttpRouteHandler cursor;
try {
while (iterator.hasNext()) {
cursor = iterator.next();
if (cursor.test(request)) {
return cursor.apply(request, response);
}
}
}
catch (Throwable t) {
Exceptions.throwIfJvmFatal(t);
return Mono.error(t); //500
}
return response.sendNotFound();
}
...
}
можно увидетьroute(...)
Метод только что сделалHttpRouteHandler
Экземпляры построены и переданыhandlers
этоlist
Управляется вышеуказаннымapply
Реализация объединяет предыдущий контент в логике процесса. Поэтому мы можемreactor.netty.http.server.HttpServer
дизайн один вroute
метод, обеспечивающийSPI
Интерфейс, определите весь процесс, который мы упомянули в этом методе (получите одинHttpServerRoutes
Экземпляр, затем передать егоroute
Правила построения метода, процесс построения указан вышеConsumer<? super HttpServerRoutes>
в процессе, и, наконец, объединит успешныеHttpServerRoutes
отBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>
Роль передается как параметрHttpServer#handle
).
Кроме того, мы должны уделять особое внимание здесь, в приведенном вышеDefaultHttpServerRoutes
осуществленныйapply
В методе видно, что как только запрос совпадет, результат будет возвращен сразу после обработки, и совпадение не будет продолжаться.
//reactor.netty.http.server.HttpServer#route
public final HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) {
Objects.requireNonNull(routesBuilder, "routeBuilder");
HttpServerRoutes routes = HttpServerRoutes.newRoutes();
routesBuilder.accept(routes);
return handle(routes);
}
Таким образом, мы можем использовать следующиеDemo
Чтобы применить приведенный выше дизайн:
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.route(routes ->
routes.get("/hello", <1>
(request, response) -> response.sendString(Mono.just("Hello World!")))
.post("/echo", <2>
(request, response) -> response.send(request.receive().retain()))
.get("/path/{param}", <3>
(request, response) -> response.sendString(Mono.just(request.param("param")))))
.bindNow();
server.onDispose()
.block();
}
}
существует<1>
, когда мы выдаемGET
Запрос на посещение/hello
Когда вы получаете строкуHello World!
.
существует<2>
, когда мы выдаемPOST
просьба посетить/echo
Тело запроса возвращается как содержимое ответа.
существует<3>
, когда мы выдаемGET
просьба посетить/path/{param}
Когда вы получаете параметр пути запросаparam
значение .
оSSE
Для использования здесь мы можем посмотреть следующую демонстрацию, конкретные детали кода не будут подробно описаны, просто посмотрите соответствующие комментарии:
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.function.BiFunction;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.route(routes -> routes.get("/sse", serveSse()))
.bindNow();
server.onDispose()
.block();
}
/**
* 准备 SSE response
* 参考 reactor.netty.http.server.HttpServerResponse#sse可以知道它的"Content-Type"
* 是"text/event-stream"
* flush策略为通过所提供的Publisher来每下发一个元素就flush一次
*/
private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() {
Flux<Long> flux = Flux.interval(Duration.ofSeconds(10));
return (request, response) ->
response.sse()
.send(flux.map(Application::toByteBuf), b -> true);
}
/**
* 将发元素按照按照给定的格式由Object转换为ByteBuf。
*/
private static ByteBuf toByteBuf(Object any) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
out.write("data: ".getBytes(Charset.defaultCharset()));
MAPPER.writeValue(out, any);
out.write("\n\n".getBytes(Charset.defaultCharset()));
}
catch (Exception e) {
throw new RuntimeException(e);
}
return ByteBufAllocator.DEFAULT
.buffer()
.writeBytes(out.toByteArray());
}
private static final ObjectMapper MAPPER = new ObjectMapper();
}