Анализ исходного кода сообщений транзакций RocketMQ, которые вы должны понимать (галантерея)

Java

предисловие

Используя преимущества функциональных характеристик MQ, таких как сглаживание пиков и заполнение впадин, разделение системы и асинхронная работа в интернет-индустрии, можно сказать, что там, где есть распределенные услуги, MQ часто не отсутствует. RocketMQ, разработанный Али, много лет сталкивался с серьезными проблемами параллелизма в Double Eleven. Среди них версия 4.3.0 представила новые функции сообщений о транзакциях. В этой статье представлено отслеживание исходного кода, связанное с сообщениями о транзакциях в RocketMQ. версия 4.5.0 Знать:

  • Какие проблемы решают сообщения о транзакциях
  • Принцип реализации сообщения о транзакции и особенности его оформления

какая проблема

Предположим, что в моей системе сейчас такой сценарий:

Локально откройте транзакцию базы данных для операции вычета и после успеха отправьте сообщение MQ в центр инвентаризации для доставки.

Кто-то может подумать о том, чтобы открыть реализацию транзакций mybatis, разве этого недостаточно, чтобы объединить локальные транзакции и MQ-сообщения? Если передача MQ прошла успешно, транзакция фиксируется, а если передача не удалась, транзакция откатывается, и весь набор операций выполняется за один раз.

transaction{
  扣款();
  boolean success = 发送MQ();
	if(success){
    commit();
  }else{
    rollBack();
  }
}

Вроде нормально, носеть ненадежнаиз.

Предположим, что ответ, возвращенный MQ, не был получен по сетевым причинам, поэтому перед лицом неопределенных результатов возврата MQ его необходимо откатить. Тем не менее сервер MQ получил это сообщение, но ответ клиенту был потерян, поэтому вывод был неудачным, а доставка прошла успешно.

scene

Поскольку отправку MQ-сообщений нельзя записать вместе с локальными транзакциями, как обеспечить требуемую общую атомарность? Ответ - главный герой, которого мы представляем сегодня:сообщение о транзакции.

Обзор

概览图)

В общем, сообщения транзакций RocketMQ делятся на две основные строки

  1. Процесс отправки запланированного задания: Отправить полусообщение (половина сообщения), выполнить локальную транзакцию, отправить результат выполнения транзакции
  2. Процесс проверки запланированных задач: Сервер MQ проверяет локальную транзакцию и отправляет результат выполнения транзакции.

Поэтому в этой статье также анализируется исходный код через эти две основные линии.

Анализ исходного кода

Процесс отправки половины сообщения

Локальное приложение (клиент)

Базовым классом для отправки сообщений о транзакциях в локальных приложениях является TransactionMQProducer. Этот класс повторно использует большую часть логики, связанной с отправкой сообщений, наследуя DefaultMQProducer. Объем кода этого класса очень мал и составляет всего 100 строк. Ниже приведен метод sendMessageTransaction этого сорт

@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }

    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

Этот метод делает две вещи,

  1. Проверьте, существует ли transactionListener
  2. Вызов родительского класса для отправки сообщения о транзакции

TransactionListener играет решающую роль в процессе передачи сообщений о транзакциях, давайте взглянем на этот интерфейс.

public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

Комментарии к интерфейсу очень ясны.Согласно приведенной выше обзорной диаграмме, метод executeLocalTransaction соответствуетвыполнять локальные транзакцииОперация checkLocalTransaction соответствуетПроверить локальные транзакцииработать.

Ниже приведен исходный код метода sendMessageInTransaction класса DefaultMQProducer.

public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    ...
    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    						...
        sendResult = this.send(msg);
    						...
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            		...
        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                ...
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    						...
        this.endTransaction(sendResult, localTransactionState, localException);
								...
}

Чтобы сделать логику исходного кода более интуитивно понятной, автор упростил основной код. Метод sendMessageInTransaction в основном выполняет следующие действия.

  1. Отметьте сообщение меткой, относящейся к транзакционному сообщению, которая используется сервером MQ для различения обычных сообщений и транзакционных сообщений.
  2. Отправить половинное сообщение
  3. Если передача прошла успешно, transactionListener выполнит локальную транзакцию.
  4. Выполните метод endTransaction, еслиНе удалось отправить половину сообщенияилиНе удалось выполнить локальную транзакциюСкажите серверу удалить половину сообщений,Половина сообщения успешно отправленаа такжеЛокальная транзакция выполнена успешноЗатем скажите серверу, чтобы вступило в силу половинное сообщение.

Процесс отправки полусообщений, клиентский код почти закончен, посмотрим, как с этим справится RocketMQ Server

RocketMQ Server

После того, как сервер получит сообщение, он преобразует некоторые объекты домена и проверит, поддерживает ли он проверку разрешений сообщения транзакции, что не очень полезно для понимания сообщения транзакции. Ниже приведен исходный код класса TransactionalMessageBridge для обработки половинного сообщения.

public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

