Транзакционные сообщения очередей сообщений, как это делают RocketMQ и Kafka?

очередь сообщений

Во все времена к людям, способным учиться, не будут относиться плохо.

Всем привет, меня зовут да.

Сегодня поговорим о транзакционных сообщениях очередей сообщений. Когда мы говорим о транзакциях, я думаю, что все знакомы с ними. ACID — это то, что сразу приходит на ум.

Обычно транзакция, которую мы понимаем, состоит в том, что некоторые операции обновлений добиваются успеха или не удаются, и не будет промежуточного состояния, а кислота представляет собой строгое определение реализации транзакции, но, как правило, не строго выполняется в одной системе. Кислотные ограничения для реализации транзакции, не говоря уже о распределенных системах.

Распределенные системы, как правило, идут на компромисс только в отношении возможной согласованности., чтобы обеспечить окончательную целостность и непротиворечивость данных, основная причина заключается в том, что сила не допускается ... потому что доступность имеет решающее значение.

И очень дорого обеспечить реализацию полной версии транзакции.Вы хотите поддерживать так много системных данных, никакие данные промежуточного состояния не могут быть прочитаны, и все операции должны быть неделимы, а это значит, что выполнение транзакции блокировка Да, ресурс давно заблокирован.

В случае высокого параллелизма ресурс занят в течение длительного времени, что является фатальной травмой.Приведу пример со вкусом, в пиковый период использования туалета, вы можете понять это, если вы это знаете.

Кстати, студенты, которые не знают, что такое ACID, посмотрите, я не буду вдаваться в подробности.

Распределенная транзакция

Когда дело доходит до распределенных транзакций, распространенными являются 2PC, TCC и сообщения о транзакциях.Эта статья посвящена сообщениям о транзакциях, но я немного упомяну 2PC и TCC.

2PC

2PC — это двухэтапная подача, в которой есть две роли координатора и участника, второй этап — этап подготовки и этап подачи.

На этапе подготовки координатор отправляет команду подготовки каждому участнику.На этом этапе участники сделали все, кроме отправки транзакции.На этапе отправки координатор проверяет, в порядке ли этап подготовки каждого участника.Каждый участник отправляет команду фиксации, а если она не подходит, отправляется команда отката.

Дело в том, что2PC применяется только к транзакциям на уровне базы данных,Что это обозначает? То есть вы хотите записать кусок данных в базу данных и загрузить картинку одновременно.Эти две операции 2PC не может гарантировать, что две операции удовлетворят ограничениям транзакции.

А 2PC этосильная консистенцияраспределенная транзакция, этосинхронная блокировкаТо есть перед получением команды фиксации или отката все участники ждут друг друга, особенно когда фаза подготовки завершена, ресурсы в это время все заблокированы.Если один участник застрял надолго, у других участников есть ждать его,Блокировка в состоянии долгосрочной блокировки ресурсов.

Общая неэффективностьИ существуюЕдиный момент провалаПроблема в том, что координатор является той единственной точкой и существует в экстремальных условиях.Несоответствия данныхНапример, участник не получил команду на фиксацию, а машина в это время не работала, после восстановления произошел откат данных, а другие участники фактически выполнили команду на фиксацию транзакции.

TCC

TCC гарантирует транзакции бизнес-уровня, то есть он не только на уровне базы данных, он также может выполнять вышеуказанные операции по загрузке изображений.

TCC делится на три этапа: попробуйте - подтвердите - отмените. Проще говоря, каждый бизнес должен иметь эти три метода. Во-первых, выполнить метод try. Этот этап не будет выполнять реальные бизнес-операции, просто сначала займите яму, что? ты имеешь ввиду? Например, если вы планируете добавить 10 баллов, сначала добавьте 10 баллов в предварительно добавленное поле, в это время баллы на счете пользователя фактически не увеличиваются.

Затем, если все попытки будут успешными, выполните метод подтверждения, и все будут выполнять настоящие бизнес-операции.Если одна попытка не удалась, то все сразу же выполнят операцию отмены, чтобы отменить изменения.

можно увидетьTCC на самом деле очень привязан к бизнесу, потому что бизнес должен выполнить определенную трансформацию, чтобы завершить эти три метода, что на самом деле является недостатком TCC,И операции подтверждения и отмены должны обращать внимание на идемпотентностьПоскольку выполняется отсутствие отступления, когда эти два этапа выполняются, он должен быть завершен, поэтому требуется механизм повтора, поэтому необходимо обеспечить, чтобы метод IDEMPotent.

