предисловие
Ранее мы говорили, что распределенные транзакции — это сложная техническая проблема. Не существует универсального решения, простых и эффективных средств.
Однако, если наша система не стремится к строгой согласованности, то наиболее часто используемой схемой является окончательная согласованность. Сегодня мы основываемся наRocketMQ
Реализовать программу новостей согласованности распределенных транзакций.
Код в этой статье — не просто демонстрация.Учитывая некоторые исключения, идемпотентное потребление и очереди недоставленных сообщений, постарайтесь приблизиться к надежным бизнес-сценариям.
Кроме того, в конце книги «RocketMQ Technology Insider» есть анализ процесса обработки ошибок примера кода распределенной транзакции, поэтому длина велика, и я надеюсь, что вы сможете терпеливо его просмотреть.
1. Деловые новости
Здесь я не хочу использовать много текста, чтобы вдаваться в подробности.RocketMQ
Принцип сообщений о транзакциях, нам нужно понять только две концепции.
- Полусообщение, полусообщение
Временно недоступенConsumer
Израсходованные сообщения.Producer
сообщение было отправлено наBroker
end, но состояние этого сообщения помечается как недоставленное, а сообщение в этом состоянии называется полусообщением. Фактически, сообщения в этом состоянии будут помещены вRMQ_SYS_TRANS_HALF_TOPIC
под тему.
когдаProducer
После того, как терминал подтвердит это дважды, т.е.Commit
после,Consumer
конец может быть потреблен; тогда, если онRollback
, сообщение будет удалено и никогда не будет использовано.
- Проверить статус транзакции
Мы думаем, что это может быть вызвано сетевыми причинами, проблемами приложений и т. д.Producer
Терминал не подтвердил это полусообщение, то в это времяBroker
Сервер будет регулярно сканировать эти полусообщения и активно находитьProducer
Клиент запрашивает статус сообщения.
Конечно, мы можем настроить, когда сканировать, в том числе сколько раз сканировать, и мы подробно поговорим об этом позже.
короче,RocketMQ
Принцип реализации сообщений о транзакциях основан на двухэтапной фиксации и просмотре состояния транзакции, чтобы определить, окончательно ли зафиксировано сообщение или отменено.
В этой статье наш код начинается с订单服务、积分服务
Например. Исходя из вышеизложенного, общий процесс выглядит следующим образом:
2. Заказать услугу
В сервисе заказов мы получаем фронтальный запрос на создание заказа и сохранение соответствующих данных в локальную базу данных.
1. Таблица журнала транзакций
В сервисе заказов, помимо таблицы заказов, также требуется таблица журнала транзакций. Он определяется следующим образом:
CREATE TABLE `transaction_log` (
`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '事务ID',
`business` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '业务标识',
`foreign_key` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '对应业务表中的主键',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Эта таблица специально используется для проверки статуса транзакции. При отправке бизнес-данных в эту таблицу также вставляется часть данных, они сосуществуют в локальной транзакции. Запросите таблицу по идентификатору транзакции. Если возвращается запись, это доказывает, что локальная транзакция была зафиксирована; если запись не возвращается, локальная транзакция может находиться в неизвестном состоянии или в состоянии отката.
2. TransactionMQProducer
Мы знаем, что черезRocketMQ
Чтобы отправить сообщение, вам нужно сначала создать отправителя сообщения. Стоит отметить, что при отправке транзакционного сообщения экземпляр, который мы здесь создаем, должен бытьTransactionMQProducer
.
@Component
public class TransactionProducer {
private String producerGroup = "order_trans_group";
private TransactionMQProducer producer;
//用于执行本地事务和事务状态回查的监听器
@Autowired
OrderTransactionListener orderTransactionListener;
//执行任务的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
@PostConstruct
public void init(){
producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendMsgTimeout(Integer.MAX_VALUE);
producer.setExecutorService(executor);
producer.setTransactionListener(orderTransactionListener);
this.start();
}
private void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
//事务消息发送
public TransactionSendResult send(String data, String topic) throws MQClientException {
Message message = new Message(topic,data.getBytes());
return this.producer.sendMessageInTransaction(message, null);
}
}
В приведенном выше коде главное — создать отправителя сообщения о транзакции. Здесь мы ориентируемся наOrderTransactionListener
, который отвечает за выполнение локальных транзакций и просмотр состояния транзакций.
3. Прослушиватель транзакций ордеров
@Component
public class OrderTransactionListener implements TransactionListener {
@Autowired
OrderService orderService;
@Autowired
TransactionLogService transactionLogService;
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
logger.info("开始执行本地事务....");
LocalTransactionState state;
try{
String body = new String(message.getBody());
OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
orderService.createOrder(order,message.getTransactionId());
state = LocalTransactionState.COMMIT_MESSAGE;
logger.info("本地事务已提交。{}",message.getTransactionId());
}catch (Exception e){
logger.info("执行本地事务失败。{}",e);
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
logger.info("开始回查本地事务状态。{}",messageExt.getTransactionId());
LocalTransactionState state;
String transactionId = messageExt.getTransactionId();
if (transactionLogService.get(transactionId)>0){
state = LocalTransactionState.COMMIT_MESSAGE;
}else {
state = LocalTransactionState.UNKNOW;
}
logger.info("结束本地事务状态查询:{}",state);
return state;
}
}
проходя черезproducer.sendMessageInTransaction
После отправки сообщения транзакции, если сообщение отправлено успешно, оно будет вызвано здесьexecuteLocalTransaction
способ выполнения локальной транзакции. Здесь он завершает вставку данных заказа и журнала транзакций.
Этот метод возвращает значениеLocalTransactionState
Представляет локальное состояние транзакции, это класс перечисления.
public enum LocalTransactionState {
//提交事务消息,消费者可以看到此消息
COMMIT_MESSAGE,
//回滚事务消息,消费者不会看到此消息
ROLLBACK_MESSAGE,
//事务未知状态,需要调用事务状态回查,确定此消息是提交还是回滚
UNKNOW;
}
Так,checkLocalTransaction
Метод используется для запроса статуса транзакции. Здесь мы запрашиваем идентификатор транзакцииtransaction_log
В этой таблице, если результат может быть запрошен, будет отправлено сообщение о транзакции, если нет, будет возвращен неизвестный статус.
Обратите внимание, что здесь есть еще одна проблема. Если он возвращает неизвестное состояние,RocketMQ Broker
Сервер будет непрерывно проверять с интервалом в 1 минуту, пока не будет достигнуто максимальное количество проверок транзакции.Если статус транзакции не был проверен сверх этого числа, сообщение будет отменено.
Конечно, мы можем настроить частоту и максимальное количество охволожений транзакций. существуетBroker
стороны, вы можете настроить его следующим образом:
brokerConfig.setTransactionCheckInterval(10000); //回查频率10秒一次
brokerConfig.setTransactionCheckMax(3); //最大检测次数为3
4. Класс бизнес-реализации
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
OrderMapper orderMapper;
@Autowired
TransactionLogMapper transactionLogMapper;
@Autowired
TransactionProducer producer;
Snowflake snowflake = new Snowflake(1,1);
Logger logger = LoggerFactory.getLogger(this.getClass());
//执行本地事务时调用,将订单数据和事务日志写入本地数据库
@Transactional
@Override
public void createOrder(OrderDTO orderDTO,String transactionId){
//1.创建订单
Order order = new Order();
BeanUtils.copyProperties(orderDTO,order);
orderMapper.createOrder(order);
//2.写入事务日志
TransactionLog log = new TransactionLog();
log.setId(transactionId);
log.setBusiness("order");
log.setForeignKey(String.valueOf(order.getId()));
transactionLogMapper.insert(log);
logger.info("订单创建完成。{}",orderDTO);
}
//前端调用,只用于向RocketMQ发送事务消息
@Override
public void createOrder(OrderDTO order) throws MQClientException {
order.setId(snowflake.nextId());
order.setOrderNo(snowflake.nextIdStr());
producer.send(JSON.toJSONString(order),"order");
}
}
В классе бизнес-обслуживания заказов у нас есть два метода. один дляRocketMQ
Отправляйте транзакционные сообщения, репозиторий реальных бизнес-данных.
Что касается того, почему это делается, то на самом деле есть некоторые причины, о которых мы поговорим позже.
5. звонок
@RestController
public class OrderController {
@Autowired
OrderService orderService;
Logger logger = LoggerFactory.getLogger(this.getClass());
@PostMapping("/create_order")
public void createOrder(@RequestBody OrderDTO order) throws MQClientException {
logger.info("接收到订单数据:{}",order.getCommodityCode());
orderService.createOrder(order);
}
}
6. Резюме
Бизнес-логика сервиса заказов завершена. Подытожим процесс следующим образом:
Если рассматривать исключения, то основные моменты здесь следующие:
- Первый вызов createOrder отправляет сообщение о транзакции. Если отправка не удалась, что привело к ошибке, будет возвращено исключение, и в это время никакая защита данных не будет задействована.
- Если сообщение о транзакции отправлено успешно, но при выполнении локальной транзакции возникает исключение, ни данные заказа, ни журнал транзакций не будут сохранены, поскольку они находятся в локальной транзакции.
- Если локальная транзакция выполняется, но состояние локальной транзакции не возвращается вовремя или возвращается неизвестное состояние. Тогда это будет
Broker
Регулярно проверяйте статус транзакций, а затем по таблице журнала транзакций можно определить, выполнен ли заказ, и записать его в базу данных.
Основываясь на этих элементах, мы можем сказать, что согласованность обслуживания заказов и сообщений о транзакциях гарантирована. Затем следующим шагом является то, как точечный сервис может правильно использовать данные заказа и выполнять соответствующие бизнес-операции.
3. Сервис баллов
В точечном сервисе в основном используются данные заказа, а затем в соответствии с содержанием заказа добавляются баллы соответствующим пользователям.
1. Таблица рекордов очков
CREATE TABLE `t_points` (
`id` bigint(16) NOT NULL COMMENT '主键',
`user_id` bigint(16) NOT NULL COMMENT '用户id',
`order_no` bigint(16) NOT NULL COMMENT '订单编号',
`points` int(4) NOT NULL COMMENT '积分',
`remarks` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '备注',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Здесь мы ориентируемся наorder_no
поле, которое является вариантом для достижения идемпотентного потребления.
2. Потребительский старт
@Component
public class Consumer {
String consumerGroup = "consumer-group";
DefaultMQPushConsumer consumer;
@Autowired
OrderListener orderListener;
@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order","*");
consumer.registerMessageListener(orderListener);
consumer.start();
}
}
Запустить потребителя относительно просто, мы указываем, какой потреблятьtopic
и слушатель в порядке.
3. Потребительский слушатель
@Component
public class OrderListener implements MessageListenerConcurrently {
@Autowired
PointsService pointsService;
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
logger.info("消费者线程监听到消息。");
try{
for (MessageExt message:list) {
logger.info("开始处理订单数据,准备增加积分....");
OrderDTO order = JSONObject.parseObject(message.getBody(), OrderDTO.class);
pointsService.increasePoints(order);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
logger.error("处理消费者数据发生异常。{}",e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
Прослушав сообщение, вызовите класс бизнес-обслуживания для его обработки. Возврат после завершения обработкиCONSUME_SUCCESS
отправить, если обработка не удалась, вернутьRECONSUME_LATER
попробовать еще раз.
4. Увеличьте точки
Здесь главное хранить интегральные данные. Но обратите внимание, что вам нужно выносить суждения перед входом в библиотеку, чтобы достичь идемпотентного потребления.
@Service
public class PointsServiceImpl implements PointsService {
@Autowired
PointsMapper pointsMapper;
Snowflake snowflake = new Snowflake(1,1);
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void increasePoints(OrderDTO order) {
//入库之前先查询,实现幂等
if (pointsMapper.getByOrderNo(order.getOrderNo())>0){
logger.info("积分添加完成,订单已处理。{}",order.getOrderNo());
}else{
Points points = new Points();
points.setId(snowflake.nextId());
points.setUserId(order.getUserId());
points.setOrderNo(order.getOrderNo());
Double amount = order.getAmount();
points.setPoints(amount.intValue()*10);
points.setRemarks("商品消费共【"+order.getAmount()+"】元,获得积分"+points.getPoints());
pointsMapper.insert(points);
logger.info("已为订单号码{}增加积分。",points.getOrderNo());
}
}
}
5. Идемпотентное потребление
Есть много способов добиться идемпотентного потребления, как это сделать, зависит от вашей ситуации.
Например, в этом примере мы напрямую связываем номер заказа и запись точек в той же таблице, а перед добавлением точек мы можем проверить, был ли заказ обработан.
В качестве альтернативы мы также можем создать дополнительную таблицу для записи обработки заказа.
Кроме того, вы можете поместить эту информацию непосредственно вredis
В кэше кэш запрашивается перед хранением.
Независимо от того, как это сделать, общая идея состоит в том, чтобы проверить, было ли сообщение обработано, прежде чем выполнять бизнес. Итак, вот проблема первичного ключа данных. В этом примере мы используем номер заказа в качестве первичного ключа, и мы также можем использовать идентификатор транзакции в качестве первичного ключа. Если это обычное сообщение, мы также можем создать уникальное сообщение. ID в качестве первичного ключа.
6. Ненормальное потребление
Мы знаем, что когда потребитель не сможет обработать, он вернетRECONSUME_LATER
, пусть сообщение будет повторено, по умолчанию до 16 повторов.
Что же делать, если сообщение не было обработано должным образом по особым причинам?
Мы рассматриваем два пути решения этой проблемы.
Во-первых, установите количество повторных попыток сообщения в коде.Если указанное количество раз будет достигнуто, будет отправлено электронное письмо или SMS, чтобы уведомить бизнес-сторону о ручном вмешательстве в обработку.
@Component
public class OrderListener implements MessageListenerConcurrently {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
logger.info("消费者线程监听到消息。");
for (MessageExt message:list) {
if (!processor(message)){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* 消息处理,第3次处理失败后,发送邮件通知人工介入
* @param message
* @return
*/
private boolean processor(MessageExt message){
String body = new String(message.getBody());
try {
logger.info("消息处理....{}",body);
int k = 1/0;
return true;
}catch (Exception e){
if(message.getReconsumeTimes()>=3){
logger.error("消息重试已达最大次数,将通知业务人员排查问题。{}",message.getMsgId());
sendMail(message);
return true;
}
return false;
}
}
}
Во-вторых, подождите, пока сообщение повторит максимальное количество попыток, прежде чем попасть в очередь недоставленных сообщений.
Максимальное количество повторных попыток сообщения по умолчанию составляет 16 раз, и мы также можем установить это количество раз на стороне потребителя.
consumer.setMaxReconsumeTimes(3);//设置消息重试最大次数
Название темы очереди недоставленных сообщений:%DLQ% + 消费者组名称
, например, в данных заказа мы задаем имя группы потребителей:
String consumerGroup = "order-consumer-group";
Тогда этот потребитель, соответствующее имя темы очереди недоставленных сообщений%DLQ%order-consumer-group
Как показано выше, нам также нужно нажатьTOPIC配置
, чтобы изменитьperm
свойство, измените его на 6.
Наконец, вы можете отслеживать эту тему через программный код, чтобы уведомить о ручном вмешательстве или непосредственно просматривать обработку на консоли. Благодаря идемпотентному потреблению и обработке недоставленных сообщений в основном гарантируется, что сообщение будет обработано.
В-четвертых, пример кода в RocketMQ Technology Insider.
У автора есть книга RocketMQ Technology Insider, а в главе 9.4 есть фрагмент кода распределенной транзакции.
Однако, прочитав ее, автор чувствует, что в ней есть проблема с процессом, которая вызовет нестыковки в местных делах, и разберем ее ниже.
Здесь мы в основном фокусируемся на процессе заказа бизнес-класса обслуживания и прослушивателе транзакций в книге.
В книге псевдокод для размещения заказа выглядит следующим образом:
public Map createOrder(){
Map result = new HashMap();
//执行下订单相关的业务流程,例如操作本地数据库落库相关代码
//生成事务消息唯一业务标识,将该业务标识组装到待发送的消息体中,方便消息端进行幂等消费。
//调用消息客户端API,发送事务prepare消息。
//返回结果,提交事务
return result;
}
Вышеупомянутый первый шаг, отправка сообщения о транзакции, вам нужно реализовать TransactionListener, чтобы добиться реализации местных дел и местных дел обратно, чтобы проверить.
public class OrderTransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//从消息体中获取业务唯一ID
String bizUniNo = message.getUserProperty("bizUniNo");
//将bizUniNo入库,表名:t_message_transaction,表结构 bizUniNo(主键),业务类型。
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt message) {
//从消息体中获取业务唯一ID
String bizUniNo = message.getUserProperty("bizUniNo");
//如果本地事务表(t_message_transaction)存在记录,则认为提交;如果不存在返回未知。
//如果多次回查还是未查到消息,则回滚。
if (query(bizUniNo)>0){
return LocalTransactionState.COMMIT_MESSAGE;
}else{
return LocalTransactionState.UNKNOW;
}
}
//查询数据库是否存在记录
public int query(String bizUniNo){
//select count(1) from t_message_transaction where biz_uni_no = #{bizUniNo}
return 1;
}
}
Приведенный выше код скопирован автором в этой книге.Если следовать этому методу, то действительно есть проблема.Давайте проанализируем его.
1. Порядок ненормальный
Рассмотрим приведенный выше псевдокод размещения заказа, который содержит две операции: складирование заказа и отправку сообщения о транзакции.
Итак, продолжаем думать:
- Если возникнет исключение при помещении заказа на склад, это не проблема, т.к. сообщение о транзакции отправлено не будет;
- Если складирование заказа завершено, но сообщение о транзакции отправлено и сообщается об ошибке. Это не проблема, данные заказа будут откатываться;
- Если выполнение складирования заказа завершено, то ошибки в отправке сообщения о транзакции нет. но возвращается не
SEND_OK
Статус, это проблематично.
Поскольку только сообщение транзакции отправки является успешным, а статус отправкиSEND_OK
, будет выполнена локальная транзакция в слушателе, чтобыt_message_transaction
Таблицы записываются в журнал транзакций.
Дальше будет сцена: введены локальные данные заказа, а потому возврата нетSEND_OK
состояние, из-за чего журнал транзакций в локальной транзакции не выполняется. Потом это сообщение о транзакции будет рано или поздно откатываться.Последняя проблема заключается в том, что пользователь успешно разместил заказ, но баллы не начислились.
2. Исключение выполнения локальной транзакции
На самом деле, первую проблему тоже можно обойти. То есть после отправки сообщения о транзакции оценивается, является ли состояние отправкиSEND_OK
, если нет, откатить данные заказа, выдав исключение.
Однако есть вторая проблема:
Что делать, если данные заказа и сообщения о транзакциях отправляются, но что делать при выполнении локальных транзакций?
Если это так, это также приведет к хранению данных локального заказа, но журнал транзакций не был записан. Эта запись не может быть запрошена, когда статус транзакции проверяется, и, наконец, сообщение транзакции может быть только откатывается только Отказ Конечное явление также является то, что пользователь успешно разместил заказ, но не увеличил точки.
Но в книге у автора есть такой отрывок:
executeLocalTransaction, этот метод в основном устанавливает состояние локальной транзакции в транзакции с бизнес-кодом. Например, в
OrderService#createOrder
, если локальная транзакция успешно зафиксирована, этот метод также будет успешно зафиксирован. Поэтому здесь в основном кt_message_transaction
Добавьте запись, и когда транзакция проверяется, если есть запись, считается, что сообщение нужно отправить.
Я понимаю, что автор имеет в виду в этом отрывке то, что все они касаются местного дела. еслиcreateOrder
Способ успешно выполняется, тоexecuteLocalTransaction
Способ также успешно выполняется; если какая-либо сторона не удается, транзакция возвращается назад.
Однако, если мы анализируем из исходного кода, если локальное выполнение транзакции сообщает об ошибке, данные заказа не будут откатываться.
3. Анализ исходного кода
Во-первых, нам нужно знать,executeLocalTransaction
Методы иcreateOrder
Метод действительно находится в транзакции.
Это потому чтоexecuteLocalTransaction
Методы вызываются синхронно после отправки сообщения транзакции, поэтому они находятся в транзакции.
Давайте посмотрим на процесс отправки сообщений о транзакциях в исходном коде:
public TransactionSendResult sendMessageInTransaction(Message msg,
LocalTransactionExecuter localTransactionExecuter,
Object arg)throws MQClientException {
//发送事务消息返回结果
SendResult sendResult = null;
//如果发送消息失败,抛出异常
try {
sendResult = this.send(msg);
} catch (Exception var11) {
throw new MQClientException("send message Exception", var11);
}
//初始化本地事务状态:未知状态
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch(sendResult.getSendStatus()) {
//如果发送事务消息状态为send_ok
case SEND_OK:
try {
//执行本地事务方法
if (transactionListener != null) {
this.log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
} catch (Throwable var10) {
this.log.info("executeLocalTransactionBranch exception", var10);
this.log.info(msg.toString());
localException = var10;
}
break;
//如果发送事务状态不是send_ok,该事务消息会被回滚
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
}
//结束事务,就是根据本地事务状态,执行提交、回滚或暂不处理事务
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception var9) {
this.log.warn("", var9);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
Приведенный выше код представляет собой процесс отправки сообщений о транзакциях. Давайте сосредоточимся на этом, если сообщение о транзакции отправлено успешно и статус возвратаSEND_OK
, затем запустите прослушиватель вexecuteLocalTransaction
методы, что означает, что они находятся в транзакции.
Однако во время выполнения он вручную ловитThrowable
аномальный. Это означает, что даже если выполнение локальной транзакции завершится ошибкой, откат не будет запущен.
До сих пор мы очень ясно давали понять, что если мы напишем код в соответствии с процессом, описанным в книге, эта часть станет скрытой опасностью.
Если мы хотим избежать этой проблемы, мы можем только изменитьrocket-client
код, например:
try {
//执行本地事务方法
if (transactionListener != null) {
this.log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
} catch (Throwable var10) {
this.log.info("executeLocalTransactionBranch exception", var10);
this.log.info(msg.toString());
localException = var10;
throw new MQClientException(e.getMessage(),e);
}
Автор модифицировал исходный код и протестировал его, так же возможно и генерирование исключений вручную. Таким образом, если при выполнении локальной транзакции произойдет ошибка, данные заказа также будут отброшены.
На данный момент вы можете ответить на вопрос в главе 2.4 этой статьи:
Почему в заказе бизнес-услуги должен иметь два метода. Один используется для отправки сообщений транзакций в RocketMQ, а другой используется для хранения реальных бизнес-данных.
Суммировать
Эта статья посвященаRocketMQ
Случай распределенной транзакции для достижения окончательной согласованности.
Кроме того, я также рассказал об образце кода распределенной транзакции в книге «RocketMQ Technology Insider» и возможных нештатных проблемах. По этому пункту я также надеюсь, что друзья с разными взглядами будут активно оставлять сообщения и общаться друг с другом.