Платформа публикации-подписки, параллельные пакеты для реактивных потоков
CompletableFuture
улучшения класса и т.д. .
JEP266 представляет множество новых способов параллелизма в языке Java: Reactive Streams, созданная для него более совместимая платформа публикации-подписки и улучшенная для других API Java 9.java.util.concurrent.CompletableFuture
классы и многие другие обновления.
В этой статье подробно ознакомьтесь с введением в реактивную потоковую передачу, а затем познакомьтесь с платформой публикации и подписки.
Реактивные потоки
Когда система пакетной обработки собирает достаточно данных и достигает определенного порога и необходимо перейти к следующему шагу, выводится новый термин — обработка данных (Data processing). В настоящее время идея потоково-ориентированной архитектуры может помочь нам достичь этой цели как можно быстрее. Он может захватывать и обрабатывать данные в режиме реального времени, а также может быстро (секунды или меньше) воздействовать на систему на основе результатов обработки. По сравнению с этим пакетной системе ответ может занять секунды, дни или даже больше.
Обработка потоков данных (особенно данных в реальном времени разного размера) требует особого внимания в асинхронных системах. Основная проблема заключается в контроле потребления ресурсов и недопущении перепроизводства (отставания) источников данных и систем обработки. В настоящее время необходимо выполнять параллельную обработку данных асинхронно, а использование распределенных систем или производительность многоядерных процессоров может эффективно сделать процесс обработки данных быстрым и эффективным.
Reactive Streams предоставляет стандарт для асинхронной обработки потоков с неблокирующим обратным давлением. Когда система обработки перегружена, источник данных уведомляется о необходимости выполнения соответствующей обработки путем асинхронной отправки сигналов. Этот сигнал уведомления подобен клапану на водопроводной трубе, закрытие которого увеличит противодавление (давление источника данных на систему обработки), что также увеличит давление на систему обработки.
Целью этого стандарта является управление обменом потоковыми данными через асинхронные границы (например, передача данных в другие потоки), при этом гарантируя, что система обработки не будет перегружена буферизованными данными. Другими словами, противодавление является неотъемлемой частью этой стандартной модели, позволяющей разграничивать очереди для посредничества между потоками. Обратите внимание, в частности, что связь противодавления является асинхронной.
Предложение Reactive Streams предназначено для предоставления минимального набора интерфейсов, методов или протоколов для описания этой операции или объекта: асинхронного потока данных с неблокирующим обратным давлением.
Платформа публикации-подписки
проход Java 9java.util.concurrent.Flow
а такжеjava.util.concurrent.SubmissionPublisher
класс для реализации реактивной потоковой передачи.
Flow
Класс определяет четыре вложенных статических интерфейса для создания компонентов управления потоком, в которых издатель генерирует один или несколько элементов данных для подписчиков:
- Издатель: издатель элемента данных, производитель
- Подписчик: подписчик элемента данных, потребитель
- Подписка: отношения между издателем и подписчиком, токен подписки.
- Процессор: процессор данных
Издатель (Publisher) публикует элементы данных в потоке, регистрирует подписчиков и реализуетFlow.Publisher
интерфейс, который объявляет метод, который мы вызываем для регистрации подписчиков для издателя:
void subscribe(Flow.Subscriber<? super T> subscriber)
Вызовите этот метод, чтобы зарегистрировать подписчика у издателя, однако, если подписчик уже зарегистрирован другим издателем или не может зарегистрироваться (конфликт политик), этот метод вызовет метод подписчика.onError()
способ броситьIllegalStateException
исключением, в том числе, абонентскогоonSubscribe()
метод вызовет новыйFlow.Subscription
, когда подписчику передается пустой объект,subscribe()
Метод генерирует исключение NullPointerException.
Подписчики возвращают элементы данных от подписанных издателей и реализуютFlow.Subscriber<T>
, методы, объявленные в этом интерфейсе, следующие:
void onSubscribe(Flow.Subscription subscription)
void onComplete()
void onError(Throwable throwable)
void onNext(T item)
onSubscribe()
Метод используется для подтверждения успешной регистрации подписчика на издателе, получает список параметров в видеFlow.Subscription
Параметры типа и методы, объявленные в этом типе параметра, позволяют издателю запрашивать публикацию новых элементов данных или просить издателя не публиковать больше элементов данных.
onComplete()
метод используется, когда подписчик не вызывает другие методы, иSubscription
В случае ошибки и не прекращения. После вызова этого метода подписчик не может вызывать другие методы.
onError(Throwable throwable)
Этот метод используется, когда издатель или подписчик сталкивается с неисправимой ошибкой.После вызова этого метода подписчик не может вызывать другие методы.
onNext()
Метод используется для объявления подписки на следующий элемент данных.Если во время этого процесса возникнет исключение, результат не будет подтвержден или даже подписка будет отменена.
Токен подписки (Subscription) определяет отношения между издателем и подписчиком, так что подписчик получает определенный элемент данных или отменяет запрос на получение в определенное время, а токен подписки реализует самостоятельнуюFlow.Subscription
интерфейс, метод объявления интерфейса выглядит следующим образом:
void request(long n)
void cancel()
request()
Метод добавляет n элементов данных к запросу на подписку, который в настоящее время не заполнен. Если n меньше или равно 0, подписчикonError()
Будет вызван метод и будет выброшено исключение IllegalArgumentException.Кроме того, если n больше 0, подписчикonNext()
Вызов метода получает n элементов данных, если промежуточное исключение не завершается. Это неограниченный вызов от Long.MAX_VALUE до n раз.
cancel()
Используется для завершения приема элементов данных подписчиками, имеет механизм try, то есть после его вызова возможно получение элементов данных.
Наконец, процессор данных (Processor) выполняет обработку данных на основе потока, не меняя издателя и подписчика. Между издателем и подписчиком можно разместить несколько процессоров данных, чтобы образовать цепочку процессоров. Подписчики не привязаны к обработке данных, они являются отдельными процессами. JDK9 не предоставляет конкретных обработчиков данных и должен быть реализован разработчиками без объявления методов.Processor
Интерфейс строится сам.
SubmissionPublisher
реализовать себяFlow.Publisher
Интерфейс, который асинхронно отправляет ненулевые элементы данных текущему подписчику, пока он не будет закрыт. Каждый текущий подписчик получает вновь отправленные элементы данных в идентичном порядке, если только элементы данных не потеряны или не обнаружена исключительная ситуация.SubmissionPublisher
Позволяет элементам данных выступать в качестве издателей при потере или блокировке.
SubmissionPublisher
Для получения экземпляра предоставляются три конструктора. Конструктор без параметров зависит отForkJoinPool.commonPool()
метод отправки издателю, чтобы обеспечить асинхронную функцию производителя для предоставления элементов данных подписчику.
Следующая программа демонстрируетSubmissionPublisher
Использование и другие функции этой платформы публикации-подписки:
import java.util.Arrays;
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
public class FlowDemo
{
public static void main(String[] args)
{
// Create a publisher.
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// Create a subscriber and register it with the publisher.
MySubscriber<String> subscriber = new MySubscriber<>();
publisher.subscribe(subscriber);
// Publish several data items and then close the publisher.
System.out.println("Publishing data items...");
String[] items = { "jan", "feb", "mar", "apr", "may", "jun",
"jul", "aug", "sep", "oct", "nov", "dec" };
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
try
{
synchronized("A")
{
"A".wait();
}
}
catch (InterruptedException ie)
{
}
}
}
class MySubscriber<T> implements Subscriber<T>
{
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription)
{
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item)
{
System.out.println("Received: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t)
{
t.printStackTrace();
synchronized("A")
{
"A".notifyAll();
}
}
@Override
public void onComplete()
{
System.out.println("Done");
synchronized("A")
{
"A".notifyAll();
}
}
}
который использовалwait()
а такжеnotifyAll()
метод, чтобы заставить основной поток ждать, покаonComplete()
завершение, иначе вы не увидите никакого вывода.
Вот результат:
Publishing data items...
Received: jan
Received: feb
Received: mar
Received: apr
Received: may
Received: jun
Received: jul
Received: aug
Received: sep
Received: oct
Received: nov
Received: dec
Done
Наконец, студенты, знакомые с RxJava, могут понимающе улыбаться.