сообщение о транзакции

Деловые новости — главный герой сегодняшней статьи, этоОн в основном подходит для сценариев асинхронного обновления и мест, где требования к данным в реальном времени не высоки..

его цель состоит в том, чтобыРешите проблему согласованности данных между производителями сообщений и потребителями сообщений.

Например, если вы заказываете еду на вынос, мы сначала выбираем жареного цыпленка, чтобы добавить его в корзину, затем выбираем бутылку колы, затем размещаем заказ, и процесс оплаты заканчивается.

Данные внутри корзины очень подходят для асинхронного удаления с уведомлением о сообщении, малоэффективны.

Мы надеемся, что товары в корзине в конечном итоге будут удалены после успешного размещения заказа, поэтому суть в следующем.Два шага размещения заказа и отправки сообщения либо завершаются успешно, либо нет.

Сообщение о транзакции RocketMQ

Давайте сначала посмотрим, как RocketMQ реализует транзакционные сообщения.

Сообщение о транзакции RocketMQ также можно рассматривать как двухэтапную фиксацию, Проще говоря, в начале транзакции Брокеру будет отправлено полусообщение.

Значение половины сообщения состоит в том, что это сообщение невидимо для потребителя, и это не очередь, которая будет отправлена, но специальная очередь.

После отправки половинного сообщения выполните локальную транзакцию, а затем решите, следует ли отправить сообщение фиксации брокеру или отправить сообщение отката в соответствии с результатом выполнения локальной транзакции.

В это время кто-то сказал, что на этом шаге не удалось отправить сообщение фиксации или отката, что мне делать?

Это не имеет большого значения. Брокер будет периодически проверять у производителя, успешно ли прошла транзакция. В частности, производитель должен предоставить интерфейс. Через этот интерфейс брокер может узнать, была ли транзакция успешно выполнена или нет. не удалось, будет возвращено неизвестное значение, поскольку возможно. Транзакция все еще выполняется, и выполняется несколько запросов.

В случае успеха восстановите половину сообщения в очереди для обычной отправки, чтобы потребитель мог использовать сообщение.

Давайте вкратце рассмотрим, как им пользоваться, я упростил его по образцу кода на официальном сайте.

Видно, что он очень прост и интуитивно понятен в использовании, это не что иное, как добавление метода для реверсирования результата транзакции, а затем запись процесса локального выполнения транзакции в TransationListener.

До сих пор общий поток сообщений о транзакциях RocketMQ был ясен. Давайте нарисуем общую блок-схему, чтобы пройти его. Фактически, на четвертом этапе сообщение либо является обычным сообщением, либо отбрасывается, и ничего не существует. В настоящее время сообщение о транзакции было завершено жизненным циклом.

Анализ исходного кода сообщения транзакции RocketMQ

Затем давайте посмотрим, как это делается с точки зрения исходного кода.sendMessageInTransactionМетод, метод немного длинный, но структура все еще очень ясна без отношений.

Процесс — это то, что мы проанализировали выше, вставляя сообщение в некоторые атрибуты, указывая, что сообщение все еще является полусообщением в это время, а затем отправляя его брокеру, затем выполняя локальную транзакцию, а затем отправляя статус выполнения локальная транзакция Брокеру, теперь мыДавайте посмотрим, как брокер обрабатывает это сообщение.

Этот запрос полусообщения будет обрабатываться в Брокерском SendMessageProcessor#sendMessage, т.к. сегодня основной анализ - это сообщения о транзакциях, поэтому другие процессы анализироваться не будут, расскажу вкратце о принципе.

Проще говоря, находится в свойствах полученного сообщения в sendMessageMessageConst.PROPERTY_TRANSACTION_PREPAREDЕсли это правда, то вы можете узнать, что сообщение является сообщением о транзакции, а затем решить, превышает ли сообщение максимальное время потребления, нужно ли его откладывать, принимает ли брокер сообщения о транзакции и т. д., а затем сохраняет реальная тема и очередь сообщения в свойствах, а затем сбросить тему сообщения наRMQ_SYS_TRANS_HALF_TOPIC , а очередь равна 0 в очереди, поэтому потребитель не может прочитать это сообщение.

