1. Что такое разрушитель
Мартин Фаулер написал статью об архитектуре LMAX на своем веб-сайте, в которой он представил LMAX как новый тип розничной финансовой торговой платформы, которая может генерировать большое количество транзакций с очень низкой задержкой. Система построена на платформе JVM, а ее ядром является процессор бизнес-логики, способный обрабатывать 6 миллионов заказов в секунду в одном потоке. Процессор бизнес-логики полностью работает в памяти и использует подход, основанный на источнике событий. Ядром процессора бизнес-логики является Disruptor.
Disruptor — это среда параллелизма с открытым исходным кодом, получившая в 2011 году награду Duke’s Program Framework Innovation Award за возможность реализации параллельной работы сетевой очереди без блокировок.
Disruptor — это высокопроизводительный фреймворк асинхронной обработки, или его можно рассматривать как самый быстрый фреймворк сообщений (облегченный JMS), его также можно рассматривать как реализацию паттерна наблюдателя или реализацию паттерна прослушивания событий.
Прежде чем использовать, сначала объясните основные функции разрушителя, вы сможете понять, что это эффективная модель «производитель-потребитель». То есть производительность намного выше, чем у традиционного контейнера BlockingQueue.
В статье JDK о многопоточности и параллельной библиотеке упоминается, что BlockingQueue реализует модель производитель-потребитель BlockingQueue реализуется на основе блокировок, а эффективность блокировок обычно низкая.
Disruptor использует режим наблюдателя для активной отправки сообщений потребителям вместо того, чтобы ждать, пока потребители заберут их из очереди; в случае lock-free реализована параллельная работа очереди (кольцо, RingBuffer), и производительность намного выше чем BlockingQueue
2. Конструктивная схема Disruptor
Disruptor решает проблему низкой скорости очереди следующим образом:
- Структура кольцевого массива:
Чтобы избежать сборки мусора, используйте массив вместо связанного списка. В то же время массив более дружелюбен к механизму кэширования процессора. - Позиционирование элемента:
Длина массива составляет 2^n, а скорость позиционирования увеличивается за счет побитовой операции. Индексы принимают форму увеличения. Не беспокойтесь о переполнении индекса. Индекс является длинным типом, и даже если используется скорость обработки в 1 миллион запросов в секунду, его использование займет 300 000 лет. - Беззамковый дизайн:
Каждый поток-производитель или потребитель будет сначала запрашивать позицию рабочего элемента в массиве, а после приложения напрямую записывать или читать данные в этой позиции.
3. Разрушитель осуществляет производство и потребление
1. Информация о зависимости Pom Maven
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
2. Сначала объявите событие, содержащее данные, которые необходимо передать:
//定义事件event 通过Disruptor 进行交换的数据类型。
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
3. Нам нужно позволить разрушению создавать для нас события, и мы также заявляем, что EventFactory для создания объекта события.
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
4. Потребитель событий, который является обработчиком событий. Этот обработчик событий просто выводит данные, хранящиеся в событии, на терминал:
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("消费者:"+event.getValue());
}
}
5. Определите производителя для отправки события
public class LongEventProducer {
public final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 1.ringBuffer 事件队列 下一个槽
long sequence = ringBuffer.next();
Long data = null;
try {
//2.取出空的事件队列
LongEvent longEvent = ringBuffer.get(sequence);
data = byteBuffer.getLong(0);
//3.获取事件队列传递的数据
longEvent.setValue(data);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} finally {
System.out.println("生产这准备发送数据");
//4.发布事件
ringBuffer.publish(sequence);
}
}
}
6. Основная функция выполняет вызов
public class DisruptorMain {
public static void main(String[] args) {
// 1.创建一个可缓存的线程 提供线程来出发Consumer 的事件处理
ExecutorService executor = Executors.newCachedThreadPool();
// 2.创建工厂
EventFactory<LongEvent> eventFactory = new LongEventFactory();
// 3.创建ringBuffer 大小
int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方
// 4.创建Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
ProducerType.SINGLE, new YieldingWaitStrategy());
// 5.连接消费端方法
disruptor.handleEventsWith(new LongEventHandler());
// 6.启动
disruptor.start();
// 7.创建RingBuffer容器
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 8.创建生产者
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 9.指定缓冲区大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 1; i <= 100; i++) {
byteBuffer.putLong(0, i);
producer.onData(byteBuffer);
}
//10.关闭disruptor和executor
disruptor.shutdown();
executor.shutdown();
}
}
В-четвертых, что такое кольцевой буфер
Это кольцо (сквозное кольцо), которое вы можете использовать в качестве буфера для передачи данных между различными контекстами (потоками).
По сути, кольцевой буфер имеет порядковый номер, указывающий на следующий доступный элемент в массиве. (Примечание для проверки: рисунок справа от следующего рисунка представляет порядковый номер, который указывает на позицию индекса 4 массива.)
По мере того, как вы продолжаете заполнять буфер (и, возможно, соответствующие чтения), порядковый номер будет увеличиваться до тех пор, пока цикл не будет обойден.
Чтобы найти элемент, на который указывает текущий порядковый номер в массиве, вы можете использовать операцию mod: возьмите в качестве примера вышеприведенный кольцевой буфер (синтаксис mod java): 12 % 10 = 2. Просто как тот. На самом деле тот факт, что кольцевой буфер на картинке выше имеет всего 10 слотов, является полной неожиданностью. Если количество слотов равно 2 в N-й степени, выгоднее использовать двоичный код.
преимущество
Причина, по которой кольцевой буфер использует эту структуру данных, заключается в том, что она обеспечивает хорошую производительность при надежной доставке сообщений. Этого достаточно, но у него есть и другие преимущества.
Во-первых, поскольку это массив, он работает быстрее, чем связанный список, и имеет легко предсказуемый шаблон доступа. (Примечание переводчика: адреса памяти элементов в массиве хранятся непрерывно). Это дружественно к кэшу ЦП, то есть на аппаратном уровне элементы в массиве предварительно загружены, поэтому в кольцевом буфере ЦП не нужно время от времени обращаться к основной памяти для загрузки следующего элемента в массиве. (Примечание для проверки: пока один элемент загружается в строку кэша, несколько других соседних элементов также будут загружены в ту же строку кэша)
Во-вторых, вы можете предварительно выделить память для массива, чтобы объект массива сохранялся (если программа не завершится). Это означает, что вам не нужно тратить много времени на сборку мусора. Также, в отличие от связанного списка, для каждого добавленного в него объекта необходимо создавать объект-узел — соответственно, при удалении узла необходимо выполнять соответствующую операцию очистки памяти.
Базовая реализация RingBuffer
RingBuffer представляет собой сквозной кольцевой массив.Так называемое сквозное соединение означает, что когда указатель в RingBuffer пересекает верхнюю границу массива, он продолжает движение с начала массива. Следовательно, в RingBuffer есть по крайней мере один указатель для представления позиции операции в RingBuffer. Кроме того, операция автоинкремента указателя требует контроля параллелизма.И Disruptor, и OptimizedQueue в этой статье используют оптимистичный контроль параллелизма CAS для обеспечения атомарности автоинкремента указателя.
В Disruptor есть только один указатель на RingBuffer, который указывает, куда записывается сообщение в текущем RingBuffer. Кроме того, каждый потребитель будет поддерживать последовательность, указывающую, где они читают RingBuffer. С этой точки зрения, RingBuffer в Disruptor реально есть потребление Количество пользователей + 1 указатель. Поскольку то, что мы хотим реализовать, представляет собой очередь блокировки с одним сообщением и одним потреблением, нам нужно только поддерживать указатель чтения (соответствующий потребителю) и указатель записи (соответствующий производителю). чтение и запись с начала массива
Пять, основная концепция Disruptor
RingBuffer
Как следует из названия, кольцевой буфер. RingBuffer раньше был основным объектом в Disruptor, но начиная с версии 3.0 его обязанности были сведены к простому хранению и обновлению данных (событий), которыми обмениваются через Disruptor. В некоторых более сложных сценариях применения кольцевой буфер может быть полностью заменен определяемой пользователем реализацией.
SequenceDisruptor
Обмениваемые через них данные (события) нумеруются и управляются последовательно увеличивающимися порядковыми номерами, а обработка данных (событий) всегда выполняется по одному по порядковым номерам. Sequence используется для отслеживания хода определения конкретного обработчика событий ( RingBuffer/Consumer ). Хотя AtomicLong также можно использовать для определения прогресса, определение последовательности, которая позаботится об этом, имеет другую цель, а именно предотвращение проблемы ложного совместного использования кэша ЦП (Flase Sharing) между различными последовательностями.
Sequencer
Sequencer — настоящее сердце Disruptor. Этот интерфейс имеет два класса реализации SingleProducerSequencer и MultiProducerSequencer, которые определяют параллельные алгоритмы для быстрой и корректной передачи данных между производителями и потребителями.
Sequence Barrier
Ссылка на Sequence другого зависимого от потребителя другого зависимого от потребителя другого потребителя зависит от RingBuffer. Sequence Barrier также определяет логику, определяющую, можно ли по-прежнему обрабатывать consUmer.
Wait Strategy
Определите, как потребитель представляет собой политику, ожидающую следующего события. (Примечание: Disruptor определяет множество различных стратегий, предоставляя разные сцены, обеспечивая разную производительность)
Event
В семантике Disruptor обмен данными между производителем и потребителем называется событием. Это не конкретный тип, определенный Disruptor, но он определяется и указывается пользователем Disruptor.
EventProcessor
EventProcessor содержит Sequence определенных потребителей (Consumer) и предоставляет цикл обработки событий (Event Loop) для вызова реализаций обработки событий.
EventHandler
Интерфейс обработки событий, определенный Disruptor, реализуется пользователем для обработки событий и является реальной реализацией Consumer.
Producer
То есть производитель, который обычно относится к пользовательскому коду, который вызывает Disruptor для публикации события, а Disruptor не определяет конкретный интерфейс или тип.
Роль каждой концепции
- RingBuffer — базовая реализация структуры данных Disruptor, базовый класс, является местом передачи для обмена данными между потоками;
- Sequencer - менеджер порядковых номеров, отвечающий за управление и координацию соответствующих порядковых номеров и ограждений порядковых номеров потребителей/производителей;
- Sequence — порядковый номер, объявить порядковый номер для отслеживания изменений задач в кольцевом буфере и потребления потребителей;
- SequenceBarrier - ограждение серийного номера, управляет и координирует серийный номер курсора производителя и серийный номер каждого потребителя, чтобы гарантировать, что производитель не перезапишет сообщения, которые потребитель может обработать в будущем, и гарантировать, что зависимые потребители могут быть обрабатывается в правильном порядке;
- EventProcessor - обработчик событий, который прослушивает события RingBuffer и потребляет доступные события. События, прочитанные из RingBuffer, будут переданы классу реализации фактического производителя для потребления; он всегда будет прослушивать следующий доступный порядковый номер, пока порядковый номер не будет соответствовать Событию готово.
- EventHandler — бизнес-процессор, представляет собой интерфейс фактического потребителя, завершает конкретную реализацию бизнес-логики, а третья сторона реализует интерфейс, представляет потребителя.
- Producter — интерфейс Producer, в роли этой роли выступает сторонний поток, а Producter записывает событие в RingBuffer.
личный блогулитки