Высокопроизводительный деструктор очередей без блокировки, о котором вы должны знать

Java задняя часть

1. Что такое очередь

Когда я слышу очередь, я верю, что она всем знакома. В нашей реальной жизни очереди можно увидеть повсюду. Когда вы пойдете в супермаркет, чтобы проверить, вы увидите, что все стоят в ряд, ожидая своей очереди. касса,зачем стоять в ряду?Вы представляете,что у всех нет качества,и идут на кассу в спешке,что не только приведет к обрушению супермаркета,но и легко вызовет разные давки-казусы.Конечно,эти вещи на самом деле часто случаются в нашей реальности.

Конечно, в компьютерном мире очередь — это структура данных, которая использует FIFO (first in first out), и новые элементы (элементы, ожидающие входа в очередь) всегда вставляются в хвост и считываются из начала. чтение. В вычислениях очереди обычно используются для создания очередей (например, очереди ожидания пула потоков, очереди ожидания блокировки), разделения (режим производитель-потребитель), асинхронности и т. д.

2. Очередь в jdk

Все очереди в jdk реализуют интерфейс java.util.Queue и делятся на две категории в очереди: одна небезопасна для потоков, ArrayDeque, LinkedList и т. д., а другая находится в пакете java.util.concurrent. является потокобезопасным, и в нашей реальной среде все наши машины многопоточные.Когда несколько потоков стоят в очереди в одной и той же очереди, если использование потоков небезопасно, это произойдет, перезапись данных, потеря данных и т. д. Непредсказуемые вещи , поэтому в настоящее время мы можем выбирать только потокобезопасные очереди. Некоторые из очередей кратко перечислены ниже потокобезопасных очередей, предоставляемых в jdk:

имя очереди Блокировать ли структура данных ключевые технические моменты Есть ли замок Это ограничено
ArrayBlockingQueue да массив массив ReentrantLock заблокирован есть мир
LinkedBlockingQueue да связанный список ReentrantLock заблокирован есть мир
LinkedTransferQueue нет связанный список CAS нет замка Неограниченный
ConcurrentLinkedQueue нет связанный список CAS нет замка Неограниченный

Мы видим, что наша незаблокированная очередь не ограничена, а заблокированная очередь ограничена. Здесь будет проблема. В реальной онлайн-среде неограниченная очередь оказывает большее влияние на нашу систему. Это может привести к тому, что наша память переполнение напрямую, поэтому мы должны сначала исключить неограниченную очередь.Конечно, это не значит, что неограниченная очередь бесполезна, но в некоторых сценариях ее необходимо исключить. Во-вторых, есть две очереди, ArrayBlockingQueue и LinkedBlockingQueue. Обе они потокобезопасны, контролируются ReentrantLock. Разница между ними заключается в массиве и связанном списке. Можно получить следующий элемент или получить несколько элементов очереди. за раз, и адрес массива непрерывен в памяти, и будет оптимизация кеша в операционной системе (строка кеша также будет введена ниже), поэтому скорость доступа будет немного лучше. мы также попробуем выбрать ArrayBlockingQueue. Оказывается, во многих сторонних фреймворках, таких как ранний асинхронный log4j, ArrayBlockingQueue — это выбор.

Конечно, у ArrayBlockingQueue есть и свои недостатки, то есть производительность относительно низкая. Зачем jdk добавляет какие-то безблокировочные очереди? На самом деле, это для увеличения производительности. Почему бы вам не отправиться в рай? Но там есть люди.

3.Disruptor

Disruptor — это день, упомянутый выше, Disruptor — это высокопроизводительная очередь, разработанная LMAX, британской компанией по торговле иностранной валютой, и представляет собой среду параллелизма с открытым исходным кодом, получившую в 2011 году награду Duke’s Program Framework Innovation Award. Он может реализовать параллельную работу Очереди сети без блокировок, а один поток системы, разработанной на основе Disruptor, может поддерживать 6 миллионов заказов в секунду. В настоящее время известные платформы, включая Apache Storm, Camel, Log4j2 и т. д., интегрировали Disruptor внутри, чтобы заменить очередь jdk для достижения высокой производительности.

3.1 Почему это так круто?

Вышеупомянутое уже разнесло Disruptor, у вас обязательно возникнут вопросы, неужели он действительно такой классный, мой ответ конечно же, в Disruptor есть три главных убийцы:

  • CAS
  • Устранение ложного обмена
  • Кольцевой буфер С этими тремя убийцами Disruptor стал таким могущественным.

3.1.1 Замки и CAS

