Глубокий хороший текст! Продвинутые передовые знания RocketMQ!

RocketMQ

предисловие

Всем привет, меня зовут jack xu. Эта статья является последней из серии RockeMQ. В ней рассказывается о некоторых передовых знаниях RockeMQ, которые будут использоваться в наших обычных интервью. Овладение этими вещами также отражает мастерство и негодяй. Разница между мальчики.

Лекция 1:«Архитектура высокой доступности RocketMQ и развертывание асинхронного кластера с двумя ведущими и двумя подчиненными устройствами»

Вторая лекция:«Наклейки грамотности RocketMQ и использование Java API»

Исходный код, используемый в этой статье:GitHub.com/Сюй Хаоцзя/рок…

Перевод официальной документации:Оооооооооопал

Для того, чтобы каждый мог ясно и иерархически усвоить это знание, мы начинаем сПроизводитель, Брокер, Потребительобъясняется в трех измерениях.

режиссер

правила отправки сообщений

В RocketMQ эффект разделения, аналогичный kafka, достигается на основе нескольких очередей сообщений. Если объем данных, которые должны быть отправлены и получены темой, очень велик, для повышения скорости обработки требуется машина, которая может поддерживать повышенную параллельную обработку.В настоящее время тема может установить одну или несколько очередей сообщений по мере необходимости. После того, как Topic имеет несколько очередей сообщений, сообщения могут отправляться в каждую очередь сообщений параллельно, а потребители также могут читать и потреблять сообщения из нескольких очередей сообщений параллельно.

Затем сообщение отправляется в очередь сообщений, что нам нужна наша стратегия распределения маршрута. Среди отправленных перегруженных методов такой параметр MessageQueUeever.image.pngВ RocketMQ для нас реализовано три класса реализации:

  • SelectMessageQueueByHash (по умолчанию): это способ непрерывного увеличения и опроса.
  • SelectMessageQueueByRandom: случайный выбор очереди.
  • SelectMessageQueueByMachineRoom: возврат пуст, не реализован.

Если вышеуказанное не может удовлетворить наши потребности, вы также можете настроить MessageQueueSelector и передать его в качестве параметра:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
               Integer id = (Integer) arg;
               int index = id % mqs.size();
               return mqs.get(index);
       }
}, orderId);

Пример исходного кода /ordermessage/Producer.java

последовательное сообщение

Очень классический вопрос интервью, как обеспечить порядок новостей? Идея состоит в том, что сообщения, которые должны быть гарантированно упорядочены, должны отправляться в одну и ту же очередь сообщений. Во-вторых, очередь сообщений может потребляться только одним потребителем, что гарантируется механизмом распределения очереди сообщений. Наконец, потребление mq внутри потребителя гарантированно будет в порядке.Нам нужно добиться взаимно-однозначного отношения между производителями — очередями сообщений — потребителями.

Конкретный процесс работы выглядит следующим образом:

  1. Когда производитель отправляет сообщение, оно должно прийти к брокеру по порядку. Таким образом, для производителя вы можете использовать многопоточность не для асинхронной отправки, а для последовательной отправки.
  2. При записи Брокеру следует писать последовательно. То есть сообщения на одну тему надо писать централизованно, выделятьта же очередь сообщений, Не разгоняется.

Достичь этого эффекта очень просто, пока мы передаем один и тот же hashKey при отправке, будет выбрана одна и та же очередь.

image.png3. Может быть только один поток, когда потребитель потребляет, иначе из-за разных скоростей потребления в базе данных могут быть неупорядоченные записи. В Spring Boot для потребленияMode установлено значение ORDERLY, а в API Java можно передать класс реализации MessageListenerOrderly.

