Disruptor является открытым исходным кодом и эффективнымпроизводитель-потребительFramework, трудно напрямую объяснить, что делает этот фреймворк, но этот фреймворк можно понимать какBlockingQueue
. Неужели так проще понять, это очередь производитель-потребитель, но производительность у нее лучше, чем уBloockingQueue
Гораздо лучше, утверждается, что одна машина может иметь миллионы TPS.
Особенности Разрушителя
Disruptor обладает следующими тремя основными характеристиками:
- многоадресная рассылка событий
- Заранее выделяйте память для событий
- работа без блокировки
Как правило, когда мы используем очередь, сообщения в очереди будут обрабатываться только одним消费者
Используйте, но в Disruptor одно и то же сообщение может обрабатываться несколькими потребителями одновременно, и между несколькими потребителями существует параллель.
Поскольку это может быть одновременно, часть данных должна использоваться в нескольких местах, например, данные запроса должны использоваться одновременно.регистрируется,Резервное копирование на удаленную машину,проводить деловую обработку,Как показано ниже:
Если сообщение может быть обработано только одним потребителем, то три упомянутые выше логики обработки должны выполняться линейно или размещаться в одном потребителе и обрабатываться асинхронно. Если эти операции можно разделить на несколько потребителейпараллельноПотребляйте одно и то же сообщение, эффективность обработки будет намного выше.
Таким образом, необходимо гарантировать, что сообщение может быть обработано всеми потребителями до того, как будет обработано следующее, и потребители не могут обрабатывать разные сообщения одновременно, поэтому необходим инструмент, аналогичный CyclicBarrier в Java, чтобы гарантировать, что все потребители могут обрабатывать следующее сообщение одновременно, Disruptor реализует SequenceBarrier для выполнения этой функции.
Disruptor предназначен для сред с низкой задержкой. В системе с низкой задержкой выделение памяти нужно уменьшить или вообще не выделять, а в Java — уменьшить время паузы, вызванной сборкой мусора. В Disruptor для достижения этой цели используется RingBuffer.Объекты создаются в RingBuffer заранее, а последующее использование этих объектов повторяется, чтобы избежать сборки мусора.Эта реализация потокобезопасна.
В Disruptor безопасность потоков в основном не использует блокировки, а использует механизмы без блокировок, такие как CAS, для обеспечения безопасности потоков.
Основная идея
Прежде чем формально понять Disruptor, нам нужно понять некоторые основные концепции.Код Disruptor не сложен, и в основном он разработан вокруг этих основных концепций.
- Производитель: производитель данных, сам производитель не имеет ничего общего с Disruptor, это может быть любой код, который производит данные, даже цикл for
- Событие: данные, созданные Producer, пользователи могут определять самостоятельно в соответствии со своими потребностями.
- RingBuffer: структура данных, используемая для хранения событий, предварительного выделения памяти и предотвращения создания объектов во время выполнения программы.
- EventHandler: Потребитель, реализованный пользователем
- Последовательность: используется для идентификации компонентов в Disruptor, и сотрудничество между несколькими компонентами зависит от этого для достижения
- Sequencer: основной механизм в Disruptor, который реализует основной алгоритм параллелизма для обеспечения правильной доставки сообщений между производителями и потребителями.
- SequenceBarrier: используется для обеспечения одновременной обработки новых сообщений всеми потребителями.
- WaitStrategy: стратегия потребительского ожидания
- EventProcessor: конкретная реализация, доставляющая сообщения потребителям.
Вышеуказанные компоненты составляют Disruptor.Количество кода во всем фреймворке на самом деле очень маленькое, должно быть меньше 7000 строк, и код очень чистый. Код в основном не использует наследование, а используетинтерфейсно-ориентированное программированиетак же каккомбинация, поэтому связь между кодами очень низкая.
RingBuffer и Sequencer являются двумя наиболее важными компонентами: первый используется для хранения сообщений, а второй контролирует упорядоченное производство и потребление сообщений.
стратегия повышения производительности
Основная цель Disruptor — улучшить пропускную способность сообщений, поэтому инфраструктура также реализуется вокруг этих целей, в основном выполняя следующие действия:
- Уменьшить сбор мусора
- Позволяет обрабатывать сообщения параллельно несколькими потребителями
- Используйте алгоритмы без блокировки для достижения параллелизма
- заполнение строки кэша
RingBuffer — это контейнер для хранения сообщений, а внутренняя реализация использует массив:
private final Object[] entries;
Перед запуском Disruptor нужно указать размер данных и инициализировать массив:
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
После того, как массив будет инициализирован, он больше не будет повторно использоваться, и все сообщения будут повторно использовать эти созданные объекты, так что этомассив циклов, реализация RingBuffer показана на следующем рисунке:
Затем при работе с кольцевым массивом необходимо контролировать доступ производителя и потребителя к массиву. С одной стороны, поскольку Disruptor поддерживает несколько производителей и несколько потребителей, необходимо обеспечить безопасность потоков.Для обеспечения производительности блокировки не используются для обеспечения безопасности потоков (только BlockingWaitStrategy использует блокировки).В управлении доступом RingBuffer, В основном с помощью CAS:
protected final E elementAt(long sequence)
{
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
С другой стороны, при доступе к скорости производителей и потребителей производители не могут писать неиспользованные сообщения, что приведет к потере сообщений.В RingBuffer указатели головы и хвоста не используются для управления, а контролируются Sequence.Когда производитель пишет данные, он будет использовать текущий серийный номер плюс количество данных, которые нужно записать, и сравнить их с местоположением потребителя, чтобы увидеть, достаточно ли места для записи.
В RingBuffer есть такой кусок кода:
abstract class RingBufferPad
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
Этот код называется заполнением строки кэша. Когда дело доходит до этого, вам необходимо понимать механизм кэширования ЦП. Поскольку скорость доступа к памяти слишком далека от скорости ЦП, кэш ЦП также добавляется между ЦП и память.Теперь, как правило, будет добавлено 3 уровня.Первый уровень и второй уровень являются эксклюзивными для ядра ЦП, а кеш третьего уровня используется несколькими ядрами.
Во многих случаях мы хотим кэшировать некоторые значения, которые не будут изменяться в кеше ЦП, например конечные переменные в Java, чтобы можно было максимизировать скорость кеша ЦП, но у кеша ЦП есть функция, которая при кэшировании данные будутСтрока кэша процессораСледовательно, если переменная, которая будет изменяться, определена рядом с конечной переменной, при каждом изменении переменной данные будут записываться обратно в память, а конечная переменная также не будет кэшироваться в кэше ЦП. задние части строки кэша должны быть заполнены, чтобы гарантировать, что никакие другие данные не кэшируются:
abstract class RingBufferPad
{
// 填充缓存行的前部分
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields extends RingBufferPad{
......
// 下面需要被缓存到 CPU 缓存的数据
private final long indexMask;
private final Object[] entries;
protected final int bufferSize;
protected final Sequencer sequencer;
......
}
public final class RingBuffer extends RingBufferFields implements Cursored, EventSequencer, EventSink{
......
// 填充缓存行的后部分
protected long p1, p2, p3, p4, p5, p6, p7;
......
}
Таким образом, после загрузки данных RingBufferFields в кэш ЦП нет необходимости читать их из памяти.
Disruptor повышает производительность с помощью различных мер, поэтому он такой быстрый.
Обратите внимание на публичный аккаунт WeChat, пообщайтесь о других