Выше приведен общий процесс обработки полусообщений Давайте посмотрим на исходный код.

То есть кошка енота пришла изменить принц. На самом деле, задержанные новости также были реализованы таким образом, а новости об изменении кожи была окончательно положена на тарелку.

Брокерский метод обработки фиксации или отката сообщенияEndTransactionProcessor#processRequestДавайте посмотрим, что он сделал.

Видно, что если транзакция отправлена, скин обменивается и записывается в очередь, к которой принадлежит реальная тема для потребления консументами. фоновая служба будет сканировать в это время.Когда это половина сообщения, считается, что сообщение было обработано в соответствии с ним.

Эта фоновая службаTransactionalMessageCheckServiceслужбы, он будет периодически сканировать очередь полусообщений и запрашивать интерфейс обратной проверки, чтобы убедиться, что транзакция прошла успешно.TransactionalMessageServiceImpl#checkметод.

Кратко расскажу о процессе.На самом деле этот шаг включает в себя много кода.Я не буду выкладывать код.Заинтересованные студенты могут разобраться в нем сами. Но я верю, что это можно ясно объяснить языком.

Во-первых, возьмите половину темы сообщения, т.е.RMQ_SYS_TRANS_HALF_TOPICВсе очереди ниже, если вы помните вышеуказанный контент, то знаете, что очередь, в которую пишется половинное сообщение, это очередь, id которой равен 0, а затем выносите очередь под соответствующим этой очереди топиком half_op, то естьRMQ_SYS_TRANS_OP_HALF_TOPICОчередь по теме.

Эта половина_операция в основном предназначена для записи того, что сообщение транзакции было обработано, то есть сообщение, в котором известно, было ли сообщение транзакции зафиксировано или отменено, будет записано в половине_операции.

тогда позвониfillOpRemoveMapвзять пакет обработанных сообщений из half_op для дедупликации и вызвать те половинки сообщений, которые не записаны в half_opputBackHalfMsgQueueОн снова записан в ContimeLog, и затем отправляется запрос обратной проверки транзакций. Этот запрос обратной проверки также является в навсениею, то есть он не будет ждать ответа. Конечно, компенсирование потребления полувидного очереди в это время также будет продвигаться.

потомproducerClientRemotingProcessor#processRequest in обработает этот запрос, бросит задачу в пул потоков TransactionMQProducer и, наконец, вызовет определенный выше, когда мы отправим сообщениеcheckLocalTransactionStateметод, а затем отправить статус транзакции Брокеру, также используя oneWay.

Когда вы это увидите, я думаю, у вас возникнут некоторые вопросы, например, почему существует half_op, почему вы должны записывать его в коммитлог после обработки половинного сообщения, не спешите слушать меня по одному.

во-первыхДизайн RocketMQ заключается в последовательном добавлении и записи, поэтому он не изменит сообщения, которые уже поступили на диск., то сообщение о транзакции должно обновить количество раз обратной проверки, и если обратная проверка не пройдена, будет определено, что транзакция откатывается.

Поэтому каждый раз, когда вы хотите проверить, предыдущее половинное сообщение снова будет введено в диск, а прогресс расхода будет продвигаться вперед. И half_op будет записывать результат каждого обратной проверки, будь то предан или откатывается назад, поэтому в следующий раз он зацикливается для обработки этого половинного сообщения, вы можете знать от Half_op, что эта транзакция закончилась, поэтому оно отфильтровано не нужно иметь дело с этим.

Если полученный результат обратной проверки НЕИЗВЕСТЕН, результат не будет записан в half_op, поэтому его можно будет снова проверить в обратном направлении и обновить количество обратных проверок.

Теперь, когда весь процесс ясен, я нарисую диаграмму, чтобы обобщить процесс обработки транзакций брокером.

Сообщения о транзакциях Kafka

Сообщения транзакций Kafka отличаются от сообщений транзакций RocketMQ. RocketMQ решает два действия: выполнение локальной транзакции и отправку сообщения, чтобы удовлетворить ограничения транзакции.

Сообщение транзакции Kafka используется в том случае, когда необходимо отправить несколько сообщений в одной транзакции, чтобы обеспечить ограничения транзакций между несколькими сообщениями, то есть несколько сообщений либо отправляются успешно, либо все не удалось, как показано в следующем коде.