consumer.registerMessageListener(new MessageListenerOrderly() {

Конечно, последовательное потребление принесет некоторые проблемы:

  1. Обнаружено сообщение об ошибке сообщения, нельзя пропустить, текущее потребление очереди приостановлено
  2. Снижение производительности обработки сообщений

сообщение о транзакции

Существует множество решений для распределенных транзакций, одно из которых заключается в использовании сообщений транзакций RocketMQ для достиженияокончательная согласованность. Давайте посмотрим, как реализован RocketMQ. Ниже приведена блок-схема официального сайта RocketMQ, давайте проанализируем и объясним ее по картинке.ракета присутствует.apache.org/ракета присутствует/дни спустя…

image.png

  1. Производитель отправляет половинное сообщение на сервер RocketMQ. Что такое половинное сообщение? Это означает, что сообщение потребителя не может быть временно доставлено. Отправитель успешно отправил сообщение на сервер MQ. В это время сообщение помечено в видеВременно не могу доставитьСтатус, нужно дождаться ответа производителя на сообщениеВторое подтверждение.
  2. Сервер MQ отправляет подтверждение производителю, сообщая производителю, что полусообщение успешно получено.
  3. Отправитель начинает выполнять логику транзакции локальной базы данных.
  4. После завершения выполнения результат сообщается серверу MQ, локальная транзакция успешно выполняется и сообщается о фиксации.После того, как сервер MQ получает фиксацию, статус полусообщения устанавливается наРезультат, потребитель в конечном итоге получит сообщение; если локальная транзакция не будет выполнена, будет отправлен откат, и сервер MQ удалит половинное сообщение после получения отката, а абонентская плата не получит сообщение.
  5. Если сообщение с подтверждением на шаге 4 не получено, проверьте статус транзакции.Проверьте сообщение:Из-за сбоев в сети, перезапуска производителя и т. д. отправитель RocketMQ предоставит интерфейс состояния обратной проверки транзакции. интерфейс обратной проверки выполнение выполнено успешно.
  6. После того, как отправитель получит ответное сообщение, ему необходимо проверить окончательный результат выполнения локальной транзакции соответствующего сообщения.
  7. Отправитель снова отправляет второе подтверждение на основе проверки конечного состояния локальной транзакции и отправляет фиксацию или откат.

Выше показан поток выполнения всего сообщения транзакции. Давайте посмотрим, как работать с ним в коде. RocketMQ предоставляет интерфейс TransactionListener, нам нужно его реализовать, а затем реализовать локальную логику транзакций в методе executeLocalTransaction.

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //local transaction process,return rollback,commit or unknow
        log.info("executeLocalTransaction:"+JSON.toJSONString(msg));
        return LocalTransactionState.UNKNOW;
    }

Этот метод должен возвращать статус, откат, фиксацию или неизвестность.После возврата в неизвестность, поскольку неизвестно, успешна транзакция или нет, Брокер будет активно инициировать запрос о результате выполнения транзакции, поэтому необходимо реализовать Метод обратного вызова checkLocalTransaction.

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
       log.info("checkLocalTransaction:"+JSON.toJSONString(msg));
       return LocalTransactionState.COMMIT_MESSAGE;
    }

По умолчанию общее количество проверок составляет 15 раз, интервал для первой проверки составляет 6 с, а интервал для каждой последующей проверки составляет 60 с. Наконец, укажите прослушиватель транзакций, когда производитель отправляет его.image.pngИсходный код находится в example/transaction/TransactionProducer.java.

сообщение с задержкой

Много раз наша деревня будет в таких бизнес-сценариях:在一段时间之后,完成一个工作任务Например: после того, как заказ такси Didi будет выполнен, если пользователь не прокомментировал, он будет автоматически оценен в 5 звезд в течение 48 часов, заказ на вынос будет автоматически отменен без оплаты в течение 30 минут и т. д. Существует множество решений этой проблемы, одно из которых заключается в использовании для достижения очереди задержки RocketMQ, но версия с открытым исходным кодом была кастрирована и может поддерживать только сообщения определенного уровня, а коммерческая версия может указывать время произвольно.

   msg.setDelayTimeLevel(2); // 5秒钟

Например, level=2 означает 5 секунд, всего поддерживается 18 уровней, а уровень задержки настраивается в коде MessageStoreConfig:

  private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

Это используется в Spring Boot следующим образом

  rocketMQTemplate.syncSend(topic,message,1000,2);// 5秒钟

Исходный код находится в example/delay/DelayProducer.java.

Broker

физическое хранилище

Давайте зайдем в папку, которую хранит RocketMQ, и посмотрим, эта директория указывается при установке.

image.pngДалее описаны функции этих папок по очереди:

  1. контрольная точка: контрольная точка файла, в которой хранится время последней очистки или временная метка журнала фиксации, очереди потребления и файла индекса.
  2. commitlog: каталог хранения сообщений, набор файлов, размер каждого файла по умолчанию составляет 1 ГБ, когда первый файл заполнен, второму файлу будет присвоено имя с начальной суммой. Например, начальное смещение — 1073741824, имя второго файла — 00000000001073741824 и т. д.

image.png

  1. Конфигурация: информация о текущей конфигурации, включая информацию о фильтрации сообщений темы, ход потребления сообщений в режиме потребления кластера, ход извлечения очереди сообщений с задержкой, информацию о конфигурации группы потребителей сообщений, свойства конфигурации темы и т. д.
  2. потребительская очередь: каталог хранения очереди сообщений, мы видим, что под папкой потребляемой очереди строится папка по названию топика, а под каждой темой строится папка по номеру очереди сообщений, и под каждой папка очереди сообщений. Она предназначена для хранения смещения, размера и атрибутов тега сообщения в журнале фиксации.

