В распределенных системах для обеспечения согласованности данных необходимо использовать распределенные транзакции. Существует множество способов реализации распределенных транзакций.Сегодня я в основном расскажу об использовании сообщений о транзакциях RocketMQ для реализации распределенных транзакций.
В конце статьи есть пасхалки, прочтите перед уходом
Зачем вам транзакционные сообщения?
Многие студенты могут не знать, что такое сообщение о транзакции. Это не имеет значения. Давайте возьмем реальный бизнес-сценарий и дадим вам сначала понять проблемы с обычными сообщениями.
В приведенном выше бизнес-сценарии, когда пользователь успешно платит, платежное поручение будет обновлено, а затем будет отправлено сообщение MQ. Система комиссий извлечет сообщение, рассчитает комиссию и сохранит ее в другой базе данных комиссий.
Поскольку этот этап расчета сборов можно рассчитать в автономном режиме, здесь используется MQ, чтобы разделить процесс оплаты и расчет сборов.
Процесс в основном включает три этапа:
- Обновить данные заказа
- Отправить сообщение в MQ
- Плата за обработку сообщений
Любой из шагов, упомянутых выше, завершится ошибкой, если мы не обработаем его, данные с обеих сторон будут несогласованными, что приведет к следующим двум ситуациям:
- Данные заказа были обновлены, но данные комиссии не были созданы
- Данные комиссии генерируются, но данные заказа не обновляются
Это касается реальных денег, и как только расчет будет недооценен, это вызоветПотеря активов, действительно не может себе это позволить!
Что касается последнего шага, это относительно просто. Если потребление сообщения не удается, пока не отправлено подтверждение сообщения, сервер MQ автоматически повторит попытку.
большая проблемаПричина в том, что мы не можем гарантировать, что операция обновления согласуется с отправкой сообщения. Обновляем ли мы сначала данные заказа, а затем отправляем сообщение, или сначала отправляем сообщение, а затем обновляем данные заказа, существует вероятность успеха и неудачи.
Как показано ниже, сначала отправляется сообщение, а затем база данных обновляется.
После успешной отправки вышеупомянутого сообщения о процессе локальная транзакция отправляется. Этот поток выглядит идеальным, но представьте, если при фиксации транзакции произойдет сбой выполнения базы данных, что приведет к откату транзакции.
Однако в этот момент сообщение было отправлено и не может быть отозвано. Это приводит к тому, что система комиссий немедленно обрабатывает сообщение, рассчитывает комиссию и обновляет ее в базе данных. Это приводит к несогласованности, создаваемой системой комиссий, когда платежные данные не обновляются.
Так что, если мы повернем процесс вспять, не будет ли лучше?
Мы используем следующее представление псевдокода:
// 开始事务
try {
// 1.执行数据库操作
// 2.提交事务
}catch (Exception e){
// 3.回滚事务
}
// 4.发送 mq 消息
Здесь, если транзакция успешно отправлена, но сообщение mq не может быть отправлено, это приведет к несоответствию, при котором данные платежа обновляются, но данные комиссии не генерируются.
Некоторые студенты здесь могут подумать, что перемещение шага отправки сообщения mq в транзакцию, отправка сообщения завершается ошибкой, и транзакция откатывается, разве это не идеально?
Псевдокод выглядит следующим образом:
// 开始事务
try {
// 1.执行数据库操作
// 2.发送 mq 消息
// 3.提交事务
}catch (Exception e){
// 4.回滚事务
}
Приведенный выше код не вызывает никаких проблем, отправка сообщения завершается ошибкой, и транзакция откатывается.
Но на самом деле на втором шаге возможно, что сообщение было отправлено на сервер MQ, но ответное сообщение от MQ не было получено вовремя из-за проблем с сетью, из-за чего отправитель сообщения думает, что сообщение сообщение не удалось отправить.
Это приведет к откату транзакции заказа, но система комиссий за транзакцию может потреблять сообщения, а базы данных с обеих сторон несовместимы.
Учащиеся, знакомые с MQ, могут подумать, что если отправка сообщения не удалась, можно повторить попытку.
Да, мы можем увеличить количество повторных попыток и повторно отправить сообщение. Но здесь мы должны обратить внимание, что поскольку отправка сообщения связана с транзакцией, слишком много повторных попыток продлит время выполнения транзакции базы данных, а время обработки транзакции слишком велико, что приведет к более длительному времени удержания транзакции. заблокировать транзакцию, что повлияет на общую пропускную способность базы данных.
В реальном бизнесе не рекомендуется совмещать отправку сообщений в транзакциях базы данных.
сообщение о транзакции
Транзакционные сообщения предоставляются RocketMQ.делаФункция может реализовывать распределенные транзакции, чтобы гарантировать, что вышеуказанные операции транзакций и отправка сообщений будут успешными или неудачными.
При использовании сообщений о транзакциях общий поток выглядит следующим образом:
Сначала отправим половинку(half) сообщение MQ, уведомляющее его о начале транзакции. Здесь полусообщение не означает, что содержание сообщения является неполным, фактически оно содержит все полное содержание сообщения.
Единственная разница между этим полусообщением и обычным сообщением заключается в том, что до того, как транзакция будет зафиксирована, это сообщениеНевидимыйДа, потребитель не будет потреблять сообщение.
После успешной отправки полусообщения мы можем выполнить транзакцию базы данных. Затем решите зафиксировать или откатить сообщение транзакции в соответствии с результатом выполнения транзакции.
Если транзакция успешно отправлена, в MQ будет отправлено подтверждающее сообщение, и система комиссий за транзакцию сможет успешно обработать это сообщение.
Если транзакция откатывается, в MQ будет отправлено уведомление об откате, которое затем удалит сообщение. Для платной системы никто не знает о существовании этого сообщения.
Это соответствует требованию согласованности по принципу «все или ничего».
Есть еще проблемы с фактическим процессом, если мыфиксация/откатЧто делать, если сообщение о транзакции не удается?
Для этой проблемы RocketMQ даетПроверка транзакцииМеханизмы. Нам нужно зарегистрировать интерфейс обратного вызова, чтобы проверить статус локальной транзакции.
Если RocketMQ не получает запрос на фиксацию или откат, он будет периодически проверять интерфейс обратного вызова, а затем решать, откатывать или фиксировать транзакцию в соответствии с результатом проверки.
Общий процесс сообщения транзакции RocketMQ выглядит следующим образом:
Пример кода сообщения о транзакции выглядит следующим образом:
public class TransactionMQProducerExample {
public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");
// 不定义将会使用默认的
ExecutorService executorService =
new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
// 改成自己的地址
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Order order = new Order("66666", "books");
Message msg =
new Message("transaction_tp",
JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送半消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println(sendResult.getSendStatus());
producer.shutdown();
}
public static class TransactionListenerImpl implements TransactionListener {
/**
* 半消息发送成功将会自动执行该逻辑
*
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
Order order = null;
try {
order = JSON.parseObject(new String(msg.getBody(),
RemotingHelper.DEFAULT_CHARSET), Order.class);
boolean isSuccess = updateOrder(order);
if (isSuccess) {
// 本地事务执行成功,提交半消息
System.out.println("本地事务执行成功,提交事务事务消息");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务执行成功,回滚半消息
System.out.println("本地事务执行失败,回滚事务消息");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
System.out.println("本地事务执行异常");
}
// 异常情况返回未知状态
return LocalTransactionState.UNKNOW;
}
/**
* 更新订单
* 这里模拟数据库更新,返回事务执行成功
*
* @param order
* @return
*/
private boolean updateOrder(Order order) throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
return true;
}
/***
* 若提交/回滚事务消息失败,rocketmq 自动反查事务状态
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
try {
Order order = JSON.parseObject(new String(msg.getBody(),
RemotingHelper.DEFAULT_CHARSET), Order.class);
boolean isSuccess = queryOrder(order.getOrderId());
if (isSuccess) {
// 本地事务执行成功,提交半消息
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务执行成功,回滚半消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
System.out.println("查询失败");
}
// 异常情况返回未知状态
return LocalTransactionState.UNKNOW;
}
/**
* 查询订单状态
* 模拟返回查询成功
*
* @param orderId
* @return
*/
private boolean queryOrder(String orderId) throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
return true;
}
}
@Data
public static class Order {
private String orderId;
private String goods;
public Order(String orderId, String goods) {
this.orderId = orderId;
this.goods = goods;
}
}
}
В приведенном выше коде:
- Нам нужно указать производителя дляТолькоиз
ProducerGroup
- нужно унаследовать
TransactionListener
Интерфейс обратного вызова аннотации, гдеexecuteLocalTransaction
методы выполняют локальные транзакции,checkLocalTranscation
Используется для проверки локальных транзакций. - Существует три типа возвращаемого статуса транзакции:
- Промежуточное состояние LocalTransactionState.UNKNOW, RocketMQ проверит ответ
- LocalTransactionState.COMMIT_MESSAGE фиксирует транзакцию, и сообщение использует это сообщение позже.
- LocalTransactionState.ROLLBACK_MESSAGE, откатить транзакцию, RocketMQ удалит сообщение
Примечания по использованию сообщений о транзакциях
Максимальное количество обратных проверок сообщений о транзакциях
Из-за чрезмерного количества обратных проверок для одного сообщения очередь полусообщений будет накапливаться, что повлияет на производительность. RocketMQ по умолчанию ограничивает количество проверок одного сообщения до 15.
Мы можем изменитьbroker
Файл конфигурации, добавьте следующую конфигурацию:
# N 为最大检查次数
transactionCheckMax=N
После проверки числа превышает максимальное количество, RocketMQ откажется от сообщения и распечатать журнал ошибок.
Откажитесь от сообщения, которое вы хотите настроить, вам нужно изменить код стороны брокера RocketMQ, наследованиеAbstractTransactionalMessageCheckListener
переписатьresolveDiscardMsg
метод, добавляя пользовательскую логику.
Синхронизированный механизм двойной записи
Чтобы гарантировать, что сообщения о транзакциях не будут потеряны и обеспечить целостность транзакций, сообщения о транзакциях необходимо реплицировать на другие узлы в кластере.Рекомендуется использовать механизм синхронной двойной записи.
Настройка времени обратной проверки транзакции
Мы можем установить следующие параметры, чтобы указать, через какое время сервер MQ начнет проверять сообщение о транзакции (вычисляется после успешного сохранения сообщения о транзакции).
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "10");
или мы можемbroker.conf
Установите следующие параметры:
# 单位为 ms,默认为 6 s
transactionTimeout=60000
Отправитель активно устанавливает приоритет параметра конфигурации выше, чемbroker
конечная конфигурация.
Кроме того, RocketMQ также имеет конфигурацию для управления интервалом проверки транзакционных сообщений:
## 默认为 60s
transactionCheckInterval=5000
Если пользовательская конфигурация такая, как указано выше, интервал проверки сообщений о транзакциях составляет 5 секунд, а время проверки сообщений о транзакциях устанавливается равным 60 секундам.
Это означает, что брокер проверяет сообщение о транзакции каждые 5 с. Если время от сообщения о транзакции до сервера MQ в это время не превышает 60 с, он не будет возвращаться, пока время не превысит 60 с.
пасхальные яйца
При поиске данных сообщения о транзакции я обнаружил, что в документации RocketMQ есть связанные ошибки.
Адрес документа:GitHub.com/9526Xu/рок…
Два вышеперечисленных на самом делеОшибка, следует изменить на:AbstractTransactionalMessageCheckListener
иtransactionTimeout
.
Адрес выпуска:GitHub.com/Apache/рок…
Я изменил его и отправилPR. Ха-ха, также внес свой вклад в проект с открытым исходным кодом.
Reference
- GitHub.com/Apache/рок…
- GitHub.com/9526Xu/рок…
- Geek Time — Мастер-класс по очереди сообщений
Последнее слово (пожалуйста, обратите внимание)
В прошлом я всегда думал, что участие в проектах с открытым исходным кодом сложно, но только недавно я участвовал в двух последовательных ревизиях проектов с открытым исходным кодом и обнаружил, что это не так сложно, как я себе представлял. Из-за изменения версии некоторые документы проекта с открытым исходным кодом содержат ошибки, и если мы их увидим, мы сможем легко их исправить, что также является вкладом в проекты с открытым исходным кодом.
Если вам не хватает таланта и обучения, неизбежно будут ошибки.Если вы обнаружите какие-либо ошибки, пожалуйста, оставьте сообщение и укажите на это мне, и я его исправлю.
Еще раз спасибо за чтение, ячерный парень внизу, инструментальная обезьяна, которая еще не лысая, увидимся в следующей статье~
Добро пожаловать, чтобы обратить внимание на мой официальный аккаунт: программа для общения, ежедневный толчок галантерейных товаров. Если вас интересует мой рекомендуемый контент, вы также можете подписаться на мой блог:studyidea.cn