Что такое разрушитель
Разрывная очередь является высокопроизводительной очередью, разработанной британской валютной торговой компанией Lmax. Оригинальное намерение исследования и разработки состояло в том, чтобы решить проблему задержки очереди памяти (в тесте производительности, оно было обнаружено в том же порядке величины как операция ввода / вывода). Единая нить системы, разработанная на основе разрушителя, может поддерживать 6 миллионов заказов в секунду. После выявления речи в QCON в 2010 году она получила внимание отрасли. В 2011 году Enterprise Application Software Expert Martin Fowler написал длительное введение. В том же году он также выиграл официальный герцогский награду от Oracle. С точки зрения структуры данных, разрушитель - это поддержкаПроизводитель -> Потребительмодальныйкруговая очередь. способеннет замкаПараллельное потребление может осуществляться по условиям потребления, а порядок потребления также может осуществляться по зависимостям между потребителями. В этой статье будет показано, как некоторые классические сценарии реализуются через Disruptor.
добавить зависимости
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
Модель с одним производителем и одним потребителем
Сначала создайтеOrderEvent
класс, этот класс будет помещен в циклическую очередь как содержимое сообщения.
@Data
public class OrderEvent {
private String id;
}
СоздайтеOrderEventProducer
класс, который будет использоваться в качестве производителя.
public class OrderEventProducer {
private final RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String orderId) {
long sequence = ringBuffer.next();
try {
OrderEvent orderEvent = ringBuffer.get(sequence);
orderEvent.setId(orderId);
} finally {
ringBuffer.publish(sequence);
}
}
}
СоздайтеOrderEventHandler
класс и реализуетEventHandler<T>
иWorkHandler<T>
интерфейс, как потребитель.
@Slf4j
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
log.info("event: {}, sequence: {}, endOfBatch: {}", event, sequence, endOfBatch);
}
@Override
public void onEvent(OrderEvent event) {
log.info("event: {}", event);
}
}
После создания трех вышеуказанных классов у нас уже есть事件类
,生产者
,消费者
эти три элемента. Далее мы демонстрируем эту серию процессов с помощью основного метода.
@Slf4j
public class DisruptorDemo {
public static void main(String[] args) throws InterruptedException {
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new,
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.SINGLE,
new YieldingWaitStrategy()
);
disruptor.handleEventsWith(new OrderEventHandler());
disruptor.start();
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
eventProducer.onData(UUID.randomUUID().toString());
}
}
один производитель несколько потребителей
Если есть несколько потребителей, просто позвонитеhandleEventsWith
Передайте несколько потребителей в метод. Код ниже передает двух потребителей.
- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());
Два потребителя, переданные выше, будут использовать каждое сообщение повторно.Если вы хотите понять, что сообщение будет потребляться только одним потребителем, когда есть несколько потребителей, вам нужно вызватьhandleEventsWithWorkerPool
метод.
- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
мультипроизводитель мультипотребитель
В реальной разработке это нормально, когда несколько производителей отправляют сообщения, а несколько потребителей обрабатывают сообщения. Это также поддерживается Disruptor. Код для нескольких производителей и нескольких потребителей выглядит следующим образом:
@Slf4j
public class DisruptorDemo {
public static void main(String[] args) throws InterruptedException {
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new,
1024 * 1024,
Executors.defaultThreadFactory(),
// 这里的枚举修改为多生产者
ProducerType.MULTI,
new YieldingWaitStrategy()
);
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
disruptor.start();
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 创建一个线程池,模拟多个生产者
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100);
for (int i = 0; i < 100; i++) {
fixedThreadPool.execute(() -> eventProducer.onData(UUID.randomUUID().toString()));
}
}
}
потребительский приоритет
Срыв может делать вещи гораздо больше, чем содержимое выше. На фактической сцене мы обычно потому, что деловая логика формировать цепочку потребления. Например, необходимо сделать сообщение消费者A -> 消费者B -> 消费者C
потребляются по порядку. При настройке потребителей вы можете передать.then
метод достижения. следующее:
disruptor.handleEventsWith(new OrderEventHandler())
.then(new OrderEventHandler())
.then(new OrderEventHandler());
Конечно,handleEventsWith
иhandleEventsWithWorkerPool
вся поддержка.then
Да, их можно использовать в комбинации. Например, согласно消费者A -> (消费者B 消费者C) -> 消费者D
порядок потребления
disruptor.handleEventsWith(new OrderEventHandler())
.thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
.then(new OrderEventHandler());
Суммировать
Вышеуказанное представляет собой общий метод нарушения высокопроизводительной очереди. фактически生成者 -> 消费者
Шаблоны очень распространены, и вышеуказанные эффекты могут быть легко достигнуты с помощью некоторых очередей сообщений. Разница в том, что Disruptor реализован как очередь в памяти и не блокируется. Вот почему Disruptor эффективен.