image.png5. index: Каталог хранения файла индекса сообщений.При отправке сообщений с использованием java API мы видели, что будет передан параметр ключей, который используется для извлечения сообщений. Поэтому, если есть ключи, сервер создаст индексный файл, и каждое ключевое слово, разделенное пробелами, создаст индекс. Один файл IndexFile может хранить 2000 Вт индексов, а фиксированный размер файла составляет около 400 МБ.В индексе используется хэш-индекс, поэтому ключ должен быть уникальным и максимально не повторяться.

идея хранения

Давайте посмотрим на инструкции на официальном сайте RocketMQ,ракета присутствует. Apache.org/ракета присутствует/Ok…, давайте сначала посмотрим, почему kafka не может поддерживать больше разделов, а потом как мы поддерживаем больше разделов в RocketMQ.

image.png

  1. Каждый раздел хранит все данные сообщения. Хотя каждый раздел записывается на диск последовательно, по мере увеличения количества одновременных операций записи в раздел записи становятся случайными с точки зрения операционной системы.
  2. Из-за разрозненных файлов данных сложно использовать механизм Linux IO Group Commit.

Так что RocketMQ просто применил другой подход и разработал новый метод хранения файлов, то есть все сообщения всех тем записываются в один и тот же файл, так что может быть гарантирована абсолютная последовательная запись. Конечно, потребление сложное, чтобы найти сообщения в огромном журнале коммитов, мы не можем пройтись по всем сообщениям, что слишком медленно.

тогда что нам делать? Это упомянутая выше очередь потребления, в которой хранится последнее использованное смещение темы, потребляемой группой потребления. Когда мы потребляем, сначала читаем начальное смещение смещения физической позиции, размер и значение хэш-кода тега сообщения постоянного сообщения из очереди потребления, а затем читаем реальную сущность сообщения, которое нужно извлечь из раздела содержимого журнала фиксации.

Очередь потребления можно понимать как индекс сообщения, в ней нет сообщения, конечно, такая концепция хранения не идеальна.Для журнала коммитов, хотя он и пишется последовательно, при чтении он становится полностью случайным чтением, при чтении сообщения сначала читается очередь потребления, а затем читается журнал коммитов, что увеличивает накладные расходы.

Политика очистки файлов

Как и в kalka, содержимое журнала коммитов находится вПотреблениеПосле этого он не будет удален. У этого есть два преимущества. Во-первых, он может многократно использоваться несколькими группами потребителей. Пока группа потребителей изменена, он может использоваться с нуля. Каждая группа потребителей поддерживает свое собственное смещение. ; другой - поддержка обратного отслеживания сообщений. , вы можете искать в любое время.

Однако, если файлы не очищены, количество файлов будет продолжать увеличиваться, что в конечном итоге приведет к тому, что свободного места на диске будет становиться все меньше и меньше, поэтому RocketMQ удалит файлы с истекшим сроком действия, такие как commitLog, и использует очередь.72 часадокумент. Здесь будут запущены два потока.

    private void cleanFilesPeriodically() {
        this.cleanCommitLogService.run();
        this.cleanConsumeQueueService.run();
    }

После того, как файлы с истекшим сроком действия выбраны, в какое время их чистить, есть два случая. Один через обычные задачи,каждый день в четыре утрачтобы удалить эти файлы. Во-вторых, использование дискового пространства.более 75%Вот, брови уже горят в это время, зачем мне ждать до четырех часов,НемедленноТолько что почистил.

Если ситуация более серьезная, если использование дискового пространства превышает85%, запустит пакетную очистку файлов,независимо от того, истек ли срок его действия, пока не будет достаточно места; если использование диска превышает90%,встречаМусорсообщение написать.

нулевая копия

Мы все знаем, что сообщение RocketMQ хранится на диске, но как вы можете сделать такую ​​и такую ​​низкую задержку высокой пропускной способности, которая является тайной для использования технологии нулевой копии.

Во-первых, позвольте мне представить концепцию кэша страниц. Это на уровне операционной системы. Если ЦП хочет прочитать или обработать данные на диске, он должен загрузить данные с диска в память. Размер этой загрузки имеет фиксированную единицу измерения, называемую страницей. Стандартный размер страницы в x86 linux составляет 4 КБ. Если вы хотите улучшить скорость доступа к диску или уменьшить количество операций ввода-вывода диска, вы можете кэшировать страницу, к которой осуществляется доступ, в памяти, и эта область памяти называется кэшем страницы.

