В последней статье мы узнали о механизме повторных попыток сообщения в RocketMQ и о том, как обрабатывать сообщения повторных попыток как на стороне производителя, так и на стороне потребителя.
RocketMQ отправит сообщение потребителю в соответствии с определенными правилами, когда сообщение используется для повторной отправки сообщения. Это включает в себя концепцию идемпотентности сообщений.
Во-первых, давайте разберемся, что такое идемпотентность и что такое идемпотентность сообщений.
что такое идемпотент
Baidu объясняет «идемпотентность» следующим образом.
Пусть f — унарная операция, отображающая из X в X, тогда f идемпотентна, когда для всех x в X
f(f(x)) = f(x).
В частности, тождественная функция должна быть идемпотентной, и любая константная функция также идемпотентна.
Ключ здесьf(f(x)) = f(x), Переведенный в популярное объяснение:
Операция называется идемпотентной, если она выполняется несколько раз и имеет тот же эффект, что и одна операция.
Об идемпотентности сообщений
Основываясь на приведенных выше концепциях в сочетании со сценарием потребления сообщений, мы можем легко обобщить концепцию идемпотентности сообщений:
который:
Если сообщение повторяется несколько раз, сторона потребителя использует повторяющееся сообщение несколько раз, и результат такой же, как при однократном потреблении, и многократное потребление не оказывает побочного эффекта на систему, тогда мы называем это сообщение процесса идемпотентным.
Например:
В сценарии оплаты потребитель получает сообщение о вычете и выполняет операцию вычета для заказа, и операция вычета должна вычесть 10 юаней.
Операция заряда повторяется много раз с тем же эффектом, выполненным один раз, только после того, как реальный заряд, дебетовый заказ записи пользователя для ручки должен быть только один заряд воды. Вычета больше нет. Поэтому мы говорим, что дебетовые операции соответствуют требованиям этого процесса потребления новостей идемпотентно.
Сценарии, в которых требуется идемпотентность сообщений
Во-первых, давайте рассмотрим сценарии, требующие идемпотентности сообщений, то есть сценарий дублирования сообщений, упомянутый в предыдущей статье.
- Повторяйте при отправке:
Когда производитель отправляет сообщение, сообщение успешно доставляется брокеру, но в это время происходит сбой сети или производитель отключается, в результате чего брокер не может отправить ACK. В это время производитель считает, что отправка сообщения не удалась, поскольку ему не удалось получить ответ на отправку сообщения, поэтому он пытается повторно отправить сообщение брокеру. Когда сообщение будет отправлено успешно, в брокере будет два сообщения с одинаковым содержимым, и, наконец, потребитель получит два сообщения с одинаковым содержимым и одинаковым идентификатором сообщения. Отсюда и дублирование сообщений.
- Повторяйте при употреблении:
Повторное потребление также произойдет, когда потребительские сообщения. Когда потребитель имеет дело с бизнес-завершением потребительского статуса, из-за ненормальности сети Flash, состояние CONDUME_SUCCESS, которое не имеет завершению потребителя, не устанавливается на брокер. Чтобы гарантировать, что сообщение потребляется по меньшей мере одной семантикой, брокер затем доставит сообщение, обрабатываемое после восстановления сетевого среды, и, наконец, приведет к тому, что потребители получают контент несколько раз, и идентификатор сообщения является тем же сообщением, что привело к повторению из сообщения.
Можно видеть, что независимо от того, повторяется ли это при отправке или при потреблении, конечный эффект заключается в том, что потребитель получает повторяющиеся сообщения при потреблении, тогда мы знаем, что идемпотентность сообщения может быть достигнута только путем выполнения идемпотентной обработки на стороне потребителя.
Реализовать идемпотентность сообщений
Итак, как мы можем добиться идемпотентности сообщений?
Сначала нам нужно определить два элемента идемпотентности сообщений:
- идемпотентный токен
- Обеспечение уникальности
Мы должны обеспечить уникальность результатов бизнес-обработки при наличии идемпотентных токенов, прежде чем считать идемпотентную реализацию успешной.
Далее эти два элемента объясняются отдельно.
идемпотентный токен
Идемпотентный токен — это установленное соглашение между производителем и потребителем. В бизнесе это обычно строка с уникальным бизнес-идентификатором, таким как номер заказа, серийный номер и т. д. И вообще генерируется производителем и передается потребителю.
Обеспечение уникальности
То есть сервер должен принять определенную стратегию, чтобы гарантировать, что одна и та же бизнес-логика не будет успешно выполняться много раз. Например: используя Alipay для оплаты, покупка продукта несколько раз будет успешной.
Более распространенным способом является использование дедупликации кэша и реализация идемпотентности путем добавления уникального индекса базы данных к бизнес-идентификатору.
Конкретная идея такова: например, в платежном сценарии инициатор платежа генерирует серийный номер платежа, и после того, как сервер успешно обработает платежный запрос, сохранение данных будет успешным. Поскольку к потоку платежей в таблице добавляется уникальный индекс, при повторении платежа будет выдано сообщение об ошибке из-за наличия уникального индекса.duplicate entryЛогика службы сервера зафиксирует исключение и возвращает «повторный платеж». Это не повторит вычет.
На основе приведенного выше сценария мы также можем ввести компоненты кеша, такие как Redis, для обеспечения дедупликации: когда запрос платежа попадает на сервер, сначала перейдите к кешу, чтобы принять решение, и получите сохраненное значение в соответствии с ключом = " серийный номер платежа", если возврат имеет значение Empty, это указывает на то, что платежная операция выполняется в первый раз и в качестве ключа используется текущий серийный номер платежа, а значение может быть любой строкой, хранящейся в redis через set(key , значение, expireTime).
При поступлении повторного платежного запроса попробуйте выполнить операцию get (серийный номер платежа), которая попадет в кеш, поэтому мы можем считать запрос повторным платежным запросом, а серверная служба вернет служебное приглашение повторного платежа. запрашивающему.
Поскольку мы обычно устанавливаем время истечения во время использования кеша, кеш может стать недействительным и вызвать проникновение запросов в постоянное хранилище (например, MySQL). Таким образом, нельзя отказаться от использования уникальных индексов из-за введения кеша, и лучше объединить их.
Как обрабатывать идемпотентность сообщений в сценариях RocketMQ
Разобравшись с двумя элементами и типичными случаями, мы возвращаемся к сценарию потребления сообщений.
В качестве высокопроизводительного промежуточного программного обеспечения для сообщений RocketMQ может гарантировать, что сообщения не будут потеряны, но не будут повторяться. На самом деле можно реализовать дедупликацию сообщений в RocketMQ, но, учитывая требования высокой доступности и высокой производительности, если дедупликация сообщений на стороне сервера выполняется, RocketMQ необходимо выполнять дополнительные операции повторного хеширования, сортировки и другие операции над сообщением. , что будет стоить Чем больше стоимость ресурсов, таких как время и пространство, преимущества не очевидны. RocketMQ считает, что вероятность дублирования сообщений при нормальных обстоятельствах на самом деле очень мала, поэтому RocketMQ передает идемпотентную операцию сообщения бизнес-стороне для обработки.
На самом деле суть вышеуказанной проблемы в том, что сам сетевой вызов имеет неопределенность, то есть третье состояние ни успеха, ни отказа, так называемоеОбработкасостояние, поэтому будут дубликаты. С этой проблемой также сталкиваются многие другие продукты MQ.Обычный метод заключается в том, чтобы требовать от потребителей дедупликации при потреблении сообщений, что в этой статье мы называем идемпотентностью потребления.
Читатели, имеющие некоторый опыт работы с RocketMQ, могут заметить, что каждое сообщение имеет MessageID, поэтому можем ли мы использовать этот идентификатор в качестве основы для дедупликации, то есть упомянутого выше идемпотентного токена?
Ответ — нет, поскольку MessageID может конфликтовать, поэтому не рекомендуется использовать MessageID в качестве основы для обработки, но следует использовать уникальные бизнес-идентификаторы, такие как номер заказа, серийный номер и т. д., в качестве ключевой основы для идемпотентной обработки.
Как было сказано выше, идемпотентный базис должен генерироваться производителем сообщения, при отправке сообщения мы можем установить id через ключ сообщения, и соответствующий APIorg.apache.rocketmq.common.message.setKeys(String keys)код показывает, как показано ниже:
Message sendMessage = new Message(
MessageProtocolConst.WALLET_PAY_TOPIC.getTopic(),
message.getBytes());
sendMessage.setKeys("OD0000000001");
Когда потребитель сообщения получает сообщение, он выполняет идемпотентную обработку в соответствии с ключом сообщения.org.apache.rocketmq.common.message.getKeys()код показывает, как показано ниже:
(msgs, context) -> {
try {
// 默认msgs只有一条消息
for (MessageExt msg : msgs) {
String key = msg.getKeys();
return walletCharge(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
LOGGER.error("钱包扣款消费异常,e={}", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
Потребитель может прочитать идемпотентный базис, установленный производителем (например, номер заказа и т. д.) с помощью getKeys(), а затем бизнес-логика может выполнить идемпотентную обработку идентификатора.
Если вы чувствуете, что вам нужно каждый раз устанавливать ключ на стороне производителя и получать ключ на стороне потребителя, это немного громоздко. Идемпотентная основа также может быть установлена в протоколе сообщений, и потребитель также может анализировать идентификатор для выполнения идемпотентных операций после получения сообщения. Необходимо только, чтобы производитель и потребитель сообщения согласовали протокол о том, как анализировать идентификатор.
Конкретная идемпотентная логика зависит от используемого сценария, и здесь я пытаюсь сделать некоторые выводы из своего опыта.
Общие идемпотентные операции на стороне потребителя
- Запрос состояния перед бизнес-операцией
Когда потребитель начинает выполнять бизнес-операцию, он сначала запрашивает бизнес-статус через идемпотентный идентификатор, например: изменение ссылки на статус заказа, когда статус заказа — успех/неудача, дальнейшая обработка не требуется. Затем нам нужно только запросить статус заказа по номеру заказа перед выполнением логики потребления и отправить сообщение после получения подтвержденного статуса заказа и уведомить брокера о том, что статус сообщения:ConsumeConcurrentlyStatus.CONSUME_SUCCESS.
- Извлечение данных перед бизнес-операциями
Логика аналогична первому пункту, то есть извлечение данных выполняется перед потреблением, если соответствующие данные можно запросить по уникальному бизнес-идентификатору, то последующая бизнес-логика не требуется. Например, в процессе заказа, прежде чем потребитель выполнит асинхронный заказ, он сначала проверяет, существует ли заказ по номеру заказа.Здесь может быть проверена база данных или может быть проверен кеш. Если он существует, он напрямую вернет успех потребления, в противном случае будет выполнена операция заказа.
- Ограничения уникальности гарантируют последнюю линию защиты
上述第二点操作并不能保证一定不出现重复的数据,如:并发插入的场景下,如果没有乐观锁、分布式锁作为保证的前提下,很有可能出现数据的重复插入操作,因此我们务必要对幂等id添加唯一性索引,这样就能够保证在并发场景下也能保证数据的唯一性。
- Познакомить с замковым механизмом
上述的第一点中,如果是并发更新的情况,没有使用悲观锁、乐观锁、分布式锁等机制的前提下,进行更新,很可能会出现多次更新导致状态的不准确。如:对订单状态的更新,业务要求订单只能从初始化->处理中,处理中->成功,处理中->失败,不允许跨状态更新。如果没有锁机制,很可能会将初始化的订单更新为成功,成功订单更新为失败等异常的情况。
高并发下,建议通过状态机的方式定义好业务状态的变迁,通过乐观锁、分布式锁机制保证多次更新的结果是确定的,悲观锁在并发环境不利于业务吞吐量的提高因此不建议使用。
- журнал сообщений
这种方案和业务层做的幂等操作类似,由于我们的消息id是唯一的,可以借助该id进行消息的去重操作,间接实现消费的幂等。
首先准备一个消息记录表,在消费成功的同时插入一条已经处理成功的消息id记录到该表中,注意一定要 **与业务操作处于同一个事物** 中,当新的消息到达的时候,根据新消息的id在该表中查询是否已经存在该id,如果存在则表明消息已经被消费过,那么丢弃该消息不再进行业务操作即可。
.....
Определенно есть еще сценарии, которые я не рассмотрел.Упомянутые здесь операции все связаны друг с другом, и их совместное использование может обеспечить идемпотентность потребительского бизнеса.
В любом случае, помните об одном принципе:Кэши ненадежны, запросы ненадежны.
В сценариях с высокой степенью параллелизма уникальный индекс постоянного хранилища и введение механизма блокировки должны использоваться в качестве последней линии защиты для совместного обеспечения точности и целостности данных!
Суммировать
В этой статье в основном объясняется, что такое идемпотентность и как передать уникальный идемпотентный идентификатор в сценариях использования сообщений, а также дополнительно анализируется идея обеспечения идемпотентности сообщений и обобщаются распространенные методы идемпотентной обработки сообщений.
Рутина меняется, ключ в том, чтобы понять идеи и методы, наш принципНезависимо от того, сколько раз он выполняется, поведение, демонстрируемое бизнесом, является однородным., В соответствии с этой предпосылкой мы ввели несколько стратегий защиты от повторного воспроизведения, таких как проверка базы данных перед операцией, проверка кэша перед операцией, механизм оптимистичной блокировки/распределенной блокировки, добавление уникального индекса и т. д. Благодаря комплексному эффекту этих стратегий идемпотентность сообщения наконец достигнута цель.
Наконец, есть предложение, которым можно поделиться, нет никакой магии, о которой нужно просить. Нет конца искусству. Я верю, что вы будете достаточно умны, чтобы интегрировать различные технические средства на пути технологий в сочетании с реальными сценариями, чтобы идти все дальше и дальше.