Эти два метода в основном делают следующее:

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;
}
  1. Поместите тему и QueueID сообщения в карту тела сообщения для кэширования
  2. Установите тему сообщения на "RMQ_SYS_TRANS_OP_HALF_TOPIC" и queueId на 0
  3. Запись сообщений на диск для сохранения

Можно видеть, что все полусообщения транзакций будут помещены в одну и ту же очередь одной и той же темы.Благодаря различению тем полусообщения предотвращаются от потребления потребителем.

Сервер сохраняет полусообщение, а затем отправляет результат нашему локальному приложению. На этом завершается обработка половины сообщений на стороне сервера с последующим появлением задач на время.

Процесс проверки запланированных задач

RocketMQ Server

Временная задача представляет собой поток с именем TransactionalMessageService, а ниже приведен метод проверки этого класса.

@Override
public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener) {
                  ...
     if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
     }
       listener.resolveHalfMsg(msgExt);
   } 
									...
}

Метод проверки очень длинный, и опущенный код обычно используется для фильтрации половины сообщений (например, сообщения о транзакциях, срок действия которых превышает 72 часа, считаются просроченными), и только половина сообщений, отвечающих условиям, остается для проверки.

Одним из наиболее интересных является метод putBackHalfMsgQueue, потому что каждый раз, когда половина сообщения извлекается с диска в память для обработки, его атрибуты будут изменяться (например, TRANSACTION_CHECK_TIMES, который является ключевой информацией о том, следует ли отбрасывать сообщения о транзакциях), поэтому перед отправка контрольного сообщения Половинное сообщение необходимо снова записать на диск. Подход, используемый RocketMQ, основан на последнем физическом смещении.переписать, вместо исходного полусообщенияИсправлять, цель которого заключается в том, что дизайн хранилища RocketMQ принимает последовательную запись.Если вы изменяете сообщение, оно не может обеспечить высокую производительность.

Ниже приведен метод resolveHalfMsg, который в основном запускает поток и отправляет контрольное сообщение.

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

Локальное приложение (клиент)

Ниже приведен метод checkTransactionState класса DefaultMQProducerImpl, который представляет собой логику обработки локального приложения для сообщения проверки.

@Override
public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        ...
        @Override
        public void run() {
            ...
     TransactionListener transactionListener = getCheckListener();
            ...
     localTransactionState = transactionListener.checkLocalTransaction(message);
               ...
                 
      this.processTransactionState(
                    localTransactionState,
                    group,
                    exception);        
        }
      
        private void processTransactionState(
           ...
 DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                    3000);
           ...
        }
    };
    this.checkExecutor.submit(request);
}

После упрощения логики кода хорошо видно

  • Запустите поток для выполнения логики просмотра
  • Выполните метод checkLocalTransaction для transactionListener, чтобы получить результат выполнения локальной транзакции.

RocketMQ Server

После того, как сервер RocketMQ получит сообщение Commit, отправленное клиентом, он

Прочитайте половину сообщения --> восстановите информацию исходного тела сообщения, такую ​​как тема --> снова запишите его на диск, как обычное сообщение --> удалите предыдущую половину сообщения.

Если это сообщение отката, удалите предыдущее половинное сообщение напрямую.

На этом цепочка вызовов всего сообщения транзакции RocketMQ завершена.

думать

1. Равны ли распределенные транзакции транзакционным сообщениям?

Между ними нет никакой связи. Сообщение о транзакции гарантирует только общую атомарность локальной транзакции и отправки сообщения MQ. После доставки на сервер MQ не гарантируется, сможет ли потребитель успешно использовать.

2. Есть ли какие-то особенности в дизайне исходного кода?

Благодаря изучению и пониманию исходного кода всей ссылки обнаруживается, что есть еще много ярких пятен.

  • Сторона сервера проверяет отправку сообщения, сторона клиента проверяет логику обработки сообщения, а сообщение фиксации/отката на стороне клиента отправляется асинхронно.Гарантируется, что даже если состояние сети не очень хорошее в течение короткого времени в распределенной среде, это не повлияет на общую логику.
  • Введение TransactionListener действительно реализует принцип открытия-закрытия и принцип инверсии зависимостей, а также программирование, ориентированное на интерфейс. В целом масштабируемость очень хорошая, пользователям нужно только написать свой Listener для отправки сообщений о транзакциях, что очень удобно.
  • TransactionMQProducer повторно использует логику, связанную с отправкой сообщений, наследуя DefaultMQProducer.

3. Есть ли недостатки в дизайне исходного кода?

Как чрезвычайно успешное связующее ПО для сообщений, RocketMQ не так просто найти недостатки.Автор расскажет о нескольких моментах.

  • Методы, связанные с транзакциями, такие как sendMessageIntransaction, делятся на DefaultMQProducer.С целостной точки зрения, это метод отправки сообщений, связанных с транзакциями, который следует разделить на TransactionMQProducer.

  • Все сообщения половины темы будут записаны в очередь сообщений половины с темой RMQ_SYS_TRANS_OP_HALF_TOPIC, и каждое сообщение половины будет записано несколько раз во всей ссылке.Если параллелизм велик и большинство сообщений являются сообщениями о транзакциях, надежность. Будут проблемы.