Причина, по которой наша очередь ArrayBlockingQueue заброшена, заключается в том, что она является тяжеловесной блокировкой. Во время нашего процесса блокировки мы приостановим блокировку. После разблокировки мы возобновим поток. Этот процесс будет иметь определенные накладные расходы, и как только мы не получим блокировку , поток может только ждать и ничего не может сделать.

CAS (сравнить и поменять местами), как следует из названия, сначала сравнить и поменять местами. Как правило, это сравнение того, является ли это старым значением. Если да, выполните настройку обмена. Каждый, кто знаком с оптимистической блокировкой, знает, что CAS можно использовать для реализовать оптимистическую блокировку.В CAS нет потоков.Переключение контекста, уменьшающее ненужные накладные расходы. Здесь используется JMH с двумя потоками, по одному вызову каждый раз, для тестирования на моей локальной машине, код выглядит следующим образом:

@BenchmarkMode({Mode.SampleTime})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations=3, time = 5, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations=1,batchSize = 100000000)
@Threads(2)
@Fork(1)
@State(Scope.Benchmark)
public class Myclass {
    Lock lock = new ReentrantLock();
    long i = 0;
    AtomicLong atomicLong = new AtomicLong(0);
    @Benchmark
    public void measureLock() {
        lock.lock();
        i++;
        lock.unlock();
    }
    @Benchmark
    public void measureCAS() {
        atomicLong.incrementAndGet();
    }
    @Benchmark
    public void measureNoLock() {
        i++;
    }
}

Результаты теста следующие:

Тестовые задания Результаты теста
Lock 26000ms
CAS 4840ms
нет замка 197ms

Видно, что Lock — это пятизначное число, CAS — четырехзначное число, а no lock — меньшее трехзначное число. Отсюда мы можем узнать, что Lock>CAS>без блокировки.

А наш Disruptor использует CAS, который использует CAS для установки некоторых индексов в очереди, уменьшая конфликты блокировок и повышая производительность.

Кроме того, CAS также используется для других очередей без блокировок в jdk, а CAS также используется для атомарных классов.

3.1.2 Ложный обмен

Когда дело доходит до псевдосовместного использования, следует сказать, что кеш ЦП компьютера, размер кеша является одним из важных показателей ЦП, а структура и размер кеша имеют большое влияние на скорость ЦП. при той же частоте эффективность работы намного выше, чем у системной памяти и жесткого диска. В реальной работе ЦП часто приходится многократно считывать один и тот же блок данных, и увеличение емкости кэш-памяти может значительно улучшить скорость считывания данных внутри ЦП вместо поиска в памяти или на жестком диске, чтобы повысить производительность системы. . Но с учетом площади чипа процессора и стоимости кэш-памяти очень мало.

Кэш ЦП можно разделить на кеш L1 и кеш L2.В настоящее время основные процессоры имеют кеш L3, и даже некоторые процессоры имеют кеш L4. Все данные, хранящиеся в кэше каждого уровня, являются частью кэша следующего уровня.Техническая сложность и стоимость изготовления этих трех кэшей относительно уменьшаются, поэтому их емкость также относительно увеличивается.

Почему ЦП имеет такой дизайн кэш-памяти, как L1, L2 и L3? В основном потому, что текущие процессоры слишком быстрые, а чтение данных из памяти слишком медленное (одно из-за того, что сама память недостаточно быстрая, а другое из-за того, что она слишком далеко от ЦП, что обычно требует, чтобы ЦП ждал несколько ч. Десять и даже сотни тактов), в это время для обеспечения быстродействия ЦП необходимо оказывать помощь памяти с меньшей задержкой и большей скоростью, а это кэш. Если вас это интересует, вы можете удалить процессор компьютера и поиграть с ним самостоятельно.

Каждый раз, когда вы слышите о выпуске новых процессоров Intel, таких как i7-7700k, 8700k, он оптимизирует размер кеша процессора.Если вам интересно, вы можете самостоятельно поискать эти конференции или статьи.

Некоторое время для кэша указано в докладе Мартина и Майка QConpresentation:

от процессора к Приблизительные требуемые циклы ЦП Примерное время, необходимое
основная память Около 60-80 наносекунд
Передача по шине QPI (между сокетами, не нарисовано) около 20 нс
L3 cache Около 40-45 циклов около 15 нс
L2 cache около 10 циклов около 3 нс
L1 cache Около 3-4 циклов около 1 нс
регистр 1 cycle

строка кэша

