Учебное пособие по высокопроизводительной очереди Disruptor

Java

Что такое разрушитель

Разрывная очередь является высокопроизводительной очередью, разработанной британской валютной торговой компанией Lmax. Оригинальное намерение исследования и разработки состояло в том, чтобы решить проблему задержки очереди памяти (в тесте производительности, оно было обнаружено в том же порядке величины как операция ввода / вывода). Единая нить системы, разработанная на основе разрушителя, может поддерживать 6 миллионов заказов в секунду. После выявления речи в QCON в 2010 году она получила внимание отрасли. В 2011 году Enterprise Application Software Expert Martin Fowler написал длительное введение. В том же году он также выиграл официальный герцогский награду от Oracle. С точки зрения структуры данных, разрушитель - это поддержкаПроизводитель -> Потребительмодальныйкруговая очередь. способеннет замкаПараллельное потребление может осуществляться по условиям потребления, а порядок потребления также может осуществляться по зависимостям между потребителями. В этой статье будет показано, как некоторые классические сценарии реализуются через Disruptor.

добавить зависимости

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

Модель с одним производителем и одним потребителем

Сначала создайтеOrderEventкласс, этот класс будет помещен в циклическую очередь как содержимое сообщения.

@Data
public class OrderEvent {
    private String id;
}

СоздайтеOrderEventProducerкласс, который будет использоваться в качестве производителя.

public class OrderEventProducer {
    private final RingBuffer<OrderEvent> ringBuffer;
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void onData(String orderId) {
        long sequence = ringBuffer.next();
        try {
            OrderEvent orderEvent = ringBuffer.get(sequence);
            orderEvent.setId(orderId);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

СоздайтеOrderEventHandlerкласс и реализуетEventHandler<T>иWorkHandler<T>интерфейс, как потребитель.

@Slf4j
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        log.info("event: {}, sequence: {}, endOfBatch: {}", event, sequence, endOfBatch);
    }
    @Override
    public void onEvent(OrderEvent event) {
        log.info("event: {}", event);
    }
}

После создания трех вышеуказанных классов у нас уже есть事件类,生产者,消费者эти три элемента. Далее мы демонстрируем эту серию процессов с помощью основного метода.

@Slf4j
public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                OrderEvent::new,
                1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy()
        );
        disruptor.handleEventsWith(new OrderEventHandler());
        disruptor.start();
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
        eventProducer.onData(UUID.randomUUID().toString());
    }
}

один производитель несколько потребителей

Если есть несколько потребителей, просто позвонитеhandleEventsWithПередайте несколько потребителей в метод. Код ниже передает двух потребителей.

- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());

Два потребителя, переданные выше, будут использовать каждое сообщение повторно.Если вы хотите понять, что сообщение будет потребляться только одним потребителем, когда есть несколько потребителей, вам нужно вызватьhandleEventsWithWorkerPoolметод.

- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());

мультипроизводитель мультипотребитель

В реальной разработке это нормально, когда несколько производителей отправляют сообщения, а несколько потребителей обрабатывают сообщения. Это также поддерживается Disruptor. Код для нескольких производителей и нескольких потребителей выглядит следующим образом:

@Slf4j
public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                OrderEvent::new,
                1024 * 1024,
                Executors.defaultThreadFactory(),
                // 这里的枚举修改为多生产者
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
        disruptor.start();
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
        // 创建一个线程池,模拟多个生产者
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; i++) {
            fixedThreadPool.execute(() -> eventProducer.onData(UUID.randomUUID().toString()));
        }
    }
}

потребительский приоритет

Срыв может делать вещи гораздо больше, чем содержимое выше. На фактической сцене мы обычно потому, что деловая логика формировать цепочку потребления. Например, необходимо сделать сообщение消费者A -> 消费者B -> 消费者Cпотребляются по порядку. При настройке потребителей вы можете передать.thenметод достижения. следующее:

disruptor.handleEventsWith(new OrderEventHandler())
         .then(new OrderEventHandler())
         .then(new OrderEventHandler());

Конечно,handleEventsWithиhandleEventsWithWorkerPoolвся поддержка.thenДа, их можно использовать в комбинации. Например, согласно消费者A -> (消费者B 消费者C) -> 消费者Dпорядок потребления

disruptor.handleEventsWith(new OrderEventHandler())
         .thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
         .then(new OrderEventHandler());

Суммировать

Вышеуказанное представляет собой общий метод нарушения высокопроизводительной очереди. фактически生成者 -> 消费者Шаблоны очень распространены, и вышеуказанные эффекты могут быть легко достигнуты с помощью некоторых очередей сообщений. Разница в том, что Disruptor реализован как очередь в памяти и не блокируется. Вот почему Disruptor эффективен.

Ссылка на ссылку