Кафка существенно с его механизмом транзакции для достижения идемпотентной семантики ровноТак что сообщение Кафка-транзакции не является сообщением, которое мы хотим вроде транзакции, RocketMQ его.

Говоря об этом, я хочу поговорить о том, что когда дело доходит до этого «Ровно раз», не очень ясного ученика легко неправильно понять.

Мы знаем, что есть три типа надежности сообщения, которые являются не более одного раза, ровно один раз и не менее одного раза, Я уже упоминал в предыдущей статье в последовательном вопросе очереди сообщений, что в основном мы используем хотя бы один раз, а затем сотрудничаем с идемпотентность потребительской стороны достигается ровно один раз.

Сообщение расходуется ровно один раз, конечно, чего мы все и добиваемся, но я проанализировал предыдущую статью с разных сторон, и добиться этого в принципе сложно.

И Кафка на самом деле сказал, что он может достичь ровно один раз? Это так пиво? На самом деле это уловка Кафки. Если вы хотите сказать, что он неправ, он действительно прав. Вы должны сказать, что он прав, но Ровно Как только он осознает, это не Ровно Как только вы думаете.

Бывает, что одновременно может существовать только один сценарий, т.Kafka используется как источник сообщений, и после выполнения некоторых операций оно записывается в Kafka.

Так как же он добился ровно один раз? Это идемпотент, передавая уникальный идентификатор, как мы реализовали в бизнесе, а затем записывая его.Если он был записан, он не будет записан, чтобы обеспечить ровно один раз.

такТо, что реализует Kafka, выполняется ровно один раз в конкретном сценарии, а не то, что мы думаем об использовании Kafka для отправки сообщения, тогда это сообщение будет использовано только один раз..

На самом деле это то же самое, что Redis говорит, что он реализовал транзакцию, и это не та транзакция, о которой мы думали.

Поэтому возможности ПО с открытым исходным кодом были развиты, и мы слепо верим в это, поэтому они часто полны остаточной крови или могут быть удовлетворены только в особых сценариях.Не вводите в заблуждение.Нельзя верить поверхностному описанию, и вы должны прочитать его подробно, см. документацию или исходный код.

Но с другой точки зрения это понятно.Как программное обеспечение с открытым исходным кодом, оно должно использоваться большим количеством людей.Я не вру.Мой документ очень понятен, и название не обманывает, правда?

Ведь, например, если вы нажмете на статью с шокирующим хххх заголовком, вам никто не врал, он действительно в шоке.

Вернемся к транзакционному сообщению Кафки, так что это транзакционное сообщение не то сообщение транзакции, которое нам нужно, но это не тема сегодняшнего дня, но я просто скажу это кратко.

Транзакции Kafka играют роль координатора транзакций, а координатор транзакций фактически является частью брокера.

При запуске транзакции производитель инициирует запрос к координатору транзакций, чтобы указать, что транзакция открыта.Координатор транзакций запишет сообщение в специальный лог-журнал транзакций, а затем производитель отправит сообщение, которое он действительно хочет для отправки, здесь Kafka В отличие от обработки RocketMQ, Kafka будет обрабатывать эти транзакционные сообщения как обычные сообщения,Сообщение фильтруется потребителем.

Затем после отправки производитель отправит координатору транзакций запрос на фиксацию или откат, а координатор транзакций выполнит двухэтапную фиксацию в журнале транзакций, а затем напишет сообщение, аналогичное завершению транзакции, всем связанным с транзакциями разделы, так что когда потребитель использует сообщение, он знает, что транзакция завершена, и сообщение может быть выпущено.

Наконец, координатор запишет еще одно сообщение о завершении транзакции в журнал транзакций. На этом транзакция Kafka завершена. Я суммирую процесс с диаграммой на confluent.io.

Наконец

До сих пор мы знали весь процесс сообщения о транзакции RocketMQ и Kakfa. Мы видим, что сообщение о транзакции RocketMQ — это то, что нам нужно. Конечно, если вы используете потоковые вычисления, то сообщение о транзакции Kakfa также является тем, что нам нужно. вы хотите.

Статьи, которые нужно выкладывать с кодом, на самом деле очень неудобны. Публиковать слишком много нехорошо, и боюсь, что будет непонятно. Это действительно сложно. Если вы считаете, что статья хорошая, не забудьте нажать и прочитать Это.


Я да, из немного битов, мы увидим.