В следующий раз, когда вы будете обрабатывать запрос ввода-вывода, сначала загляните в кэш страниц и управляйте им напрямую, если вы его найдете.Если вы не найдете его, перейдите на диск, чтобы найти его. Разумеется, сам Page Cache также будет предварительно считывать данные.При первой операции запроса на чтение каждого файла система также считывает соседние страницы запрошенной страницы вместе. Но здесь все еще есть проблема.Мы знаем, что виртуальная память делится на пространство ядра и пространство пользователя.Кэш страниц принадлежит пространству ядра и не может быть доступен для пространства пользователя.Кроме того, его необходимо скопировать из пространства ядра в буфер пользовательского пространства. Этот процесс копирования снижает скорость доступа к данным.

Для решения этой проблемы была разработана технология нулевого копирования,Просто сделайте сопоставление адресов данных кэша страниц в пользовательском пространстве, чтобы пользователь мог напрямую читать и записывать кэш страниц с помощью операций с указателями, и при этом не требуются системные вызовы (такие как read()) и копирование памяти. Конкретная реализация в RocketMQ заключается в использовании mmap (карта памяти, карта памяти), в то время как kafka использует sendfile. image.png

потребитель

Балансировка нагрузки и ребалансировка на стороне потребителя

和kafka一样,消费端也会针对Message Queue做负载均衡,使得每个消费者能够合理的消费多个分区的消息。 потребительПока,потребительУвеличивать, в это время будет использоваться наш ребаланс.

В строке 277 RebalanceImpl.class есть стратегия перебалансировки.

      AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

      List<MessageQueue> allocateResult = null;
      try {
               allocateResult = strategy.allocate(this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
           } catch (Throwable e) {
                log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);
                return;
           }

AllocateMessageQueueStrategy имеет6种Реализованная стратегия также может быть настроена и указана на стороне потребителя.

consumer.setAllocateMessageQueueStrategy();
  • AllocateMessageQueueAveragely: Алгоритм среднего распределения (по умолчанию).

image.png

  • AllocateMessageQueueAveragelyByCircle: Кольцевая очередь сообщений о распределении

image.png

  • AllocateMessageQueueByConfig: выделять очереди в соответствии с конфигурацией и загружать в соответствии с указанной пользователем конфигурацией.
  • AllocateMessageQueueByMachineRoom: настроить очередь в соответствии с указанным компьютерным залом.
  • AllocateMachineRoomNearby: настроить очередь в соответствии с ближайшей компьютерной комнатой.
  • AllocateMessageQueueConsistentHash: согласованный хеш, основанный на cid потребителя.

Количество очередей должно быть больше, чем количество потребителей.

Очереди повторных попыток и недоставленных сообщений

Со стороны потребителя, если出现异常, такие как недоступность базы данных, проблемы с сетью, перебои в подаче электроэнергии и т. д. В это время брокеру возвращается RECONSUME_LATER, указывая, что он попытается повторить попытку позже. В это время сообщение будет отправлено обратно брокеру и помещено в очередь повторных попыток RocketMQ. Сервер создаст очередь повторных попыток с именем, начинающимся с %RETRY%, для группы потребителей.

image.pngОчередь повторных попыток будет снова доставлена ​​в ConsumerGroup через некоторое время. Если она по-прежнему ненормальна, она снова попадет в очередь повторных попыток. Интервал времени между попытками будет постепенно уменьшаться, начиная с 10 секунд до 2 часов: 10 секунд 30 секунд 1 минута 2 минуты 3 минуты 4 минуты 5 минут 6 минут 7 минут 8 минут 9 минут 10 минут 20 минут 30 минут 1 час 2 часа, максимум 16 попыток.

И если повторное потребление продолжает давать сбой определенное количество раз (по умолчанию 16 раз), оно будет доставлено в DLQ.очередь недоставленных сообщений. Брокер создаст очередь недоставленных сообщений, имя очереди недоставленных сообщений%DLQ%+ConsumerGroupName, приложение может отслеживать очередь недоставленных сообщений для ручного вмешательства.В общем, нам не нужно повторять 16 раз в реальном производстве, что является пустой тратой времени и производительности.Теоретически, когда количество повторных попыток достигает желаемого результата, если потребление все еще терпит неудачу, тогда нам нужно записать соответствующее сообщение и завершите повторную попытку.

Исходный код находится в jackxu/SimpleConsumer.java.

Анализ выбора MQ

Анализ и сравнение трех распространенных MQ на рынке перечислены ниже для справки и сравнения, когда вы действительно используете их в проекте:image.pngЧто ж, серия RocketMQ подошла к концу, спасибо за просмотр~