В многоуровневом кэше процессора он хранится не как самостоятельный элемент, а по стратегии, аналогичной pageCahe, которая хранится как строка кэша, а размер строки кэша обычно составляет 64 байта. Это 8 байт, поэтому он может хранить лонги 8. Например, когда вы обращаетесь к длинной переменной, она загрузит еще помощь 7. Выше мы сказали, почему массив не выбран для связанного списка, по этой причине массивы могут полагаться на буферизованные строки для быстрого доступа.

Кэш-строки — панацея? НЕТ, потому что это все еще приносит недостаток. Я приведу здесь пример, чтобы проиллюстрировать этот недостаток. Вполне возможно, что существует очередь-массив ArrayQueue, структура данных которой выглядит следующим образом:

class ArrayQueue{
    long maxSize;
    long currentIndex;
}

Для maxSize мы определили размер массива с самого начала. Для currentIndex он отмечает положение нашей текущей очереди. Это изменение происходит относительно быстро. Вполне возможно, что при доступе к maxSize вы также загружаете currentIndex? другие потоки обновляют currentIndex, они сделают недействительной строку кеша в ЦП. Обратите внимание, что это предусмотрено ЦП. Он не только делает недействительным параметр currentIndex. Если он продолжит доступ к maxSize в это время, ему все равно придется продолжить с Чтение, но MaxSize — это то, что мы определили в начале, мы должны получить доступ к кешу, но на него влияет currentIndex, который мы часто меняем.

Магия Паддинга

Чтобы решить проблему с вышеуказанной строкой кэша, в Disruptor принят метод заполнения.

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

Значение в нем заполнено некоторыми другими бесполезными длинными переменными. Таким образом, когда вы изменяете значение, это не повлияет на строки кэша других переменных.

Наконец, кстати, аннотация @Contended предусмотрена в jdk8.Конечно, она вообще разрешена только внутри Jdk.Если вы используете ее самостоятельно, то вам нужно настроить параметр Jvm -RestricContentended=fase, который будет ограничивать эту аннотацию . Многие статьи анализируют ConcurrentHashMap, но игнорируют эту аннотацию.Эта аннотация используется в ConcurrentHashMap.В ConcurrentHashMap каждая корзина рассчитывается отдельным счетчиком, и этот счетчик все время меняется, поэтому заполнение строк Cache оптимизируется с помощью этой аннотации для повышения производительности .

3.1.3RingBuffer

В Disruptor массив используется для сохранения наших данных.Мы также ввели использование массива для сохранения кеша при посещении, но в Disruptor мы дополнительно выбираем использование кольцевого массива для сохранения данных, то есть, Кольцевой буфер. Позвольте мне сначала объяснить, что кольцевой массив не является настоящим кольцевым массивом.В RingBuffer доступ к нему осуществляется путем взятия остатка.Например, размер массива равен 10, а 0 обращается к позиции, где нижний индекс массива равен 0 На самом деле 10, 20 и т. д. Доступ также является позицией массива, индекс которого равен 0.

На самом деле в этих фреймворках остаток использует не операцию %, а операцию & и, которая требует, чтобы вы установили размер в N-й степени 2, то есть 10, 100, 1000 и т. д., поэтому вычтите 1 Если это 1, 11, 111, вы можете очень хорошо использовать индекс & (размер -1), чтобы использование битовых операций увеличило скорость доступа. Если вы не установите размер в степени 2 в Disruptor, он выдаст исключение, что размер буфера должен быть степенью 2.

Конечно, это не только решает проблему быстрого доступа к массиву, но и решает проблему отсутствия необходимости снова выделять память, уменьшая сборку мусора, ведь у нас 0, 10, 20 и т. д. все выполняются в одной памяти. области, поэтому нет необходимости повторно выделять память, которая часто освобождается сборщиком мусора JVM.

С тех пор три основных убийцы были закончены, и эти три убийцы заложили основу для высокой производительности Disruptor. Далее я объясню, как использовать Disruptor и конкретный принцип работы Disruptor.

3.2 Как использовать Разрушитель

Вот простой пример:

ublic static void main(String[] args) throws Exception {
        // 队列中的元素
        class Element {
            @Contended
            private String value;


            public String getValue() {
                return value;
            }

            public void setValue(String value) {
                this.value = value;
            }
        }

        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            int i = 0;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread" + String.valueOf(i++));
            }
        };

        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // 处理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException {
                System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence);
