RocketMQ предоставляет функцию сообщений о транзакциях, используя2PCФункция распределенной транзакции (двухэтапное соглашение) + механизм компенсации (обратная связь транзакции), таким образом может быть достигнута окончательная согласованность распределенных транзакций.
Обзор
- Полутранзакционное сообщение:
Для сообщения, которое не может быть временно доставлено, отправитель успешно отправил сообщение на сервер версии RocketMQ очереди сообщений, но сервер не получил вторичное подтверждение сообщения от производителя, и сообщение помечено как «временно недоставлено». Сообщение в этом состоянии является полутранзакционным сообщением. - Проверьте сообщение:
Вторичное подтверждение сообщения транзакции теряется из-за сбоя в сети, перезапуска приложения-производителя и т. д. Когда сервер очереди сообщений версии RocketMQ сканирует и обнаруживает, что сообщение долгое время находилось в «полутранзакционном сообщении» время, он должен взять на себя инициативу, чтобы уведомить производителя сообщения.Запросить окончательный статус сообщения (фиксация или откат), процесс запроса является проверкой сообщения.
Интерактивный процесс
Шаги для отправки сообщений о транзакциях следующие:
- Отправитель отправляет полутранзакционное сообщение на сервер очереди сообщений RocketMQ.
- После того, как сервер очереди сообщений для RocketMQ успешно сохраняет сообщение, он возвращает подтверждение отправителю.
Подтвердите, что сообщение было успешно отправлено и в настоящее время является полутранзакционным сообщением.
- Отправитель начинает выполнять локальную логику транзакции.
- Отправитель отправляет вторичное подтверждение (фиксация или откат) на сервер в соответствии с результатом выполнения локальной транзакции, и сервер пометит полутранзакционное сообщение как доставляемое после получения статуса фиксации, и подписчик в конечном итоге получит сообщение. ; сервер получает Состояние отката удаляет полутранзакционное сообщение, и подписчик не примет это сообщение.
Этапы просмотра сообщений о транзакциях следующие:
- В особом случае отключения сети или перезапуска приложения вторичное подтверждение, отправленное на шаге 4 выше, в конце концов не достигает сервера, и сервер инициирует проверку сообщения для сообщения через фиксированное время.
- После того, как отправитель получит ответное сообщение, ему необходимо проверить окончательный результат выполнения локальной транзакции соответствующего сообщения.
- Отправитель снова отправляет второе подтверждение в соответствии с окончательным состоянием локальной транзакции, полученным в результате проверки, и сервер продолжает работать с полутранзакционным сообщением в соответствии с шагом 4.
В общем, сообщения транзакций RocketMQ делятся на две основные строки
- Процесс отправки: отправить полусообщение (половина сообщения), выполнить локальную транзакцию, отправить результат выполнения транзакции
- Процесс проверки запланированных задач: запланированные задачи MQ сканируют половину сообщений, просматривают локальные транзакции и отправляют результаты выполнения транзакций.
Анализ исходного кода
Как производитель отправляет транзакционные полусообщения (подготовить)
Базовым классом для отправки транзакционных сообщений в локальных приложениях являетсяTransactionMQProducer
, этот класс повторно использует большую часть логики, связанной с отправкой сообщений, наследуя DefaultMQProducer.Объем кода этого класса очень мал и всего 100 строк.Ниже приведен код этого классаsendMessageTransaction
метод
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
//判断transactionListener是否存在
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
//发送事务消息
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
здесьtransactionListener
Это класс вышеупомянутого обратного сообщения, который предоставляет 2 метода:
- executeLocalTransaction
выполнять локальные транзакции - checkLocalTransaction
Проверить локальные транзакции
см. далееDefaultMQProducer.sendMessageInTransaction()
метод:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//判断检查本地事务的listenner是否存在
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
//。。。省略
SendResult sendResult = null;
//msg设置参数TRAN_MSG,表示为事务消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//发送消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//发送消息成功,执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//执行endTransaction方法,如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,半消息发送成功且本地事务执行成功则告诉服务端生效半消息
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
//...省略
return transactionSendResult;
}
Этот метод в основном делает следующие вещи
- Пометьте сообщение тегом, связанным с сообщением о транзакции, который используется брокером для различения обычных сообщений и сообщений о транзакциях.
- Отправить половинное сообщение
- Если передача прошла успешно, transactionListener выполнит локальную транзакцию.
- Выполните метод endTransaction, чтобы указать брокеру выполнить фиксацию/откат.
выполнять локальные транзакции
Затем возвращаемся к тому месту, где Продюсер отправляет половинчатое сообщение выше, и продолжаем смотреть вниз.
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//发送消息成功,执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
//半消息发送失败,回滚
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
После того, как полусообщение транзакции будет успешно отправлено, оно вызоветtransactionListener.executeLocalTransaction
способ выполнения локальной транзакции. Только после успешной отправки полусообщения будет выполнена локальная транзакция, и если полусообщение не будет отправлено, будет установлен откат.
Завершение транзакции (фиксация/откат)
После выполнения локальной транзакции вызовитеthis.endTransaction()
метод, чтобы зафиксировать транзакцию или откатить транзакцию в соответствии с состоянием выполнения локальной транзакции.
Если полусообщение не может быть отправлено или локальная транзакция не может быть выполнена, он сообщает серверу удалить полусообщение, а если полусообщение успешно отправлено и локальная транзакция выполнена успешно, он сообщает серверу вступить в силу полусообщения
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
//提交事务
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
//回滚
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
Как сторона брокера обрабатывает сообщения о транзакциях
Брокерская сторона черезSendMessageProcessor.processRequest()
Метод получает и обрабатывает полусообщение, отправленное Продюсером.
в конце концов позвонитSendMessageProcessor.sendMessage()
, определите тип сообщения и сохраните сообщение.
//SendMessageProcessor.java
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
//...省略
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
//存储事务消息
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
//存储普通消息
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
Затем посмотрите на код, в котором хранится половина сообщения prepareMessage(msgInner) :
//TransactionalMessageBridge.java
//存储事务半消息
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//备份消息的原主题名称与原队列ID
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//事务消息的topic和queueID是写死固定的
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
На этом шаге создайте резервную копию исходного имени темы и исходного идентификатора очереди сообщения, затем отмените метку сообщения транзакции, сбросьте тему сообщения на: RMQ_SYS_TRANS_HALF_TOPIC, а идентификатор очереди будет зафиксирован на 0. Отличие от других обычных сообщений, а затем полное сохранение сообщения.
В этот момент Брокер первоначально обработал полусообщение о транзакции, отправленное Продюсером.
Брокер обрабатывает вторичную отправку сообщений о транзакциях
Далее давайте посмотрим, как брокер обрабатывает отправку сообщения о транзакции и команды отката, когда производитель или запланированная задача отправляет/откатывает транзакцию.
//EndTransactionProcessor.java
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:{}", requestHeader);
//从节点不处理
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
//省略代码,打印日志
OperationResult result = new OperationResult();
//如果请求为提交事务,进入事务消息提交处理流程
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//根据commitLogOffset从commitlog文件中查找消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//字段检查
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//恢复事务消息的真实的主题、队列,并设置事务ID
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
//设置消息的相关属性,取消事务相关的系统标记
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
//发送最终消息,存储,被consumer消费
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//删除预处理消息(prepare)
//其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)。
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} //回滚处理
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
//根据commitlogOffset查找消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//字段检查
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//删除预处理消息(prepare)
//将消息存储在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表该消息已被处理
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
Логика здесь очень ясна, и ее основная реализация выглядит следующим образом:
- Поиск сообщений на основе commitlogOffset
- Если это действие фиксации, восстановите тему и очередь исходного сообщения, снова сохраните их в файле журнала фиксации, а затем передайте их в очередь потребления сообщений для потребления потребителем, а затем сохраните исходное предварительно обработанное сообщение в новой теме RMQ_SYS_TRANS_OP_HALF_TOPIC. , что означает, что сообщение было обработано
- Если сообщение откатывается, исходное предварительно обработанное сообщение сохраняется непосредственно в новой теме RMQ_SYS_TRANS_OP_HALF_TOPIC, что указывает на то, что сообщение было обработано.
Ретроспективный просмотр транзакций полусообщений
Двухэтапный протокол отправляет и фиксирует сообщение отката, а состояние сообщения локальной транзакции после выполнения:UNKNOW
, конечная транзакция ничего не делает. Статус транзакции отправителя — откат или фиксация посредством регулярной проверки статуса транзакции.
пройти черезTransactionalMessageCheckService
Время потока для обнаруженияRMQ_SYS_TRANS_HALF_TOPIC
Сообщение в тему, вернуться, чтобы проверить статус транзакции сообщения.
- RMQ_SYS_TRANS_HALF_TOPIC
Тема сообщения о подготовке и сообщение о транзакции входят в эту тему первыми. - RMQ_SYS_TRANS_OP_HALF_TOPIC
Когда сервер сообщений получает запрос на фиксацию или откат для транзакционного сообщения, он сохраняет сообщение в этом разделе.
Ввод кода:
public void run() {
log.info("Start transaction check service thread!");
//执行间隔
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
//事务过期时间
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
//检查本地事务
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
Общий процесс выглядит следующим образом:
Общий процесс реализации
ненормальная ситуация
Если у нас есть система заказов, нам нужно вызвать систему купонов после размещения заказа. Мы используем метод RocketMq, чтобы отправить сообщение в систему купонов для распространения купонов после успешной оплаты заказа. Здесь через сообщения о транзакциях это гарантировано, что купоны могут быть распространены успех.
Давайте подумаем о нескольких нестандартных сценариях и посмотрим, сможет ли RocketMq их решить.
Продюсер не смог отправить половину сообщения
Система заказов производителя может не отправить половинные сообщения (подготовить) из-за сбоев в сети или mq.
В это время система заказов может выполнить операцию отката, такую как «закрытие заказа» и т. д., и вернуть деньги пользователю посредством обратного процесса.
Половина сообщения успешно отправлена, локальная транзакция не выполнена
Если полусообщение, отправленное системой заказов, выполнено успешно, но выполнение локальной транзакции не выполнено, например, обновление статуса заказа на «завершено».
В этом случае после сбоя выполнения локальной транзакции откат будет возвращен в MQ, и MQ удалит ранее отправленное полусообщение. Система купонов не будет активирована.
Половинное сообщение было успешно отправлено, но ответ от MQ не получен.
Если система заказов успешно отправляет половину сообщения, она не получает ответ, возвращаемый MQ.
В настоящее время это может быть связано с проблемами сети или другими ненормальными ошибками.Система заказов ошибочно полагала, что отправка половинных сообщений MQ не удалась, и выполнила обратный процесс отката.
Но в это время mq успешно сохранил половину сообщения, так что же делать с этим сообщением?
В это время фоновое сообщение MQ проверяет запланированную задачу.TransactionalMessageCheckService
Он будет сканировать очередь полусообщений каждую минуту, чтобы определить, нужно ли ему проверять сообщение, а затем проверять локальную транзакцию системы заказов.В это время MQ обнаружит, что заказ стал «закрытым», а затем он отправит запрос на откат в mq, удалит предыдущее половинное сообщение.
Что делать, если фиксация/откат не удалась
Это на самом деле через задание на времяTransactionalMessageCheckService
, он обнаружит, что сообщение не обрабатывалось в два этапа в течение определенного периода времени, и проверит локальную транзакцию.
思考
- Как RocketMQ гарантирует, что полу-сообщения (подготовка) не будут потребляться потребителями?
- Как реализована операция отката сообщений о транзакциях?
- Как сделать сообщение видимым для потребителей после фиксации транзакции?
- Поняв принцип сообщений о транзакциях, можете ли вы вывести принцип задержанных сообщений?
цитирование данных
woo woo Краткое описание.com/afraid/4 Ah 9 методов измерения волн 69…