RocketMQ (восемь): отправка сообщений

задняя часть сервер Командная строка Netty
RocketMQ (восемь): отправка сообщений

Ноль изобретательности Пожалуйста, указывайте первоисточник для перепечатки, спасибо!

Схема развертывания сети RocketMQ

  • NameServer: в системе это служба именования, обновляющая и обнаруживающая брокерские службы.
  • Broker-Master: Хост-сервер сообщений брокера.
  • Broker-Slave: подчиненный сервер сообщений брокера.
  • Производитель: Производитель сообщения.
  • Потребитель: потребитель сообщений.

инструкция:Серия RocketMQ будет представлена ​​вместе с RocketMQ-4.1.0-Incubating.

Читая исходный код, я сделал определенный комментарий, официальный аккаунт [Ingenuity Zero] ответил: Rocketmq, вы можете получить подробные комментарии к китайскому коду на основе Rocketmq4.1.0. Добро пожаловать в звезду и форк!

Си Да рассказал о сути промежуточного программного обеспечения сообщенийПромежуточное программное обеспечение сообщений простое: одна отправка, один депозит и одно потребление., сегодня мы в основном обсуждаемОтправить, который является выделенной цветом частью диаграммы развертывания сети RocketMQ.

Предыдущие статьи серии RocketMQ

Обзор обмена сообщениями

Приведенное выше изображение, вероятно, является основной логикой производителя, отправляющего сообщение брокеру.

Проблемное мышление:

Кэширование информации, связанной с брокером, для клиента сокращает взаимодействие с namesrv, но также уменьшает характер изменений брокера в реальном времени Что происходит, когда брокер внезапно становится недоступным? (обработку Rocketmq смотрите позже), почему производитель так быстро отправляет? В основном из-за netty writeAndFlush? Как производитель выполняет асинхронную отправку? Отправить синхронно? отправили в одну сторону? Что делать, если отправка не удалась?

Общий анализ потока отправки сообщений

Поскольку отправка также включает отправку по времени, последовательную отправку, пакетную отправку и т. д., эта статья представляет собой общее логическое объяснение отправки с учетом проблемы с пространством, и мы продолжим делиться другими ситуациями позже.

Прежде чем читать эту статью, вы должны прочитать следующее:RocketMQ (2): связь RPC.

В статье также рассказывается, как отлаживать локально, поэтому я не буду упоминать об этом здесь.Логика отправки является самой простой по сравнению с хранением и потреблением (в основном то же самое, чтобы продолжать следовать за строкой напрямую), а хранение является самым простым. , Сложный, с последующим потреблением (эти процессы может быть трудно найти в одной строке, и они будут разделены позже).

Синхронная отправка записи

Примечание:может относиться кБыстрый старт RocketMQВот и все.

producer.start

    /**
     * Start this producer instance.
     * </p>
     *
     * <strong>
     * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
     * this method before sending or querying messages.
     * </strong>
     * </p>
     *
     * @throws MQClientException if there is any unexpected error.
     */
    @Override
    public void start() throws MQClientException {
        this.defaultMQProducerImpl.start();
    }

В основном делать следующие вещи (основные вещи):

  • Некоторые проверки конфигурации.
  • Создайте сетевой клиент, который взаимодействует с namesrv.
  • По умолчанию каждые 30 секунд обменивается данными с namesrv для получения информации о брокере.
  • По умолчанию неверная информация о брокере удаляется каждые 30 секунд, и всем брокерам отправляются тактовые импульсы.

Создайте объект сообщения

Производитель отправляется объектом Message, посмотрите на структуру Message:

    public Message() {
    }

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }

Примечание:В основном темы, теги и реальное содержание тела.

отправить отправить

SendResult sendResult = producer.send(msg);

Отправить обработку. Давайте сосредоточимся на том, как обрабатывается отправка.

отправить отправить анализ керна

Несколько способов отправки: синхронный асинхронный односторонний (какой выбрать, нужно судить по ситуации)

