Первый опыт высокопроизводительной безблокировочной очереди Disruptor

задняя часть Netty Logstash Apache Storm

Оригинальный адрес:HaifeiWu и блог его друзей
адрес блога:www.hchstudio.cn
Добро пожаловать на перепечатку, пожалуйста, указывайте автора и источник, спасибо!

Недавно я исследовал некоторые проблемы с очередями, сегодня я поделюсь высокопроизводительным очередным Disruptor.

what Disruptor ?

Это высокопроизводительная очередь, разработанная британской валютной торговой компанией LMAX.Первоначальная цель исследований и разработок - решить проблему задержки очередей памяти. Один поток системы, разработанной на основе Disruptor, может поддерживать 6 миллионов заказов в секунду.

В настоящее время многие известные проекты, включая Apache Storm и Log4j2, используют Disruptor для достижения высокой производительности. Disruptor и Netty используются в компании-арендодателе для обработки данных GPS в реальном времени, и их производительность довольно высока. Эта статья дает общее представление о принципе реализации Disruptor с практической точки зрения.

why Disruptor ?

Disruptor решает проблему низкой скорости очереди следующим образом:

  • Структура кольцевого массива Чтобы избежать сборки мусора, используйте массив вместо связанного списка. Потому что массивы более дружественны к механизму кэширования процессора.
  • позиционирование элемента Длина массива составляет 2^n, а скорость позиционирования увеличивается за счет побитовой операции. Индексы принимают форму увеличения. Не беспокойтесь о переполнении индекса. Индекс является длинным типом, и даже если используется скорость обработки в 1 миллион запросов в секунду, его использование займет 300 000 лет.
  • беззамковая конструкция Каждый поток-производитель или потребитель будет сначала запрашивать позицию рабочего элемента в массиве, а после приложения напрямую записывать или читать данные в этой позиции.
  • Оптимизация для проблем с псевдо-обменом Disruptor устраняет эту проблему, по крайней мере, для архитектур процессоров с размером строки кэша 64 байта или меньше (возможно, что строка кэша процессора составляет 128 байт, поэтому использование 64-байтового заполнения все равно вызовет проблему ложного разделения), добавляя дополнения чтобы убедиться, что порядковый номер кольцевого буфера не существует в той же строке кэша, что и все остальное.

how Disruptor ?

Из приведенного выше введения мы, вероятно, можем понять, что Disruptor — это высокопроизводительная безблокировочная очередь, так как же ее использовать?

Во-первых, в соответствии с моделью программирования Disruptor, управляемой событиями, нам нужно определить событие для переноса данных.


public class DataEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }
}

Чтобы Disruptor предварительно назначал нам эти события, нам нужно построить EventFactory для выполнения построения


public class DataEventFactory implements EventFactory<DataEvent> {

    @Override
    public DataEvent newInstance() {
        return new DataEvent();
    }
}

Как только мы определили события, нам нужно создать потребителя, который обрабатывает эти события. В нашем случае все, что мы хотим сделать, это вывести значение из консоли.

public class DataEventHandler implements EventHandler<DataEvent> {
    @Override
    public void onEvent(DataEvent dataEvent, long l, boolean b) throws Exception {
        new DataEventConsumer(dataEvent);
    }
}

Далее нам нужно инициализировать Disruptor и определить производителя для генерации сообщений.


public class DisruptorManager {

    private final static Logger LOG = LoggerFactory.getLogger(DisruptorManager.class);

    /*消费者线程池*/
    private static ExecutorService threadPool;
    private static Disruptor<DataEvent> disruptor;
    private static RingBuffer<DataEvent> ringBuffer;

    private static AtomicLong dataNum = new AtomicLong();

    public static void init(EventHandler<DataEvent> eventHandler) {

        //初始化disruptor
        threadPool = Executors.newCachedThreadPool();
        disruptor = new Disruptor<>(new DataEventFactory(), 8 * 1024, threadPool, ProducerType.MULTI, new BlockingWaitStrategy());

        ringBuffer = disruptor.getRingBuffer();
        disruptor.handleEventsWith(eventHandler);
        disruptor.start();

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                LOG.info("放入队列中数据编号{},队列剩余空间{}", dataNum.get(), ringBuffer.remainingCapacity());
            }
        }, new Date(), 60 * 1000);
    }

    /**
     *
     * @param message
     */
    public static void putDataToQueue(long message) {
        if (dataNum.get() == Long.MAX_VALUE) {
            dataNum.set(0L);
        }

        // 往队列中加事件
        long next = ringBuffer.next();
        try {
            ringBuffer.get(next).set(message);
            dataNum.incrementAndGet();
        } catch (Exception e) {
            LOG.error("向RingBuffer存入数据[{}]出现异常=>{}", message, e.getStackTrace());
        } finally {
            ringBuffer.publish(next);
        }
    }

    public static void close() {
        threadPool.shutdown();
        disruptor.shutdown();
    }
}

Наконец, мы определяем метод Main для выполнения кода.


public class EventMain {

    public static void main(String[] args) throws Exception {
        DisruptorManager.init(new DataEventHandler());
        for (long l = 0; true; l++) {
            DisruptorManager.putDataToQueue(l);
            Thread.sleep(1000);
        }
    }
}

Если вы заинтересованы в приведенном выше коде, пожалуйста, переместитеGitHub.com/HaifeiW U/Первый…

Затем мы можем увидеть данные, напечатанные на консоли.

console

резюме

Disruptor обеспечивает высокую производительность в ситуациях с высокой степенью параллелизма благодаря сложной конструкции без блокировок.

Кроме того, асинхронный режим в Log4j 2 обрабатывается Disruptor. Здесь арендодатель сталкивается с небольшой проблемой, то есть при использовании Log4j 2 для отправки данных журнала в logstash через режим TCP связь прерывается из-за проблем с сетью, из-за чего Log4j 2 продолжает записывать данные в кольцевой буфер, а данные кольцевого буфера не имеет потребителей, что приводит к нехватке памяти сервера. Решение состоит в том, чтобы сделать очередь Disruptor в Log4j 2 ограниченной или переключиться в режим UDP для записи данных журнала (если данные не важны).

Ссылка на ссылку