Развертывание установки Windows
скачать
адрес:[woohoo.apache.org/wish you/closer. …]
Выберите «Двоичный» для загрузки
Разархивируйте загруженный проект
настроить
добавить системную переменную ROCKETMQ_HOME -> F:\RocketMQ\rocketmq-4.5.2
JAVA_HOME -> F:\Java_JDK\JDK1.8
Добавлена системная переменная пути: каталог Maven/bin
PS: RocketMQ Сообщения хранятся в каталоге хранилища C: \ Users \ Administrator \ Store Store
文件占用较大,注意删除不必要的内容
запускать
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
Интегрированный плагин визуального мониторинга Rocket
-
Любой каталог (вытащите проект из любого места) git cloneGitHub.com/Apache/рок…
-
Перейдите в папку «rocketmq-externals\rocketmq-console\src\main\resources» и откройте «application.properties» для настройки.
-
По сути, это сервис SpringBoot, определите порт и не повторяйте его.
server.port=8100
rocketmq.config.namesrvAddr=127.0.0.1:9876
-
Перейти к папке «RocketMQ-Externals \ RocketMQ-Console»
Выполнение 'mvn clean package -Dmaven.test.skip = true', скомпилированная цель
java -jar rocketmq-console-ng-1.0.1.jar
-
Доступ по адресу конфигурации:http://127.0.0.1:8100
Rocket可视化监控插件 增加Topic | 自动增加Topic(4.5.2版本)
Версия 4.5.2 поддерживает автоматическое создание темы.
Версия 4.3.0 Тема должна быть настроена через программу мониторинга, иначе программа выполнения сообщит об ошибке, такого маршрута нет
SpringBoot интегрирует RocketMQ
<!--RocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
Основные понятия RocketMQ
Обзор
На основе распределенных систем RocketMQ вас обычно можно разделить на четыре кластера: Name Server, Broker, Producter, Consumer.
-
name server
-
Предоставление облегченных сервисов обнаружения и маршрутизации;
-
Каждый узел хранит всю информацию о маршрутизации и соответствующие службы чтения и записи;
-
Хранилище поддерживает горизонтальное расширение
-
-
broker
- Предоставлять услуги хранения сообщений, соответствующие механизмам TOPIC и QUEUE;
- Есть режимы push и pull;
- Высокая доступность за счет 2 или 3 копий;
- Обеспечивает возможность накапливаться сотни миллионов сообщений;
- Обеспечить устранение неисправностей, статистические функции и функции сигнализации;
-
producer
- Поддержка распределенного развертывания, отправка сообщений брокерам через модуль балансировки нагрузки
- Сбой быстрой поддержки
- низкая задержка
-
consumer
- Поддержка режимов push и pull
- Поддержка потребления кластера и широковещания
Основной модуль
Name Server
Обеспечить управление брокером; управление маршрутизацией (управление маршрутизацией)
NameServer, часто называемый службой обнаружения имен, играет роль ретранслятора в RocketMQ.Это служба без сохранения состояния, и связь между несколькими серверами NameServer отсутствует. Любой Производитель, Потребитель, Брокер взаимодействует со всеми Серверами имен, запрашивая или отправляя данные Серверам имен. И все они односторонние, Производитель и Потребитель запрашивают данные, а Брокер отправляет данные. Именно благодаря этой односторонней связи горизонтальное расширение RocketMQ становится легким.
- Предоставление облегченных сервисов обнаружения и маршрутизации;
- Каждый узел хранит всю информацию о маршрутизации и соответствующие службы чтения и записи;
- Хранилище поддерживает горизонтальное расширение
Резюме. Тема Информация о текущем брокере, информация о маршрутизации и т. Д.
Broker Server
Обеспечьте модуль удаленного взаимодействия, управление клиентами, службы хранения, службу высокой доступности (ведущий и подчиненный), службу индексирования.
- Предоставлять услуги хранения сообщений, соответствующие механизмам TOPIC и QUEUE;
- Есть режимы push и pull;
- Высокая доступность за счет 2 или 3 копий;
- Предоставляет возможность накапливать сотни миллионов сообщений;
- Обеспечить устранение неисправностей, статистические функции и функции сигнализации;
producer
- Поддержка распределенного развертывания, отправка сообщений брокерам через модуль балансировки нагрузки
- Сбой быстрой поддержки
- низкая задержка
consumer
- Поддержка режимов push и pull
- Поддержка потребления кластера и широковещания
Знакомство с основными ролями
режиссер
Производитель отправляет сообщение, сгенерированное бизнес-системой, брокеру. RocketMQ предоставляет различные методы отправки: синхронный, асинхронный и односторонний.
продюсерская группа
Производители с одинаковой ролью группируются вместе. Если исходный производитель выходит из строя после транзакции, брокер свяжется с другим экземпляром производителя в той же группе производителей, чтобы продолжить фиксацию или откат транзакции.
потребитель
Потребитель получает информацию от брокера и возвращает ее приложению. Для корректности нашего приложения предусмотрено два типа потребителей:
Вытягивающие потребители: Вытягивающие потребители извлекают сообщения от брокера.После извлечения пакета сообщений пользовательская прикладная система инициирует процесс потребления.
Push-потребитель: Push-потребитель, с другой стороны, включает в себя процесс извлечения и потребления сообщений и поддерживает другую внутреннюю работу, оставляя интерфейс обратного вызова для реализации конечным пользователем, который реализуется, когда сообщение получает содержимое для выполнения.
группа потребителей
Потребители с одинаковой ролью группируются вместе, называемые группами потребителей, которые достигают целей балансировки нагрузки и отказоустойчивости.
Примеры потребителей в потребительской группе должны иметь ту же тему подписки
Тема
Каталог Topic - это сообщение, в этом каталоге производителя передает сообщение, вытягивает потребителей сообщения, потребителя может подписаться на множество той же темы, производитель может быть передан множеству темы
PS: RocketMQ основан на модели публикации-подписки, а ядром публикации-подписки является тема Topic.
Сообщение
Сообщение – это информация, которая доставляется. Сообщение должно иметь Тему, под которой можно понимать адрес в письме. Сообщение также может иметь необязательный тег и дополнительные пары ключ-значение. Например: вы можете установить ключи в своем бизнесе на свои сообщения, искать сообщения в службах брокера для диагностики проблем во время разработки.
очередь сообщений
Тема разделена на одну или несколько очередей сообщений. Очередь разделена на 3 роли: асинхронный мастер, синхронный мастер и ведомый. Если вы не переносите потерю сообщений, мы рекомендуем вам развернуть синхронный главный сервер и добавить подчиненную очередь. Если вы допускаете потери, но хотите, чтобы очередь всегда была доступна, вы можете развернуть асинхронные главные и подчиненные очереди. Если вы хотите самое простое, вам просто нужен асинхронный мастер и никаких подчиненных очередей. Существует также два способа сохранения сообщений на диск. Рекомендуемый метод – асинхронное сохранение. Синхронное сохранение стоит дорого и приведет к потере производительности. Если вам нужна надежность, мы рекомендуем использовать синхронный метод ведущий+ведомый.
Тег
Другими словами, теги — это подтемы, предоставляющие пользователям дополнительную гибкость. Сообщения с одной и той же темой могут иметь разные теги.
Брокер (очередь)
Брокер — это основной компонент RocketMQ, который получает сообщения, отправленные производителями, сохраняет их и подготавливается к обработке запросов на вытягивание от потребителей. Он также хранит метаданные, связанные с сообщениями, включая группы потребителей, смещения для успешного потребления, темы и очереди.
служба имен
Служба имен в основном предоставляет информацию о маршрутизации. Клиент производителя/потребителя ищет тему и находит список очередей для связи.
порядок сообщений
когдаDefaultMQPushConsumer
используется, вы должны решить, потреблять ли сообщения последовательно или одновременно
- последовательное потребление
Последовательное использование сообщений означает, что сообщения будут потребляться в том порядке, в котором они были отправлены в очередь производителем. Если вы вынуждены использовать глобальный порядок, убедитесь, что в вашей теме есть только одна очередь сообщений.
Если указано последовательное потребление, количество сообщений, потребляемых одновременно, равно количеству групп потребителей, подписанных на эту тему.
- Употреблять одновременно
При одновременном потреблении сообщений максимальное количество одновременно потребляемых сообщений зависит от размера пула потоков, указанного потребляющим клиентом.
Лучшие практики
Лучшие практики продюсера
-
Приложение максимально использует одну тему, а подтип сообщения идентифицируется тегами, которые могут свободно устанавливаться приложением.Только когда теги установлены для отправки сообщений, потребители могут использовать теги для фильтрации сообщений в брокере при подписке на сообщения.
-
Уникальный идентификационный код каждого сообщения на бизнес-уровне должен быть установлен в поле ключей, чтобы облегчить обнаружение проблемы потери сообщения в будущем. Поскольку это хэш-индекс, убедитесь, что ключ максимально уникален, чтобы избежать возможных конфликтов хэшей.
Если сообщение отправлено успешно или не удалось, для печати журнала сообщений обязательно распечатайте поля sendresult и key.
-
Для приложений, в которых сообщения не могут быть потеряны, должен быть механизм повторной передачи сообщений. Например, если сообщение не может быть отправлено, оно сохраняется в базе данных, и может быть программа с синхронизацией, которая попытается повторно передать или вручную запустить повторную передачу.
-
Если некоторые приложения не обращают внимания на то, успешно ли отправлено сообщение, используйте метод sendOneWay для прямой отправки сообщения.
Рекомендации для потребителей
- Процесс потребления должен быть идемпотентным (то есть дедупликация на стороне потребителя)
- Попробуйте использовать метод пакетного потребления, который может значительно улучшить пропускную способность потребления.
- Оптимизируйте процесс потребления каждого сообщения
Основная проблема MQ
1. Очередь сообщений, подходящая для решения задач
Основные проблемы, которые необходимо решить: асинхронность, развязка, отсечение пиков.
Но введение очереди сообщений будет иметь много дополнительных проблем, таких как сложность системы будет значительно увеличена, в то время как необходимость решения повторяющихся выданных повторяющихся запросов, порядок потребления, потеря сообщения, механизм повторных попыток и т. д., это не злоупотреблять, соответствующая сцена с правильной технологией
2. Модель сообщений: разница между темами и очередями
Эволюция очередей сообщений
1. Начальный этап
Исходная очередь сообщений является очередью в строгом смысле. Очередь — это структура данных, которая действует по принципу «первым пришел — первым вышел».Очередь ранних сообщений была разработана в соответствии со структурой данных «очередь»..
Модель очереди:
Когда производитель отправляет сообщение, это операция постановки в очередь. Когда потребитель получает сообщение, это операция удаления из очереди, то есть операция удаления. Контейнер, в котором сообщение хранится на стороне сервера, естественно называется «очередью». ".
-
Если есть несколькоПроизводитель отправляет в ту же очередьСообщения, сообщения, которые могут быть использованы в этой очереди, представляют собой набор всех сообщений, созданных этими производителями. Порядок сообщений - это производителиЕстественный порядок отправки сообщений.
-
Если естьНесколько потребителей получают одну и ту же очередьсообщения между этими потребителями на самом делеконкурентные отношения, каждый потребитель может получить только часть сообщений в очереди, то есть любойСообщение может быть получено только одним из потребителей.
2. Этап модели публикации-подписки
При необходимостиДанные сообщения распространяются на несколько потребителей,ТребоватьКаждый потребитель может получать полное количество сообщенийНапример, для данных заказа система контроля рисков, система анализа, платежная система и т. д. должны получать сообщения.
В настоящее время одна очередь не может удовлетворить спрос.Каждый потребитель создает отдельную очередь для производителя для отправки нескольких копий.但是同样的一份消息数据被复制到多个队列中会пустая трата ресурсови, что более важно, производитель должен знать, сколько существует потребителей. Отправка отдельного сообщения для каждого потребителя фактически нарушает первоначальный замысел проекта «разъединить» очередь сообщений.
Для решения этой проблемы была разработана другая модель сообщений:модель публикации-подписки(Шаблон публикации-подписки)
Отправитель сообщения называется издателем, приемник сообщения называется абонентом, а контейнер, в котором хранится сообщение на сервере, называется темой.
-
Издатели отправляют сообщения в тему, подписчикам необходимо "подписаться на тему" перед получением сообщения.
-
В каждой подписке подписчики могут получать все сообщения по теме.
3. Резюме:
-
Долгое время шаблон очереди и шаблон публикации-подписки сосуществовали.
-
Некоторые очереди сообщений поддерживают обе модели сообщений, например ActiveMQ.
-
Сравнивая эти две модели, производитель — это издатель, потребитель — это подписчик, а очередь — это топик, и между ними нет существенной разницы. Самая большая разница между ними заключается в следующем:Данные сообщения могут быть использованы несколько раз проблему.
-
Фактически, в этой модели публикации-подписки, если есть только один подписчик, это в основном то же самое, что и модель очереди. То есть модель публикации-подписки функционально совместима с моделью очереди.
Во-вторых, модель сообщений RabbitMQ.
немногие все еще сохраняютсяИспользование модели очередиодин из продуктов.
Rabbitmq используетМодуль обмена решает проблему нескольких потребителей. Exchange находится между производителем и очередью, производителю все равно, в какую очередь отправить сообщение, ноОтправить сообщение в Exchange, по обменунастроенная политикачтобы решить, в какие очереди отправить сообщение.
-
Если одно и то же сообщение должно быть использовано несколькими потребителями, оно должно бытьНастройте Exchange для отправки сообщений в несколько очередей, каждая очередь имеетХранить полные данные сообщения, который может предоставить потребительскую услугу для потребителя.
3. Модель сообщений RocketMQ
Модель сообщений, используемая RocketMQ, является стандартной.модель публикации-подписки. Также в RocketMQ есть понятие очереди (Queue).
Механизм потребления очереди сообщений:
Почти все продукты очередей сообщений используют очень простой "механизм запроса-подтверждения, чтобы сообщения не терялись во время доставки из-за сбоев сети или сервера.
На производственной стороне производитель сначала отправляет сообщение серверу, то есть брокеру.После того, как сервер получит сообщение и запишет сообщение в топик или очередь, он будетПодтверждение, отправленное производителем в ответ.如果生产者没有收到服务端的确认或者收到失败的响应,则会повторно отправить сообщение.
На стороне потребителя, после того как потребитель получит сообщение и выполнит собственную бизнес-логику потребления (например, сохранит данные в базе данных), он такжеОтправить подтверждение об успешном потреблении на сервер, сервер считает сообщение успешно обработанным только после получения подтверждения о потреблении, в противном случае он выдаст потребителюОтправить сообщениеДо тех пор, пока соответствующее потребление не будет успешно подтверждено.
Этот механизм подтверждения работает хорошоГарантированная надежность при доставке сообщений, однако внедрение этого механизма создает проблему на стороне потребителя:Чтобы обеспечить упорядоченность сообщений, следующее сообщение не может быть использовано до тех пор, пока определенное сообщение не будет успешно обработано.то есть каждый предмет в любое время,Там может быть только у одного потребления потребительских экземпляров, тогдаНевозможно улучшить общую эффективность потребления со стороны потребителя за счет горизонтального расширения числа потребителей..
Чтобы решить эту проблему, RocketMQ добавляет концепцию очереди в тему:
-
Каждая тема содержит несколько очередейМногоактуальная параллельная продукция и потребление и потребление через несколько очередей. Следует отметить, что RocketMQ гарантирует только порядок сообщения в очереди, уровень тема недопустима для обеспечения строгого порядка сообщения.
-
Производитель будет отправлять сообщения во все очереди, а не "одно и то же сообщение отправляется в каждую очередь один раз",Каждое сообщение будет отправлено в очередь только один раз.
-
Группа потребителей, каждая очередь может потреблять только последовательно, а несколько очередей объединяются для параллельного потребления., степень параллелизма - это количество очередей, чем больше количество очередей, тем больше степень параллелизма, поэтому горизонтальное расширение может бытьУлучшить производительность потребления.
-
Каждая очередь поддерживает положение потребления (смещение) на группу потребления, запишите, где группа потребителей потребляет в этой очереди.
-
Абоненты представлены группами потребителей.Каждая группа потребителей потребляет полное сообщение в теме, процесс потребления между различными группами потребителей не зависит друг от друга, то есть, если сообщение потребляется группой получателей1, оно также будет потребляться группой получателей2.
-
Группа потребителей состоит из нескольких потребителей, и потребители в одной и той же группе находятся в отношениях конкурирующего потребления., каждый потребитель отвечает за потребление части сообщений в группе. Если сообщение используется потребителем Consumer1, другие потребители в той же группе больше не получат это сообщение.
-
Поскольку сообщения должны потребляться несколько раз разными группами, использованные сообщения не будут удалены немедленно, для чего требуется RocketMQ.Поддерживайте позицию потребителя (смещение потребителя) в каждой очереди для каждой группы потребителей., сообщения до этой позиции были потреблены, а сообщения после нее не потреблены.Каждый раз, когда сообщение успешно поглощается, позиция потребления увеличивается на единицу. Когда мы используем очереди сообщений, большинство причин потери сообщений связано с неправильной обработкой места потребления.
4. Модель сообщения Кафки
Модель сообщений Kafka точно такая же, как у RocketMQ, с той лишь разницей, что в Kafka другое название концепции очереди.Соответствующее название у Кафки — «Перегородка»., значение и функция ничем не отличаются.
V. Резюме
- В обычно используемой очереди сообщенийRabbitMQ использует модель очереди, но он также может реализовывать функции публикации-подписки.RocketMQ и Kafka используют модель публикации-подписки., и модели сообщений для них в основном одинаковы.
3. Сообщение потеряно как быть?Как обеспечить надежность передачи сообщения?
Во-первых, как проверить, потеряно ли сообщение?
- Если это компания с относительно полной ИТ-инфраструктурой, у нее, как правило, есть распределенная система отслеживания ссылок, с помощью которой можно легко отслеживать каждое сообщение.
- Если такой системы отслеживания нет, мы можем использовать упорядочение очереди сообщений, чтобы проверить, не потерялись ли какие-либо сообщения.
То есть в случае обеспечения порядка потребления сообщений по порядковому номеру сообщения судят о том, является ли оно непрерывным в сегменте потребления.
решение:
Сообщение от процесса производства до потребления можно разделить на три этапа:
1, этап производства
очереди сообщений через наиболее часто используемыемеханизм подтверждения запроса, чтобы обеспечить надежную доставку сообщения: когда ваш код вызывает метод отправки сообщения, клиент очереди сообщений отправит сообщение брокеру.После того, как брокер получит сообщение, он вернет клиенту подтверждающий ответ, указывающий, что сообщение получено. После того, как клиент получает ответ, он завершает отправку обычного сообщения.
Некоторые очереди сообщений автоматически повторяют после долгое время не получают ответ подтверждения. Если повторение не удается, пользователь будет уведомлен по возврату или исключением. При написании кода для отправки сообщения необходимо обратить внимание,Правильно обрабатывать возвращаемое значение или перехватывать исключения, Вы можете убедиться, что сообщения не теряются на этом этапе.
При синхронной отправке просто будьте осторожны, чтобы перехватывать исключения.
При асинхронной отправке это нужно проверять в методе обратного вызова. Это место требует особого внимания, причина многих потерянных сообщений в том, что мы используем асинхронную отправку, но не проверяем результат отправки в callback.
2. Стадия хранения
При нормальных обстоятельствах на этапе хранения, пока Брокер работает нормально, не будет проблем с потерей сообщений; однако, если Брокер выйдет из строя, например, когда процесс умрет или сервер выйдет из строя, сообщения все равно могут быть потеряны. .
Если надежность сообщения очень высока, вы можете настроить параметры Брокера, чтобы избежать потери сообщений из-за простоя:
- Для Брокера одного узла необходимо настроить параметр Брокер.После получения сообщения напишите сообщение на диск, а затем верните ответ для подтверждения производителю, так что даже если будет даунтайм, так как сообщение было записано на диск, сообщение не потеряется, а потребление может продолжиться после восстановления. Например, в RocketMQ метод очистки flushDiskType должен быть настроен как SYNC_FLUSH для синхронной очистки диска.
- Для кластера брокера, состоящего из нескольких узлов, кластер брокера необходимо настроить следующим образом:По крайней мере, отправьте сообщение более чем на 2 узла, а затем отправьте ответ с подтверждением клиенту.. Таким образом, когда Брокер выходит из строя, другие Брокеры могут заменить отказавшего Брокера без потери сообщений.
3. Стадия сообщения
Стадия потребления аналогична стадии производстваМеханизм подтвержденияЧтобы обеспечить надежную доставку сообщений, после того, как клиент получает сообщение от брокера, он выполняет бизнес-логику потребления пользователя.После успеха брокеру будет отправлен ответ с подтверждением потребления.. Если брокер не получает ответ с подтверждением потребления, это же сообщение будет возвращено, когда сообщение будет извлечено в следующий раз, гарантируя, что сообщение не будет потеряно во время передачи по сети, а также не будет потеряно из-за ошибки при выполнении. логика потребления клиентом.
Необходимо отметить при написании потребительского кода: не отправляйте подтверждение потребления сразу после получения сообщения, но следуетПосле выполнения всей бизнес-логики потребления отправьте подтверждение потребления.
4. обрабатывать дубликаты сообщения в процессе потребления
При передаче сообщения, еслидоставка не удаласьДело, отправитель будетвыполнить повторную попытку, возможно во время повторной попыткигенерировать дубликаты сообщений. Если не обрабатывается дубликата сообщения, он может вызвать ошибку в данных системы.
Например, микрослужба, которая использует сообщения о заказах и подсчитывает сумму заказа, если повторяющиеся сообщения не обрабатываются должным образом, будет возникать повторяющаяся статистика, что приведет к неправильным статистическим результатам.
1. Ситуация с дублированием сообщений обязательно будет
В протоколе MQTT заданное качество обслуживания может быть обеспечено при трех видах стандартной доставки сообщения:
- At most once: Максимум один раз. Оно будет доставлено не более одного раза, то есть нет гарантии надежности сообщения, допускающей потерю сообщений. Как правило, он используется в некоторых сценариях мониторинга, не требующих высокой надежности сообщений, например ежеминутных отчетов о температуре в помещении с оборудованием, при этом допустима небольшая потеря данных.
- At least once: Хотя бы один раз. будет доставлен хотя бы один раз, т.е.Потерянные сообщения не допускаются,ноРазрешить появление небольшого количества повторяющихся сообщений.
- Exactly once: Ровно один раз. Он будет доставлен только один раз, никакие потери или дублирование не допускаются, это самый высокий уровень.
Этот стандарт качества обслуживания применяется не только к MQTT, но и ко всем очередям сообщений. общийКачество обслуживания, обеспечиваемое большинством очередей сообщений, по крайней мере один раз, включая RocketMQ, RabbitMQ и Kafka.. То есть это трудно для очередей сообщений, чтобы гарантировать, что сообщения не являются дублированными.
Примечание: «Ровно один раз», поддерживаемый Kafka, отличается от только что упомянутого стандарта качества службы «Ровно один раз» для доставки сообщений. Это еще одна функция, предоставляемая Kafka. Транзакции, поддерживаемые в Kafka, также понимаются в нашем обычном смысле. несколько отличаются. В Kafka транзакции и Excactly Once в основном используются в сочетании с функциями потоковых вычислений.
Во-вторых, используйте идемпотентность для решения проблемы дублирования сообщений.
Идемпотентность изначально была математической концепцией, ее определение таково: если функция f(x) удовлетворяет условию f(f(x)) = f(x), то функция f(x) удовлетворяет идемпотентности метра. В расширении компьютерного домена он используется для описания операции, метода или услуги.
-
Характеристика идемпотентной операции состоит в том, что ееЛюбое количество выполнений будет иметь тот же эффект, что и одно выполнение.
-
Идемпотентный метод, использующий одни и те же параметры, оказывает такое же влияние на систему при многократном вызове, как и при однократном вызове. Так что не беспокойтесь о каких-либо изменениях в системе из-за повторного выполнения.
Пример:
1, без учета одновременного «баланса счета X до 100 юаней», после первого воздействия на внедрение системы баланс счета X становится равным 100 долларам. Пока параметр обеспечивает 100 юаней без изменений, и даже в этом случае эта операция является идемпотентной операцией, выполняемой много раз, баланс X-счета всегда составляет 100 долларов США, не изменится.
2. "Добавить 100 юаней к балансу счета X", эта операция не является идемпотентной. Каждый раз, когда она выполняется, баланс счета увеличивается на 100 юаней. Влияние многократного выполнения и однократного выполнения на систему (т.е. , остаток на счете ) отличаются.
Если бизнес-логика потребления сообщений идемпотентна, то не нужно беспокоиться о проблеме дублирования сообщений, потому что одно и то же сообщение,Потребление один раз и многократное потребление оказывают одинаковое влияние на систему.. Многократное потребление равняется однократному потреблению. От воздействия на систему: Хотя бы раз + идемпотентное потребление = Ровно один раз.
Лучший способ реализовать идемпотентные операции:Начиная с проектирования бизнес-логики, бизнес-логика потребления разработана как идемпотентная операция..
Обычно используемые методы проектирования идемпотентных операций:
(1) Использованиеуникальное ограничение базы данныхидемпотент
Пример перевода без упомянутой выше функции идемпотентности: добавить 100 юаней на баланс счета X. В этом примере мы можем сделать его идемпотентным, изменив бизнес-логику.
Прежде всего, мы можем ограничить, что только одна операция изменений может быть выполнена на каждой учетной записи для каждого порядка передачи. В распределенной системе существует много способов реализации этого ограничения. Самый простой - создать таблицу передачи передачи в базе данных. Эта таблица имеет три поля: передавать идентификатор примечания, идентификатор учетной записи и изменение количества изменений, а затем комбинируйте два поля идентификатора идентификатора заметки передачи и идентификатора учетной записи, чтобы создать уникальное ограничение, так что для того же идентификатора приема и идентификатора учетной записи Запись может существовать.
Таким образом, наша логика для потребления сообщений может стать следующей: «В таблице потока передачиДобавьте запись о переводе, а затем асинхронно обновите баланс пользователя в соответствии с записью о переводе.. «В операции добавления записи о переводе в таблицу потока переводов, поскольку мы предварительно определили уникальное ограничение «идентификатор счета с идентификатором примечания о переводе» в этой таблице,Только одна запись может быть вставлена в одну и ту же учетную запись для одного и того же квитанции о переводе, и последующие повторные операции вставки будут неудачными., который реализует идемпотентную операцию.
Основываясь на этой идее, можно использовать не только реляционные базы данных, но и системы классов хранения, которые поддерживают семантику, подобную «ВСТАВЬТЕ, ЕСЛИ НЕ СУЩЕСТВУЕТ», для реализации идемпотентности. Например, вы можете использовать команду Redis SETNX для замены единственной в базе данных Ограничения для достижения идемпотентного потребления.
(2) Установите предварительные условия для обновленных данных
Установите предварительное условие для изменения данных.Если условие выполнено, обновите данные, в противном случае откажитесь от обновления данных.При обновлении данных одновременно измените данные, которые необходимо оценить в предварительном условии. Таким образом, когда эта операция выполняется повторно, поскольку данные, которые необходимо оценивать в предварительных условиях, были изменены при первом обновлении данных и предварительные условия не выполняются, операция обновления данных не будет выполняться. неоднократно.
Например, операция "увеличить баланс счета X на 100 юаней" не удовлетворяет идемпотентности. Мы можем добавить к этой операции предварительное условие: "Если текущий баланс счета X составляет 500 юаней, прибавьте баланс к 100 юаням". юаней", эта операция идемпотентна. В соответствии с использованием в очереди сообщений вы можете переносить текущий баланс в тело сообщения при отправке сообщения и судить, равен ли текущий баланс в базе данных балансу в сообщении при потреблении, и выполнять только операцию изменения если они равны.
Но что, если данные, которые мы хотим обновить, не являются числовыми или мы хотим выполнить более сложную операцию обновления? Что является предварительным условием для предварительного заключения? Более общий подход заключается в том, чтобы датьДанные добавляют атрибут номера версии, перед каждым обновлением данных,Сравните, соответствует ли номер версии текущих данных номеру версии в сообщении., если оно несовместимо, отказаться от обновления данных,+1 номер версии при обновлении данных, также могут быть достигнуты идемпотентные обновления.
(3) Запишите и проверьте работу
Если ни один из двух упомянутых выше методов реализации идемпотентности не применим к вашему сценарию, существует еще один общий и широко применимый метод реализации идемпотентности: операции записи и проверки, также известные как «механизм токена» или механизм GUID (глобально уникальный идентификатор). ", идея реализации особенно проста:Перед выполнением операции обновления данных проверьте, была ли выполнена операция обновления.这种方法适用范围最广,但是实Сложность и сложность также относительно высоки, и обычно не рекомендуется использовать.
Конкретный метод реализации заключается в том, чтобы при отправке сообщения дать каждому сообщениюУкажите глобально уникальный идентификатор, при потреблении сначала проверить, было ли потреблено сообщение по этому ID, если нет, обновить данные, а затемУстановите состояние потребления для употребления.
В распределенной системе этот метод на самом деле очень сложно реализовать. Во-первых, присвоить глобально уникальный ID каждому сообщению не так просто, способов много, но ни один из них не очень хорош, чтобы удовлетворить простотой, высокой доступностью и высокой производительностью одновременно, более или менее некоторыми жертвами необходимы. Еще более неприятным является то, что в «Проверить статус потребления, затем обновить данные и установить статус потребления»,Три операции должны действовать как набор операций, чтобы гарантировать атомарность., действительно можно добиться идемпотентности, иначе будут баги.
Например, для того же сообщения: «Глобальный ID 8, операция: добавить 100 юаней на счет ID 666», есть возможность этого:
- Время t0: Потребитель A получает сообщение, проверяет статус выполнения сообщения, обнаруживает, что сообщение не было обработано, и начинает выполнять «добавить 100 юаней на счет»;
- Time T1: Потребитель B принимает сообщение, проверяет состояние выполнения сообщения и находит, что сообщение не было обработано, потому что в данный момент потребитель A еще не успел обновить состояние выполнения сообщений.
Это приведет к тому, что счет будет ошибочно увеличен на 100 юаней дважды, что очень легко сделать в распределенной системе, и его следует воспринимать как предупреждение. Для этой задачи, конечно, мы можем реализовать ее с транзакциями или блокировками, но в распределенной системе и распределенные транзакции, и распределенные блокировки решить сложнее.
5. Используйте сообщения о транзакциях для достижения распределенных транзакций
1. Дела сообщений
На самом деле, во многих сценариях цель нашего процесса «сообщения» часто состоит в том, чтобыУведомить другую систему или модуль для обновления данных, "транзакция" в очереди сообщений, в основном решаетСообщение Производитель и Сообщение Проблемы согласованности потребителей.
Когда пользователи делают покупки в приложении для электронной коммерции, они сначала добавляют товары в корзину, затем размещают заказ на несколько товаров вместе и, наконец, оплачивают, чтобы завершить процесс покупки.
В этом процессе есть шаг, который требует использования очередей сообщений.После того, как система заказов создает заказ, она отправляет сообщение в систему корзины покупок, чтобы удалить заказанные товары из корзины. Поскольку шаг удаления заказанных товаров из корзины не является обязательным шагом в основном процессе размещения заказа и оплаты пользователем, разумнее использовать очередь сообщений для асинхронной очистки корзины.
Для системы заказа он создает порядок фактического выполнения работы двух шагов:
- Вставить данные заказа в библиотеку заказов, создать заказ;
- Отправить сообщение в очередь сообщений, содержание сообщения - это только что созданный заказ
Для системы корзины покупок:
-
Подпишитесь на соответствующую тему, получайте сообщения о создании заказа, затем очищайте корзину и удаляйте позиции заказа в корзине.
В распределенной системе любой из упомянутых выше шагов может завершиться ошибкой.Если обработка не выполняется, могут возникнуть несоответствия между данными заказа и данными корзины, например:
- Создал заказ без очистки корзины;
- Заказ не был успешно создан, но товары в корзине были удалены.
Следовательно, проблема, которую нам нужно решить, заключается в следующем: в случае, если какой-либо из вышеперечисленных шагов может завершиться неудачно, мы также должны обеспечить согласованность данных библиотеки заказов и гаража.
2. Распределенные транзакции
Распределенные транзакции предназначены для реализации транзакций в распределенной системе. В распределенной системе необходимо обеспечить доступность и несерьезную жертвенную производительность.Непротиворечивость данных уже очень сложна, очевидно, реализация строго распределенных транзакций — еще более невыполнимая задача. Поэтому распределенные транзакции, о которых сейчас все говорят, в большинстве случаев находятся вНеполная реализация транзакций в распределенных системах, В разных сценариях приложений существуют разные реализации, и цель состоит в том, чтобы решить практические проблемы с помощью некоторых компромиссов.
Обычные распределенные реализации транзакций:
- 2PC (двухфазная фиксация, также известная как двухфазная фиксация)
- TCC (попробовать-подтвердить-отменить)
- сообщение о транзакции
Каждая реализация имеет свои собственные сценарии использования и свои проблемы, и ни одна из них не является идеальным решением.
Сценарии для сообщений о транзакцияхв основном те, кому нужноДанные асинхронного обновления,иТребования к данным в реальном времени не слишком высокиместо действия. Например, после создания заказа, если есть несколько секунд, товары в корзине несвоевременны, и это не является полностью неприемлемым, если окончательные данные корзины и данные заказа совпадают.
В-третьих, очередь сообщений реализует распределенные транзакции.
Сообщения транзакций требуют реализации соответствующих функций, предоставляемых очередью сообщений.И kafka, и RocketMQ предоставляют функции, связанные с транзакциями.
Для системы заказов:
- Сначала система заказов запускает транзакцию в очереди сообщений.
- Затем система заказов отправляет "половину сообщения" на сервер сообщений. Это половинное сообщение не означает, что содержание сообщения неполное, но содержание, которое оно содержит, является полным содержанием сообщения. Единственная разница между половинным сообщением и нормальное сообщение заключается в том, что до совершения транзакции для потребления. Для пользователя сообщение невидимо.
- После успешной отправки полусообщения система заказов может выполнить локальную транзакцию, создать запись заказа в библиотеке заказов и отправить транзакцию базы данных библиотеки заказов.
- Затем принимается решение зафиксировать или откатить сообщение транзакции в соответствии с результатом выполнения локальной транзакции. Если заказ успешно создан, отправляется сообщение о транзакции, и система корзины покупок может использовать это сообщение и продолжить последующий процесс. Если создать заказ не удается, сообщение о транзакции откатывается, и система корзины покупок не получает это сообщение. Таким образом, в основном достигается требование согласованности «либо все успешно, либо все терпят неудачу».
Для системы корзины:
-
Для операции очистки корзины для покупок, когда система корзины для покупок получает сообщение об успешном создании заказа, обработка отказа относительно проста.Если корзина успешно очищена, вы можете отправить подтверждение о потреблении., если это не удается,Поскольку подтверждение потребления не было отправлено, очередь сообщений автоматически повторит.
Что если произойдет сбой на четвертом этапе фиксации сообщения о транзакции? Kafka и RocketMQ дают 2 разных решения:
1. Решение Кафки:
Создайте исключение напрямую и позвольте пользователю обрабатывать его самостоятельно. Мы можем неоднократно повторять отправку в бизнес-коде, пока отправка не будет успешной, или удалить ранее созданный заказ для компенсации.
2. Решение RocketMQ:
В реализации транзакции в RocketMQ добавленоМеханизм проверки транзакцийЧтобы решить проблему сбоя фиксации сообщения транзакции. Если Producer является системой заказов, и при отправке или откате сообщений о транзакциях возникает сетевое исключение, Broker RocketMQ не получает запрос на отправку или откат, и Broker будет регулярно обращаться к Producer.Обратная проверка статуса локальной транзакции, соответствующей этой транзакции, а затем решить зафиксировать или откатить транзакцию в соответствии с результатом обратной проверки. Чтобы поддерживать этот механизм антипроверки транзакций, наш бизнес-код долженРеализует интерфейс для обратной проверки состояния локальных транзакций., который сообщает RocketMQ, была ли локальная транзакция успешной или неудачной.
Объединив реализацию общего сообщения транзакции, упомянутого выше, и механизм обратной проверки транзакции RocketMQ, используйтеФункция сообщений транзакций RocketMQ реализует процесс распределенных транзакций.Как показано ниже:
6. Проблемы с порядком в очередях сообщений
Когда мы говорим порядок, о чем мы говорим?
В повседневном мышлении большая часть последовательности связана со временем, то есть временная последовательность представляет собой последовательность отношений событий.
Например, событие A происходит в 15:00, а событие B — в 16:00, тогда мы думаем, что событие A происходит до события B, и их отношение порядка — сначала A, а затем B.
Приведенный выше пример работает, потому что у них одинаковая система отсчета, то есть их время соответствует времени одних и тех же физических часов. Если время, когда происходит А, — это пекинское время, а время, от которого зависит В, — токийское время, то сохраняется ли порядок сначала А, а затем В?
Без абсолютной ссылки на время, так что между A и B, а также заказать его, или как определить порядок a и b?
Очевидно, что если между событиями А и В существует причинно-следственная связь, то А должно произойти раньше В (причина и следствие, есть причина и следствие). Наоборот, в отсутствие абсолютной привязки ко времени, если нет причинно-следственной связи между А и В, то нет и последовательной связи между А и В.
Итак, когда мы говорим о порядке, мы на самом деле имеем в виду следующее:
- В случае абсолютной привязки ко времени отношение между временем возникновения события;
- и без привязки ко времени - отношение, выведенное из причинно-следственной связи до того, как произошло раньше;
Обсудить порядок в распределенной среде
При обсуждении последовательности в распределенной среде (многопоточность и многопроцессорность можно рассматривать как распределенную среду):
-
Порядок событий на той же ните является детерминированным, и их можно считать одинаковым времени в качестве ссылки
-
Порядок между различными потоками может быть выведен только по причинно-следственной связи.
(точки обозначают события, волнистые стрелки обозначают сообщения между событиями)
На приведенном выше рисунке последовательность событий в процессе P такова: p1->p2->p3->p4 (вывод времени). А поскольку p1 отправил сообщение q2 процесса Q, то p1 должен быть до q2 (каузальный вывод). Но отношение порядка между p1 и q1 определить невозможно.
Рекомендуется прочитать "Время, часы и порядок событий в распределенной системе", в которой подробно анализируются проблемы порядка в распределенных системах.
Последовательные сообщения в промежуточном программном обеспечении сообщений
Что такое последовательное сообщение
С вышеуказанной основой после возвращения к предмету этой статьи порядок сообщений чата в сообщении промежуточного программного обеспечения.
Последовательное сообщение (сообщение FIFO) — это тип сообщения, предоставляемый MQ, который публикуется и используется в строгом порядке. Последовательное сообщение состоит из двух частей: последовательной публикации и последовательного потребления.
Существует два типа последовательных сообщений:
Порядок разделов: все сообщения в разделе публикуются и используются в порядке поступления и отправки.
Глобальный порядок: все сообщения в теме публикуются и потребляются в порядке поступления.
Это определение последовательных сообщений в Alibaba Cloud, которое делит последовательные сообщения на последовательную публикацию и последовательное потребление. Считается ли отправка сообщений в нескольких потоках последовательной публикацией?
Как объяснялось в предыдущем разделе, в многопоточности нет порядка без причинно-следственной связи. Тогда пользователь, отправляющий сообщения в несколько потоков, означает, что пользователю не важен порядок сообщений, отправляемых в разных потоках. То есть сообщения, отправленные несколькими потоками, сообщения между разными потоками не публикуются последовательно, а сообщения из одного потока публикуются последовательно. Это должно гарантироваться самим пользователем.
Для последовательного потребления вам необходимо убедиться, что сообщения из одного и того же отправляющего потока обрабатываются в одном и том же порядке при потреблении (почему бы не сказать, что они должны потребляться в потоке?).
Глобальный порядок на самом деле является особым случаем порядка раздела, даже если тема имеет только один раздел (глобальный порядок не обсуждается ниже, потому что глобальный заказ столкнется с проблемами с производительностью, а большинство сценариев не нуждаются в глобальном порядке).
Как гарантировать заказ
В модели MQ заказ должен быть гарантирован в 3 этапа:
- Когда сообщения передаются последовательно
- Сообщения хранятся в том же порядке, в котором они были отправлены
- Сообщения потребляются в том же порядке, в котором они сохраняются
Соблюдение порядка при отправке означает, что для сообщений, требующих порядка, пользователь должен отправлять их синхронно в одном потоке. Сохранение согласованного порядка хранения и передачи требует, чтобы сообщения A и B, отправляемые в одном и том же потоке, хранились перед B в пространстве. Согласованность между потреблением и хранением требует, чтобы сообщения A и B обрабатывались в порядке A и B после того, как они поступят к Потребителю.
Как показано ниже:
Необработанные данные для сообщений двух порядков: a1, b1, b2, a2, a3, b3 (порядок появления в абсолютном времени):
-
При отправке сообщения порядка а должны сохранять порядок а1, а2 и а3, а сообщения порядка b тоже одинаковы, но между сообщениями порядка а и b нет отношения порядка, а это значит, что сообщения порядка a и b могут быть разными, отправляются в потоке
-
При хранении порядка сообщений порядков A и B необходимо гарантировать соответственно, но порядок сообщений между заказами A и B не может быть гарантирован
-
- a1, b1, b2, a2, a3, b3 допустимы
- a1, a2, b1, b2, a3, b3 также допустимы
- a1, a3, b1, b2, a2, b3 не принимаются
-
Простой способ гарантировать порядок при потреблении — «ничего не делать» и не корректировать порядок полученных сообщений, то есть до тех пор, пока сообщения одного раздела обрабатываются только одним потоком; конечно, если a и b находятся в разделе. После получения сообщений вы также можете разделить их на разные потоки для обработки, но вы должны взвесить преимущества
Реализация последовательности в RocketMQ с открытым исходным кодом
На приведенном выше рисунке представлен принцип последовательного сообщения RocketMQ, и сообщения разного порядка направляются в разные разделы. Документ дает только порядок proDuecer, а потребительское потребление может иметь раздел только для обеспечения порядка сообщений, и реализация выглядит следующим образом.
Сторона производителя
Единственное, что нужно сделать производителю, чтобы обеспечить порядок сообщений, — это направить сообщения в определенные разделы.В RocketMQ выбор разделов осуществляется с помощью MessageQueueSelector.
- Список mqs: все разделы в теме, в которую должно быть отправлено сообщение
- Сообщение MSG: Объект сообщений
- Дополнительные параметры: пользователи могут передавать свои собственные параметры
Например, следующая реализация может гарантировать, что сообщения одного порядка направляются в один и тот же раздел:
long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());
Потребительская сторона
Существует два типа потребителей RocketMQ: MQPullConsumer и MQPushConsumer.
MQPullConsumer управляется пользовательским потоком и активно получает сообщения с сервера, и каждый раз получает сообщение в MessageQueue. Список msgFoundList в PullResult, естественно, совпадает с порядком хранения, пользователь должен обеспечить порядок потребления после получения пакета сообщений.
Для PushConsumer пользователь регистрирует MessageListener для приема сообщений, и клиент должен обеспечить порядок сообщений при вызове MessageListener. Реализация в RocketMQ выглядит следующим образом:
- Однопоточный PullMessageService для получения сообщений от брокера
- PullMessageService добавляет сообщения в ProcessQueue (ProcessMessage — это кеш сообщений), а затем отправляет задачу потребления в ConsumeMessageOrderService.
- ConsumeMessageOrderService выполняется в нескольких потоках, и каждый поток должен получить блокировку MessageQueue при использовании сообщений.
- Получить сообщение от ProcessQueue после получения блокировки
Основная идея обеспечения порядка потребления:
- После получения сообщения добавьте его в ProcessQueue и выполните в одном потоке, чтобы сообщения в ProcessQueue были последовательными.
- Отправленная задача потребления — «использовать определенный MQ». Этот запрос потребления предназначен для получения потребления сообщений из ProcessQueue, поэтому он также является последовательным (независимо от того, какой поток получает блокировку, он находится в порядке сообщений в ProcessQueue. потреблять)
Связь между порядком и исключением
Последовательные сообщения требуют, чтобы и Производитель, и Потребитель гарантировали порядок. Производитель должен убедиться, что сообщение направляется в правильный раздел, а сообщение должно гарантировать, что данные каждого раздела имеют только одно сообщение потока, поэтому будут некоторые дефекты:
- Сообщение о порядке передачи при отказе не может использовать характеристики кластера, поскольку оно не может заменить повторную попытку MessageQueue.
- Из-за проблемы с точкой доступа, вызванной отправленной стратегией маршрутизации, объем данных некоторых MessageQueues может быть особенно большим.
- Потребление параллельных чтений зависит от количества разделов
- Невозможно пропустить при сбое потребления
Если MessageQueue нельзя заменить и повторить попытку, MessageQueue должна иметь свою собственную копию, а доступная копия гарантируется с помощью таких алгоритмов, как Raft и Paxos, или MessageQueue хранится на других высокодоступных устройствах хранения.
Кажется, что нет хорошего решения проблемы хотспота Мы можем только распределять сообщения по разным MessageQueue как можно более равномерно, разделяя MessageQueue и оптимизируя метод маршрутизации.
Параллелизм потребления теоретически не является большой проблемой, потому что количество MessageQueues можно регулировать.
Неизбежно, что потребление не может быть пропущено, потому что пропуск может привести к неправильной последующей обработке данных. Однако могут быть предусмотрены некоторые стратегии, и пользователь может решить, следует ли пропустить его в зависимости от типа ошибки, и предоставить такие функции, как очередь повторных попыток, после пропуска пользователь может повторно использовать сообщение в «других» местах.
Благодарность
Благодаря Geek Time "Мастер-класс по очереди сообщений》Ссылка на сайт
Наконец
Эта статья представляет собой большой сборник, и в середине должна быть ссылка на содержание или изображение многих других людей, но из-за времени нет записи в то время, и приносим свои извинения за это, если автор обнаружит свою собственную статью или изображение, Могу поговорить наедине, дополню.
Если вы обнаружите, что пишете неплохо, можете поискать в паблике №».это Кервин", совершенствуйтесь вместе!
также посмотретьДомашняя страница Кервина на GitHub