предисловие
Только лысая голова может стать сильнее.
Текст был включен в мой репозиторий GitHub, добро пожаловать, звезда:GitHub.com/Zhongf UC очень…
Структура очков знаний в этой статье:
Если есть общественность, касается меня, номер статьи, вы найдете, я найду бы, я недавно отправил время от времени на одном из лучших статьи Webflux, потому что я недавно узнал.
Как я уже говорил, прежде чем изучать технологию, вам нужно понять, почему вы хотите ее изучить. На самом деле это исследованиеWebFlux
Оригинальной мотивации не так много, в основном потому, что наша группа будет по очереди обмениваться техническими данными, а я не знаю, чем поделиться...
Я изучал большие данные для начинающих и раньше, но временная шкала этого раздела будет длиннее, и я чувствую, что не могу поделиться этим в группе (и большинство студентов в группе понимают большие данные, я единственный тот, у кого слезы на глазах). Итак, мысль такая: «Почему бы мне не узнать что-то новое и не сделать это?». Так что потребовалось некоторое время, чтобы узнатьWebFlux
Ла~
В этой статье в основном объясняется, что такоеWebFlux
, подведите вас к двери, надеюсь, это будет вам полезно (по крайней мере, прочитав эту статью, вы знаете, для чего используется WebFlux)
1. Что такое WebFlux?
мы начинаем сSpring
Официальный сайт немного опускается, чтобы увидеть введениеWebFlux
место
Какую информацию мы можем вывести из профиля на официальном сайте?
- Мы, программисты, частоВыбирайте разные технологии в соответствии с разными сценариями применения,有的场景适合用于同步阻塞的,有的场景适合用于异步非阻塞的。 и
Spring5
обеспечивает полный комплектОтзывчивый(Неблокирующий) технологический стек для использования (включая веб-контроллер, контроль разрешений, уровень доступа к данным и т. д.). - На картинке слева показано сравнение стека технологий;
- Тип ответа обычно использует контейнер Netty или Servlet 3.1 (поскольку он поддерживает асинхронную неблокировку), а стек технологии Servlet использует контейнер Servlet.
- В конце сети, используя отзывчивый WebFlux, сервлете с помощью SpringMVC
- .....
Подводя итог, WebFlux — это только часть реактивного программирования (на стороне веб-управления), поэтому обычно мы используем его для сравнения со SpringMVC.
2. Как понять реактивное программирование?
упомянутый вышереактивное программирование(реактивное программирование), а WebFlux — это лишь один из технологических стеков реактивного программирования, поэтому давайте сначала обсудим, что такое реактивное программирование.
Из Википедии получаем определение:
reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change
Реактивное программирование — это парадигма декларативного программирования, основанная на потоке данных и распространении изменений.
В Википедии также есть небольшой пример:
Вероятно, это следующее:
- В императивном программировании (нашем повседневном режиме программирования) формула
a=b+c
, что обозначаетa
Значение определяетсяb
иc
вычислено. еслиb
илиc
Последующие изменения,это не влияетприбытьa
значение - При реактивном программировании формула
a:=b+c
, что обозначаетa
Значение определяетсяb
иc
вычислено. но еслиb
илиc
Значение впоследствии изменяется,повлияетприбытьa
значение
Я думаю, что приведенный выше пример уже помогает нам понятьраспространение изменений
Как понять поток данных и декларативный? Это может упомянуть поток нашего потока. Я написал статью о лямбда-выражениях и потоковом потоке, вы можете посмотреть:
Синтаксис Lambda выглядит следующим образом (использование потока Stream будет включать множество выражений Lambda, поэтому обычно сначала изучают Lambda, а затем поток Stream):
Использование потоков Stream делится на три этапа (создание потоков Stream, выполнение промежуточных операций и выполнение конечных операций):
Выполнение промежуточных операций на самом деле дает нам многоAPIДля управления потоком данных Stream (Суммирование/дедупликация/фильтрация)и т.д
Сказав так много, как вы понимаете поток данных и декларативный подход? На самом деле это:
- Сначала мы сами обрабатывали данные, но позжеОбработанные данные абстрагируются(превратился в поток данных), тоОбработать через APIДанные в потоке данных (является декларативным)
Например, следующий код; данные в массиве становятсяпоток данных, вызываемый явным объявлением.sum()
для обработки данных в потоке данных, чтобы получить окончательный результат:
public static void main(String[] args) {
int[] nums = { 1, 2, 3 };
int sum2 = IntStream.of(nums).parallel().sum();
System.out.println("结果为:" + sum2);
}
Как показано ниже:
Реактивное программирование 2.1 -> неблокирующий асинхронный
Выше упоминалось, что такое реактивное программирование:
Реактивное программирование — это парадигма декларативного программирования, основанная на потоке данных и распространении изменений.
Это также объясняет, что означает поток данных/доставка изменений/декларативность, но когда дело доходит до реактивного программирования, это неотделимоАсинхронный неблокирующий.
Мы можем найти информацию о WebFlux на официальном сайте Spring.asynchronous, nonblocking
такие слова, как это, потому чтоРеактивное программирование асинхронно, также можно понимать какизменить переводОн выполняется асинхронно.
Как показано ниже,На общую сумму повлияют другие суммы(Процесс обновления асинхронный):
Наш поток JDK8 Stream является синхронным, он не подходит для реактивного программирования (но нужно понимать основное использование, потому что реактивное потоковое программирование — это все операциипотокЧто ж)
В JDK9 реактивные потоки уже поддерживаются, давайте посмотрим
3. Реактивный JDK9
Спецификация реактивных потоков уже была предложена: в ней упоминается:
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure ----->www.reactive-streams.org/
Переведите и добавьте немного информации:
Reactive Streams предоставляет способ достижения асинхронной неблокировки путем определения набора сущностей, интерфейсов и методов взаимодействия.обратное давлениестандарт. Третьи стороны следуют этому стандарту для реализации конкретных решений, таких как Reactor, RxJava, Akka Streams, Ratpack и т. д.
Спецификация фактически определяет четыре интерфейса:
Платформа Java не обеспечивала полной поддержки Reactive до JDK 9. JDK9 также определяет четыре упомянутых выше интерфейса.java.util.concurrent
на сумке
Общая архитектура потоковой обработки обычно выглядит так (Производитель генерирует данные, выполняет промежуточную обработку данных, а потребитель получает данные для потребления.):
- Источник данных, обычно называемый производителем (Producer)
- Место назначения данных, обычно называемое потребителем (Consumer)
- Во время обработки над данными выполняется некоторая операция.Один или несколько этапов обработки. (Процессор)
Здесь мы оглядываемся назад на интерфейс реактивного потока, и мы должны быть в состоянии понять:
- Publisher (издатель) эквивалентно продюсеру (Producer)
- Подписчик (subscriber) эквивалентен потребителю (Consumer)
- Процессор используется для обработки данных между издателями и подписчиками.
Понятие противодавления (противодавления) упоминается в ответном потоке, что на самом деле очень легко понять. Реализация асинхронной неблокировки в реактивных потоках основана на модели производителя и потребителя, и проблема, которая может возникнуть в производителе и потребителе, заключается в следующем:Производитель производит слишком много данных, а потребитель перегружен.
И обратное давление выразилось прямо:Потребители могут сообщить производителю, сколько данных им нужно. вотSubscriptionчто делает интерфейс.
Давайте взглянем на методы интерфейса JDK9, возможно, мы сможем лучше понять приведенные выше слова:
// 发布者(生产者)
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
// 订阅者(消费者)
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
// 用于发布者与订阅者之间的通信(实现背压:订阅者能够告诉生产者需要多少数据)
public interface Subscription {
public void request(long n);
public void cancel();
}
// 用于处理发布者 发布消息后,对消息进行处理,再交由消费者消费
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
3.1 Посмотрите на пример
Комментариев в коде много, поэтому ББ у меня не много, рекомендуется копировать и запускать напрямую:
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);
// 过滤掉小于0的, 然后发布出去
if (item > 0) {
this.submit("转换后的数据:" + item);
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();
}
}
public class FlowDemo2 {
public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定义处理器, 对数据进行过滤, 并转换为String类型
MyProcessor processor = new MyProcessor();
// 3. 发布者 和 处理器 建立订阅关系
publiser.subscribe(processor);
// 4. 定义最终订阅者, 消费 String 类型数据
Subscriber<String> subscriber = new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 5. 处理器 和 最终订阅者 建立订阅关系
processor.subscribe(subscriber);
// 6. 生产数据, 并发布
publiser.submit(-111);
publiser.submit(111);
// 7. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
}
}
Результат выглядит следующим образом:
Процесс на самом деле очень простой:
Использованная литература:
- Yanbin.blog/Java-9-talk…
- blog.CSDN.net/Dance — это микс…
- woohoo.spring4all.com/article/682…
- Блог Woohoo.cn на.com/ICA n fix IT/Боюсь…
Stream в Java 8 в основном фокусируется на фильтрации, сопоставлении и слиянии потоков, в то время как Reactive Stream делает еще один шаг вперед, сосредотачиваясь на генерации и потреблении потоков, то есть на координации потоков между производителями и потребителями.
Грубо говоря:Responsive Flow — это неблокирующее асинхронное управление потоком +(Вы можете сообщить продюсеру, какой объем/отказ от подписки вам нужен)
С нетерпением ждем сценария применения реактивного программирования:
Например, в системе мониторинга логов нашей странице интерфейса больше не нужно будет постоянно запрашивать данные с сервера через «императивный» опрос, а затем обновлять их, вместо этого после установления канала поток данных будет непрерывно течь из систему на страницу, чтобы показать кривую изменения индикатора в реальном времени;
Другой пример — социальная платформа: обновления, лайки и комментарии друзей не пролистываются вручную, а автоматически отражаются в интерфейсе при изменении фоновых данных.
4. Начало работы с WebFlux
После долгих попыток я наконец вернулся к WebFlux. На основании вышеизложенного теперь мы можем сделать некоторые выводы:
- WebFlux - это часть пружины запускает реагирование на программирование (веб-конец)
- Реактивное программирование является асинхронным и неблокирующим (парадигма декларативного программирования, основанная на потоках данных и распространении изменений).
Вернемся и посмотрим на картинки на официальном сайте:
4.1 Простой опыт работы с WebFlux
Официальная весна, чтобы сделать нас большебыстро/гладкоВ WebFlux набор SpringMVC уже поддерживался. Это:Мы можем использовать WebFlux так же, как SpringMVC..
Реактивный поток, используемый WebFlux, использует не платформу JDK9, а метод, называемыйReactorРеактивная потоковая библиотека. так,начинаяWebFlux на самом деле больше связан с пониманием того, как использовать Reactor API, давайте посмотрим~
Reactor — это реактивный поток, у него тоже есть соответствующий издатель (Publisher
), издатели Reactor представлены двумя классами:
- Mono(возвращает 0 или 1 элемент)
- Flux(возвращает 0-n элементов)
А подписчики естьВесенний фреймворкзаканчивать
Давайте рассмотрим простой пример (построенный в среде WebFlux):
// 阻塞5秒钟
private String createStr() {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
return "some string";
}
// 普通的SpringMVC方法
@GetMapping("/1")
private String get1() {
log.info("get1 start");
String result = createStr();
log.info("get1 end.");
return result;
}
// WebFlux(返回的是Mono)
@GetMapping("/2")
private Mono<String> get2() {
log.info("get2 start");
Mono<String> result = Mono.fromSupplier(() -> createStr());
log.info("get2 end.");
return result;
}
В первую очередь стоит отметить, что когда мы собираем окружение WebFlux и запускаем, сервером приложений по умолчанию является Netty:
Давайте посетим интерфейс SpringMVC и интерфейс WebFlux соответственно и посмотрите, в чем разница:
СпрингМВК:
Вебфлукс:
С точки зрения вызывающей стороны (браузера) невозможно воспринять какие-либо изменения, потому что все они должны ждать 5 с, прежде чем возвращать данные. Однако из журнала на стороне сервера мы видим, что WebFluxВозвращает объект Mono напрямую(Вместо синхронной блокировки на 5 секунд, как SpringMVC, поток возвращается).
Именно в этом преимущество WebFlux: возможностьФиксированные потоки для обработки высокой параллелизма(чтобы полностью раскрыть производительность машины).
WebFlux также поддерживаетпуш сервера(SSE -> Server Send Event), давайте посмотрим на пример:
/**
* Flux : 返回0-n个元素
* 注:需要指定MediaType
* @return
*/
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> flux() {
Flux<String> result = Flux
.fromStream(IntStream.range(1, 5).mapToObj(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
return "flux data--" + i;
}));
return result;
}
эффектв секундуОтправит данные в браузер:
Большое спасиботалантМы можем видеть здесь, если эта статья хорошо написана, я чувствую себя "три криво"что-тоеслипопросить лайк Пожалуйста, следите за ️ поделитесь пожалуйста 👥 Спросите сообщение 💬Это правда для меняочень полезно! ! !
Я еще не закончил писать WebFlux.Эта статья о том, как WebFlux поддерживает набор аннотаций SpringMVC для разработки, а следующая статья о том, как использовать другой режим WebFlux (Functional Endpoints) для разработки и необходимо добавить некоторые общие проблемы ~
Эта книга была включена в мои избранные статьи на GitHub, добро пожаловать, звезда:GitHub.com/Zhongf UC очень…
рад вывестигалантерейные товарыОбщедоступный номер технологии Java:Java3y. В публичном аккаунтеБолее 300 оригинальных статейТехнические статьи, обширные видеоресурсы, красивые карты мозга,Следуйте, чтобы получить его!
Творить не легко Ваша поддержка и признание - самая большая мотивация для моего творчества Увидимся в следующей статье!попросить лайк Пожалуйста, следите за ️ поделитесь пожалуйста 👥 Спросите сообщение 💬