Оригинальный адрес:HaifeiWu и блог его друзей
адрес блога:www.hchstudio.cn
Добро пожаловать на перепечатку, пожалуйста, указывайте автора и источник, спасибо!
Недавно я исследовал некоторые проблемы с очередями, сегодня я поделюсь высокопроизводительным очередным Disruptor.
what Disruptor ?
Это высокопроизводительная очередь, разработанная британской валютной торговой компанией LMAX.Первоначальная цель исследований и разработок - решить проблему задержки очередей памяти. Один поток системы, разработанной на основе Disruptor, может поддерживать 6 миллионов заказов в секунду.
В настоящее время многие известные проекты, включая Apache Storm и Log4j2, используют Disruptor для достижения высокой производительности. Disruptor и Netty используются в компании-арендодателе для обработки данных GPS в реальном времени, и их производительность довольно высока. Эта статья дает общее представление о принципе реализации Disruptor с практической точки зрения.
why Disruptor ?
Disruptor решает проблему низкой скорости очереди следующим образом:
- Структура кольцевого массива
Чтобы избежать сборки мусора, используйте массив вместо связанного списка. Потому что массивы более дружественны к механизму кэширования процессора.
- позиционирование элемента
Длина массива составляет 2^n, а скорость позиционирования увеличивается за счет побитовой операции. Индексы принимают форму увеличения. Не беспокойтесь о переполнении индекса. Индекс является длинным типом, и даже если используется скорость обработки в 1 миллион запросов в секунду, его использование займет 300 000 лет.
- беззамковая конструкция
Каждый поток-производитель или потребитель будет сначала запрашивать позицию рабочего элемента в массиве, а после приложения напрямую записывать или читать данные в этой позиции.
- Оптимизация для проблем с псевдо-обменом Disruptor устраняет эту проблему, по крайней мере, для архитектур процессоров с размером строки кэша 64 байта или меньше (возможно, что строка кэша процессора составляет 128 байт, поэтому использование 64-байтового заполнения все равно вызовет проблему ложного разделения), добавляя дополнения чтобы убедиться, что порядковый номер кольцевого буфера не существует в той же строке кэша, что и все остальное.
how Disruptor ?
Из приведенного выше введения мы, вероятно, можем понять, что Disruptor — это высокопроизводительная безблокировочная очередь, так как же ее использовать?
Во-первых, в соответствии с моделью программирования Disruptor, управляемой событиями, нам нужно определить событие для переноса данных.
public class DataEvent {
private long value;
public void set(long value) {
this.value = value;
}
public long getValue() {
return value;
}
}
Чтобы Disruptor предварительно назначал нам эти события, нам нужно построить EventFactory для выполнения построения
public class DataEventFactory implements EventFactory<DataEvent> {
@Override
public DataEvent newInstance() {
return new DataEvent();
}
}
Как только мы определили события, нам нужно создать потребителя, который обрабатывает эти события. В нашем случае все, что мы хотим сделать, это вывести значение из консоли.
public class DataEventHandler implements EventHandler<DataEvent> {
@Override
public void onEvent(DataEvent dataEvent, long l, boolean b) throws Exception {
new DataEventConsumer(dataEvent);
}
}
Далее нам нужно инициализировать Disruptor и определить производителя для генерации сообщений.
public class DisruptorManager {
private final static Logger LOG = LoggerFactory.getLogger(DisruptorManager.class);
/*消费者线程池*/
private static ExecutorService threadPool;
private static Disruptor<DataEvent> disruptor;
private static RingBuffer<DataEvent> ringBuffer;
private static AtomicLong dataNum = new AtomicLong();
public static void init(EventHandler<DataEvent> eventHandler) {
//初始化disruptor
threadPool = Executors.newCachedThreadPool();
disruptor = new Disruptor<>(new DataEventFactory(), 8 * 1024, threadPool, ProducerType.MULTI, new BlockingWaitStrategy());
ringBuffer = disruptor.getRingBuffer();
disruptor.handleEventsWith(eventHandler);
disruptor.start();
new Timer().schedule(new TimerTask() {
@Override
public void run() {
LOG.info("放入队列中数据编号{},队列剩余空间{}", dataNum.get(), ringBuffer.remainingCapacity());
}
}, new Date(), 60 * 1000);
}
/**
*
* @param message
*/
public static void putDataToQueue(long message) {
if (dataNum.get() == Long.MAX_VALUE) {
dataNum.set(0L);
}
// 往队列中加事件
long next = ringBuffer.next();
try {
ringBuffer.get(next).set(message);
dataNum.incrementAndGet();
} catch (Exception e) {
LOG.error("向RingBuffer存入数据[{}]出现异常=>{}", message, e.getStackTrace());
} finally {
ringBuffer.publish(next);
}
}
public static void close() {
threadPool.shutdown();
disruptor.shutdown();
}
}
Наконец, мы определяем метод Main для выполнения кода.
public class EventMain {
public static void main(String[] args) throws Exception {
DisruptorManager.init(new DataEventHandler());
for (long l = 0; true; l++) {
DisruptorManager.putDataToQueue(l);
Thread.sleep(1000);
}
}
}
Если вы заинтересованы в приведенном выше коде, пожалуйста, переместитеGitHub.com/HaifeiW U/Первый…
Затем мы можем увидеть данные, напечатанные на консоли.
резюме
Disruptor обеспечивает высокую производительность в ситуациях с высокой степенью параллелизма благодаря сложной конструкции без блокировок.
Кроме того, асинхронный режим в Log4j 2 обрабатывается Disruptor. Здесь арендодатель сталкивается с небольшой проблемой, то есть при использовании Log4j 2 для отправки данных журнала в logstash через режим TCP связь прерывается из-за проблем с сетью, из-за чего Log4j 2 продолжает записывать данные в кольцевой буфер, а данные кольцевого буфера не имеет потребителей, что приводит к нехватке памяти сервера. Решение состоит в том, чтобы сделать очередь Disruptor в Log4j 2 ограниченной или переключиться в режим UDP для записи данных журнала (если данные не важны).