Использование Разрушителя
1. Введение
The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX's research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange's infrastructure.
(LMAX Disruptor — это высокопроизводительная библиотека обмена сообщениями между потоками. Она выросла из исследований LMAX в области параллелизма, производительности и неблокирующих алгоритмов и теперь является основной частью ее инфраструктуры Exchange.)
-- Цитата из введения GITHUB
Disruptor — это высокопроизводительный фреймворк асинхронной обработки, или его можно рассматривать как самый быстрый фреймворк сообщений (облегченный JMS), его также можно рассматривать как реализацию паттерна наблюдателя или реализацию паттерна прослушивания событий. Ниже приведен вводный вики-адрес:
https://github.com/LMAX-Exchange/disruptor/wiki
2. Конструктивная схема Disruptor
Disruptor решает проблему низкой скорости очереди следующим образом: Структура кольцевого массива Чтобы избежать сборки мусора, используйте массив вместо связанного списка. В то же время массив более дружелюбен к механизму кэширования процессора. позиционирование элемента Длина массива составляет 2^n, а скорость позиционирования увеличивается за счет побитовой операции. Индексы принимают форму увеличения. Не беспокойтесь о переполнении индекса. Индекс является длинным типом, и даже если используется скорость обработки в 1 миллион запросов в секунду, его использование займет 300 000 лет. беззамковая конструкция Каждый поток-производитель или потребитель будет сначала запрашивать позицию рабочего элемента в массиве, а после приложения напрямую записывать или читать данные в этой позиции. Давайте проигнорируем кольцевую структуру массива и представим, как реализовать схему без блокировок. Весь процесс обеспечивает потокобезопасность операции через атомарную переменную CAS.
3. Особенности реализации Disruptor
Еще одна ключевая деталь для достижения низкой задержки — использование алгоритмов без блокировки в Disruptor, где вся видимость и правильность памяти достигается с помощью барьеров памяти или операций CAS. Используя CAS для обеспечения безопасности многопоточности, CAS, очевидно, намного быстрее, чем блокировки, используемые большинством параллельных очередей. CAS — это инструкция уровня ЦП, которая является более легкой и не требует поддержки операционной системы, такой как блокировка, поэтому при каждом вызове не нужно переключаться между режимом пользователя и режимом ядра, а также не требуется переключение контекста. Существует только один вариант использования, когда требуется блокировка, и это BlockingWaitStrategy, и единственный способ реализовать ее — использовать Condition, чтобы позволить потребителям ждать до поступления нового события. Многие системы с малой задержкой используют ожидание занятости, чтобы избежать джиттера состояния, однако производительность может значительно снизиться во время операций ожидания занятости, особенно когда ресурсы ЦП сильно ограничены, например, веб-серверы в виртуализированных средах.
4. Disruptor реализует модель производитель-потребитель
Здесь мы создаем модель производителя и потребителя, которая помещает LongValue в соответствии с демонстрацией оригинального автора. Соответствующий код выглядит следующим образом:
maven-зависимости
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
LongEvent
//定义事件event 通过Disruptor 进行交换的数据类型。
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
LongEventFactory
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
LongEventHandler
// 消费者获得数据
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println("消费者获得数据:" + longEvent.getValue());
}
}
LongEventProducer
// 生产者
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 获取事件队列的下表位置
long sequence = ringBuffer.next();
try {
// 取出空队列
LongEvent longEvent = ringBuffer.get(sequence);
// 给空队列赋值
longEvent.setValue(byteBuffer.getLong(0));
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("生产者发送数据....");
ringBuffer.publish(sequence);
}
}
}
MainTest
public class MainTest {
public static void main(String[] args) {
// 1. 创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 2. 创建工厂
LongEventFactory longEventFactory = new LongEventFactory();
// 3.创建ringbuffer 大小
int ringbuffer = 1024 * 1024; // 2的N次方
// 4. 创建disruptor
Disruptor<LongEvent> longEventDisruptor = new Disruptor<>(
longEventFactory, ringbuffer, executor,
ProducerType.MULTI, new YieldingWaitStrategy()
);
// 5. 连接消费者
longEventDisruptor.handleEventsWith(new LongEventHandler());
// 6. 启动
longEventDisruptor.start();
// 7.创建ringbuffer容器
RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();
// 8.创建生产者
LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
// 9. 指定缓冲区的大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 0; i < 10; i++) {
byteBuffer.putLong(0,i);
longEventProducer.onData(byteBuffer);
}
executor.shutdown();
longEventDisruptor.shutdown();
}
}
Результат выполнения следующий:
生产者发送数据....
生产者发送数据....
生产者发送数据....
生产者发送数据....
消费者获得数据:0
生产者发送数据....
消费者获得数据:1
生产者发送数据....
消费者获得数据:2
生产者发送数据....
消费者获得数据:3
生产者发送数据....
消费者获得数据:4
生产者发送数据....
消费者获得数据:5
生产者发送数据....
消费者获得数据:6
消费者获得数据:7
消费者获得数据:8
消费者获得数据:9