Если взять синхронную отправку в качестве примера, тайм-аут по умолчанию составляет 3 с.

SendResult sendResult = producer.send(msg);

Это триггерный метод отправки, мы просто следуем ему, ** первое начальное ощущение: ** первое ощущение от отслеживания — это то, что он включает использование JUC, много использования режима облегчённого веса (по сути, карта для кеширования). и чистое использование.

Основная логика:

Код массово не копируется, а нужный гитхаб основан на Rocketmq4.1.0 плюс подробные китайские комментарии к коду. Добро пожаловать в звезду и форк!

  • Определить, доступна ли услуга? Непосредственно завершить процесс невозможно.

  • Проверка сообщения:

  • Получить информацию о маршрутизации темы

    Если есть кеш, то его можно получить.Если нет, то можно один раз (или два) взаимодействовать с namesrv.Потому что тематическая информация не обязательно существует на сервере брокера, если ее нет, то по умолчанию ( TBW102).

Инкапсулировать информацию заголовка запроса:

// Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列)
 public static final int GET_ROUTEINTO_BY_TOPIC = 105;

Статус обработки запросов, полученных сервером namesrv.

Окончательная информация о маршрутизации аналогична следующей:

  • Режим отправки синхронный, будет 3 раза, а другой 1 раз

    //发送模式是sync 会有3次其他1次
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    
  • выбрать очередь

    Как выбрать, какой идентификатор очереди отправить этому брокеру? (Клиент загружается сам по себе), поскольку информация о брокере кэшируется в клиенте, возникает проблема (поскольку информация будет синхронизироваться раз в 30 с, что произойдет с брокером в течение 30 с?) Rocketmq обрабатывает это следующим образом. :Переключатель sendLatencyFaultEnableоткрывать ли

    1. Открыть --> Как долго он будет недоступен?

    2. Не открывать (по умолчанию) --> Случайный напрямую (если lastBrokerName не пусто, попробуйте заменить его другим брокером, если его нет, то будет случайный)

  • Вызовите sendKernelImpl, чтобы отправить сообщение Отправить ядро ​​сообщения

    Получить ip адрес по имени брокера, если канал не установлен и не сохранен.

    Установите UNIQ_id, который защищает информацию об IP-адресе клиента.

    При отправке будут функции хука для обеспечения выполнения (отключить хук сообщения, хук отправки сообщения (executeSendMessageHookBefore, executeSendMessageHookAfter).

    Построить заголовок SendMessageRequestHeader, включая генерацию метки времени сообщения, поэтому время каждой машины должно быть согласованным (чтобы позже можно было проверить, сколько времени потребовалось брокеру для получения сообщения).

  • Выберите способ отправки в соответствии с режимом отправки сообщения

    На этот раз мы в основном рассмотрим ситуацию с синхронной отправкой.

    если1 случайВыполните nettywriteAndFlush, чтобы отправить успешный вариант для выпрыгивания и прибытия.3 случаяПодождите до 3 с. Когда оно просыпается здесь? На самом деле он просыпается, когда ситуация с брокером отвечает клиенту:

    Примечание:Здесь CountDownLatch используется для асинхронного преобразования в синхронное.

    если2 случаяУказывает, что отправка не удалась, сразу просыпаться3 случаяНет блокировки (последнее исключение выдается, чтобы указать, что отправка не удалась)

  • Обновление времени доступности брокера

  • оценка ситуации retryAnotherBrokerWhenNotStoreOK

    Если для параметра retryAnotherBrokerWhenNotStoreOK установлено значение true, при сбое отправки будет выбран посредник.

  • Следующее исключение продолжается, и повторите попытку отправки сообщения

На этом процесс отправки клиента, вероятно, завершен.


Если вы чувствуете себя вознагражденным после прочтения, пожалуйста, поставьте лайк, подпишитесь и добавьте официальную учетную запись [Ingenuity Zero], чтобы узнать больше захватывающей истории! ! !

Присоединяйтесь к Планете Знаний и обсуждайте вместе!