//                Thread.sleep(10000000);
            }
        };


        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 8;

        // 创建disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 设置EventHandler
        disruptor.handleEventsWith(handler);

        // 启动disruptor的线程
        disruptor.start();
        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((element, sequence) -> {
                System.out.println("之前的数据" + element.getValue() + "当前的sequence" + sequence);
                element.setValue("我是第" + sequence + "个");
            });

        }
    }

В Disruptor есть несколько ключевых: ThreadFactory: это фабрика потоков для потоков, необходимых производителю в нашем Disruptor для потребления. EventFactory: фабрика событий, фабрика, используемая для создания элементов нашей очереди в Disruptor, она будет заполнять RingBuffer непосредственно при инициализации и будет действовать в одно и то же время. EventHandler: обработчик, используемый для обработки событий, где EventHandler может рассматриваться как потребитель, но несколько EventHandler являются очередями для независимого потребления. WorkHandler: это также обработчик для обработки событий.Отличие от вышеизложенного заключается в том, что несколько потребителей используют одну и ту же очередь. WaitStrategy: стратегия ожидания.В Disruptor есть несколько стратегий, чтобы решить, какую стратегию выбрать, если нет данных, когда потребители потребляются? Ниже приведен краткий список некоторых стратегий в Disruptor.

  • BlockingWaitStrategy: блокируя поток, ожидая пробуждения производителя.После пробуждения он циклически проверяет, была ли использована зависимая последовательность.

  • BusySpinWaitStrategy: поток продолжает вращаться и ждать, что может потреблять больше ресурсов ЦП.

  • LiteBlockingWaitStrategy: поток блокируется в ожидании пробуждения производителя. По сравнению с BlockingWaitStrategy разница заключается в signalNeeded.getAndSet. Если два потока одновременно обращаются к одному ожиданию доступа и одному сигналу доступа, количество блокировок может быть уменьшено.

  • LiteTimeoutBlockingWaitStrategy: по сравнению с LiteBlockingWaitStrategy устанавливается время блокировки, и по истечении времени создается исключение.

  • YieldingWaitStrategy: попробуйте 100 раз, затем Thread.yield() даст процессор

EventTranslator: реализация этого интерфейса может преобразовывать другие наши структуры данных в события, циркулирующие в Disruptor.

3.3 Принцип работы

Выше были представлены три главных убийцы CAS, сокращение ложного обмена и RingBuffer.Давайте представим весь процесс производителей и потребителей в Disruptor.

3.3.1 Производитель

Для производителей его можно разделить на несколько производителей и одного производителя, используя ProducerType.Single и ProducerType.MULTI, чтобы различать, несколько производителей и один производитель имеют больше CAS, потому что один производитель — это один поток, поэтому нет необходимости гарантировать потокобезопасность.

Disruptor.publishEvent и Disruptor.publishEvents() обычно используются для одиночной и групповой отправки в дисраптор.

Для публикации события в очереди в дисрапторе необходимо выполнить следующие шаги:

  1. Во-первых, получите следующую позицию в RingBuffer, которую можно опубликовать в RingBuffer.Это можно разделить на две категории:
  • местоположение никогда не написано
  • Был прочитан всеми потребителями и может быть в состоянии записи. Если он не будет прочитан, он продолжит попытки чтения.Дизраптор делает это очень умно.Он не занимает ЦП все время. Вместо этого он блокирует и приостанавливает поток через LockSuport.park(), чтобы предотвратить ЦП от продолжения Это вид пустого цикла, иначе другие потоки не смогут захватить квант времени ЦП.
    После получения локации cas будет выгружен, если это одиночный поток, то не требуется.
  1. Затем вызовите EventTranslator, который мы представили выше, чтобы передать событие в этой позиции в RingBuffer на первом этапе в EventTranslator для перезаписи.
  2. Для публикации в дисрапторе есть дополнительный массив для записи последнего серийного номера текущего местоположения кольцевого буфера.В качестве примера возьмем приведенные выше 0, 10 и 20. Когда 10 записано, avliableBuffer будет записан в соответствующей позиции. , В настоящее время этот принадлежит 10. В чем польза?Я представлю его позже. При публикации нужно обновить этот avliableBuffer, а потом разбудить всех заблокированных производителей.

Кратко нарисуем процесс.Неправильно брать 10 в качестве примера выше, потому что bufferSize должен быть равен 2 в N-й степени, поэтому мы возьмем Buffersize=8 в качестве примера: следующее описывает, когда мы отправили 8 событий, что кружок, затем какие-то процессы отправки 3-х сообщений: 1. Сначала вызовите next(3), сейчас мы находимся в позиции 7, поэтому следующие три элемента — это 8, 9, 10, а остаток — 0, 1, 2. 2. Перепишите данные трех областей памяти 0, 1 и 2. 3. Запишите доступный буфер.

Кстати, я не знаю, знакомы ли вы с описанным выше процессом, верно, он похож на наш 2PC, двухэтапная отправка, сначала блокировка положения RingBuffer, а затем отправка и уведомление потребителей. Для введения конкретных 2PC, пожалуйста, обратитесь к моей другой статьеЕсли кто-то снова спросит вас о распределенных транзакциях, покажите ему эту статью.

3.3.1 Потребители

Для потребителей вышеупомянутое разделено на два типа: один состоит в том, что несколько потребителей потребляют независимо друг от друга, а другой заключается в том, что несколько потребителей используют одну и ту же очередь.Здесь мы представляем более сложные несколько потребителей, использующих одну и ту же очередь.Если вы понимаете это, вы также можете понять самостоятельное потребление. В нашем методеruptor.strat() поток-потребитель запускается для фонового потребления. В потребителях есть две очереди, которые требуют нашего внимания: одна — очередь выполнения, совместно используемая всеми потребителями, а другая — очередь выполнения, которую каждый потребитель потребляет независимо. 1. Прервите CAS следующего Next в общей очереди потребителя и отметьте текущий прогресс в очереди своего собственного хода потребления. 2. Подайте заявку на следующую позицию читаемого RingBuffer для себя.Здесь приложение применяется не только к следующему, но может применяться к диапазону, большему, чем следующий.Процесс применения стратегии блокировки выглядит следующим образом:

  • Получить последнюю позицию записи производителя в RingBuffer
  • Определите, меньше ли она, чем должность, на которую я хочу подать заявку
  • Если оно больше, это доказывает, что это место было записано, и возвращается производителю.
  • Если в эту позицию не было записано меньшее доказательство, оно будет заблокировано в стратегии блокировки, которая проснется на этапе фиксации производителя. 3. Проверьте читабельность этой позиции, т. к. позиция, на которую вы претендуете, может быть непрерывной. Например, производитель в данный момент находится на 7, а затем подает заявку на чтение. Если потребитель написал позицию порядкового номера 8 и 10 , Однако не успела записаться позиция 9. Так как первый шаг вернет 10, но 9 на самом деле не может быть прочитан, поэтому позиция должна быть сжата до 8.
    4. Если он меньше, чем текущий после сжатия, продолжайте применять в цикле. 5. Передать обработчику handler.onEvent() для обработки

Таким же образом, возьмем пример, мы хотим претендовать на позицию next=8. 1. Сначала выгрузить прогресс 8 в общую очередь и записать прогресс 7 в независимую очередь. 2. Получаем максимально читаемую позицию 8, что осуществляется по разным стратегиям.Выбираем блокировку.Так как производитель производит 8,9 и 10, то возвращается 10, так что не нужно лишний раз сравнивать с availableBuffer в будущем. . 3. Наконец передайте его обработчику для обработки.

4. Разрушитель в Log4j

На следующем рисунке показано сравнение пропускной способности Log4j с использованием Disruptor, ArrayBlockingQueue и синхронной пропускной способности Log4j. Вы можете видеть, что использование Disruptor взорвало другие. Конечно, есть и другие фреймворки, использующие Disruptor, которые здесь не будут представлены.

Наконец

В этой статье рассказывается о недостатках традиционных блокирующих очередей, а в следующей статье рассказывается о Disruptor, о том, почему он такой классный, и о конкретном рабочем процессе.

Если кто-то попросит вас спроектировать эффективную очередь без блокировок в будущем, как вы ее спроектируете? Я считаю, что вы можете обобщить ответ из статьи.Если у вас есть какие-либо вопросы по этому поводу или вы хотите обменяться идеями со мной, вы можете подписаться на мой официальный аккаунт и добавить моих друзей, чтобы обсудить со мной.

Наконец, эта статья была включена в JGrowing, всеобъемлющий и отличный маршрут изучения Java, совместно созданный сообществом.Если вы хотите участвовать в обслуживании проектов с открытым исходным кодом, вы можете создать его вместе.Адрес github:GitHub.com/Java растет…Пожалуйста, дайте мне маленькую звезду.

Если вы считаете, что эта статья полезна для вас, или если у вас есть какие-либо вопросы и вы хотите предоставить бесплатный VIP-сервис 1 на 1, вы можете подписаться на мой официальный аккаунт Ваше внимание и пересылка - самая большая поддержка для меня, O(∩_∩)O :