1. О Сеате
Недавно я написал анализ Fescar, промежуточного программного обеспечения распределенных транзакций. В течение нескольких дней команда Fescar обновила свой бренд и назвала его Seata (Simpe Extensible Autonomous Transaction Architecture), полное название Fast & Easy Commit And Rollback. Видно, что название Fescar более ограничено Commit и Rollback, в то время как новый бренд Seata нацелен на создание универсального решения для распределенных транзакций. После смены названия у меня больше уверенности в его дальнейшем развитии.
Вот общий отзыв о всей модели процессов Seata:
- ТМ: Инициатор транзакции. Используется, чтобы сообщить TC, что глобальная транзакция запускается, фиксируется и откатывается.
- RM: Для конкретных ресурсов транзакций каждый RM будет зарегистрирован в TC как транзакция филиала.
- ТК: Координатор сделки. Его также можно рассматривать как Fescar-servr, который используется для получения регистрации, фиксации и отката наших транзакций.
В предыдущей статье есть общее введение ко всей роли, а в этой статье я сосредоточусь на основной роли TC, которая является координатором транзакций.
2.Transcation Coordinator
Почему раньше подчеркивалось, что TC является ядром? Это потому, что роль ТС подобна богу, контролирующему РМ и ТМ всех живых существ. Если ТС плохо работает, то раз есть небольшая проблема с РМ и ТМ, будет бардак. Так что если вы хотите понять Seata, вы должны понять его TC.
Итак, какими способностями должен обладать хороший координатор транзакций? Думаю должно быть следующее:
- Правильная координация: может правильно координировать, что РМ и ТМ должны делать дальше, что следует делать неправильно, а что следует делать правильно.
- Высокая доступность: координатор транзакций очень важен в распределенных транзакциях.Если высокая доступность не может быть гарантирована, то в его существовании нет необходимости.
- Высокая производительность: производительность координатора транзакций должна быть высокой.Если производительность координатора транзакций является узким местом, управляемые им RM и TM часто сталкиваются с тайм-аутами, что приводит к частым откатам.
- Высокая масштабируемость: эта функция относится к уровню кода.Если это отличная структура, она должна предоставить пользователям множество пользовательских расширений, таких как регистрация/обнаружение службы, чтение конфигурации и т. д.
Ниже я также пошагово объясню, как Seata реализует вышеуказанные четыре пункта.
2.1 Дизайн Seata-Server
Общая модульная схема Seata-Server показана выше:
- Ядро координатора: модуль внизу представляет собой основной код координатора транзакций, который в основном используется для работы с логикой координации транзакций, например, следует ли фиксировать, откатывать и другие действия по координации.
- Магазин: модуль хранения используется для сохранения наших данных, чтобы предотвратить потерю данных при перезапуске или простое.
- Обнаружение: модуль регистрации/обнаружения службы, используемый для предоставления адреса сервера нашему клиенту.
- Конфигурация: используется для хранения и поиска конфигурации нашего сервера.
- Блокировка: модуль блокировки, используемый для обеспечения функции глобальной блокировки для Seata.
- Rpc: используется для связи с другими терминалами.
- HA-Cluster: Кластер высокой доступности, исходный код еще не открыт. Обеспечивает надежную высокую доступность для Seata.
2.2 Discover
Прежде всего, давайте поговорим о базовом модуле Discover, также известном как модуль регистрации/обнаружения службы. После того, как мы запустим Seata-Sever, нам нужно открыть свой адрес другим пользователям, поэтому нам понадобится помощь нашего модуля.
Этот модуль имеет основной интерфейс RegistryService, как показано выше:- register: используется сервером для регистрации службы.
- unregister: используется сервером, обычно вызывается в обработчике выключения JVM и ShutdownHook.
- подписка: используется клиентом для регистрации и отслеживания событий для отслеживания изменений адресов.
- unsubscribe: используется клиентом для отмены регистрации событий прослушивания.
- Loop: используется клиентом для поиска списка адресов службы по ключу.
- close: может использоваться для закрытия ресурса Register.
Если вам нужно добавить собственную регистрацию/обнаружение службы, вы можете реализовать этот интерфейс. На сегодняшний день, благодаря постоянному развитию сообщества, было зарегистрировано/обнаружено четыре службы, а именно: redis, zk, nacos, eruka. Ниже приводится краткое введение в реализацию Nacos:
2.2.1 интерфейс регистрации:
Шаг 1: проверьте, является ли адрес законным
Шаг 2: Получите экземпляр Name Nacos, а затем зарегистрируйте адрес для текущего имени кластера.
Интерфейс отмены регистрации аналогичен и не будет здесь подробно описываться.
2.2.2 интерфейс поиска:
шаг 1: Получить текущее имя clusterName
Шаг 2: Определите, был ли получен текущий кластер, и, если он был получен, возьмите его с карты.
Шаг 3: Получите адресные данные от Nacos и преобразуйте их в то, что нам нужно.
Шаг 4: Зарегистрируйте слушателя нашего события в Nacos.
2.2.3 интерфейс подписки
Этот интерфейс относительно прост и состоит из двух шагов:Шаг 1: Добавьте clstuer и слушателя на карту.
Шаг 2: Зарегистрируйтесь в Nacos.
2.3 Config
Модуль конфигурации также является относительно базовым и относительно простым модулем. Нам нужно настроить некоторые общие параметры, такие как: количество выбранных потоков Netty, количество рабочих потоков, максимально допустимый сеанс и т. д. Конечно, эти параметры имеют свои собственные настройки по умолчанию в Seata.
Точно так же в Seata также предоставляется конфигурация интерфейса, позволяющая настроить, где нам нужно получить конфигурацию:
- getInt/Long/Boolean/Config(): получить соответствующее значение через dataId.
- putConfig: используется для добавления конфигурации.
- removeConfig: удалить конфигурацию.
- добавить/удалить/получить ConfigListener: добавить/удалить/получить прослушиватели конфигурации, обычно используемые для отслеживания изменений конфигурации.
Пока есть четыре способа получения Config: File (получение файла), Nacos, Apollo, ZK. В Seata вам сначала нужно настроить реестр.conf, чтобы настроить тип conf. Реализация conf относительно проста, и здесь нет глубокого анализа.
2.4 Store
Реализация уровня хранения имеет решающее значение для высокой производительности и надежности Seata. Если уровень хранения не реализован должным образом, в случае простоя данные, которые обрабатываются распределенными транзакциями в TC, будут потеряны.Поскольку используются распределенные транзакции, он не должен допускать потери. Если уровень хранения реализован хорошо, но его производительность имеет большие проблемы, RM может часто выполнять откат, поэтому он вообще не справляется со сценариями с высокой степенью параллелизма.
В Seata хранилище файлов предоставляется по умолчанию. Далее мы определяем данные, которые мы храним, как Session, а глобальные данные транзакции, созданные нашим TM, называются GloabSession, а транзакция филиала, созданная RM, называется BranchSession. GloabSession может иметь несколько Филиалы. Наша цель - хранить так много сессий.
В коде FileTransactionStoreManager#writeSession:
Приведенный выше код в основном разделен на следующие шаги:
- Шаг 1: Создайте TransactionWriteFuture.
- Шаг 2: добавьте этот futureRequest в LinkedBlockingQueue. Зачем нужно кидать все данные в очередь? Конечно, здесь также можно использовать блокировки, и еще один RocketMQ с открытым исходным кодом от Ali использует блокировки. Будь то очередь или блокировка, их цель — обеспечить однопоточную запись, почему это так? Некоторые люди объяснят, что необходимо обеспечить последовательную запись, чтобы скорость была очень быстрой.Это понимание неверно.Наш FileChannel на самом деле потокобезопасен и уже может гарантировать последовательную запись. Обеспечение однопоточной записи на самом деле позволяет нашей логике записи быть однопоточной, потому что некоторые файлы могут быть заполнены или записывать места записи данных и другую логику.Конечно, эта логика может быть активно заблокирована, но для простоты и удобства это наиболее подходит для прямой блокировки всей логики записи.
- Шаг 3: Вызовите future.get и дождитесь уведомления о завершении нашей логики записи данных.
После того, как мы отправим данные в очередь, нам нужно их использовать следующим образом.Код выглядит следующим образом:
Здесь WriteDataFileRunnable() отправляется в наш пул потоков Метод run() этого Runnable выглядит следующим образом:
Делится на следующие этапы:шаг 1: определить, следует ли останавливаться, если остановка верна, вернуть ноль.
Шаг 2: Получите данные из нашей очереди.
Шаг 3: Определите, истекло ли время ожидания для будущего. Если время ожидания истекло, установите для результата значение false. В это время метод get() нашего производителя будет заблокирован.
Шаг 4: Записываем наши данные в файл. В это время данные все еще находятся в слое pageCahce и не были сброшены на диск. Если запись прошла успешно, то определить, следует ли выполнять операцию сброса в соответствии с условиями.
Шаг 5: Когда количество операций записи достигает определенного значения или когда время записи достигает определенного значения, нам нужно сохранить текущий файл как файл истории, удалить предыдущий файл истории, а затем создать новый файл. Этот шаг предназначен для предотвращения бесконечного роста наших файлов и большого количества недействительных данных, которые тратят ресурсы диска впустую.
У нас есть следующий код в нашем writeDataFile:
Шаг 1: Сначала получите наш ByteBuffer, если он превышает максимальный BufferSize цикла, создайте новый напрямую, в противном случае используйте наш кэшированный буфер. Этот шаг может значительно уменьшить GC.
Шаг 2: Затем добавьте данные в ByteBuffer.
Шаг 3: Наконец, запишите ByteBuffer в наш файловый канал, который будет повторен трижды. В настоящее время данные все еще находятся на уровне pageCache. Затронутые двумя аспектами, ОС имеет собственную стратегию обновления, но эту бизнес-программу нельзя контролировать. Чтобы предотвратить потерю большого количества данных, вызванную такими событиями, как простои , бизнесу необходимо самостоятельно контролировать сброс. Вот код для сброса:
Условием сброса здесь является запись определенного количества или время записи превышает определенное время, поэтому будет небольшая проблема.Если произойдет сбой питания, то в pageCache могут быть данные, которые не были сброшены, что приведет к небольшой потере данных. В настоящее время режим синхронизации не поддерживается, то есть каждые данные необходимо сбрасывать, что может обеспечить размещение каждого сообщения на диске, но производительность также сильно пострадает.Конечно, поддержка будет и дальше развиваться в будущем.
Основной процесс нашего магазина в основном состоит из вышеперечисленных методов, конечно, есть и некоторые, такие как реконструкция сеанса и т. Д. Они относительно просты, и читатели могут прочитать их самостоятельно.
2.5 Lock
Все мы знаем, что уровень изоляции базы данных в основном достигается за счет блокировок, и тот же перераспределенный фреймворк транзакций Seata должен использовать блокировки для достижения уровня изоляции. Как правило, в базе данных существует четыре уровня изоляции базы данных: незафиксированное чтение, зафиксированное чтение, повторяемое чтение и сериализованное. В Seata уровень изоляции записи может быть гарантированно зафиксирован, в то время как уровень изоляции чтения, как правило, не зафиксирован, но он предоставляет средства для достижения изоляции зафиксированного чтения.
Модуль блокировки также является основным модулем Seata для реализации уровня изоляции. В модуле Lock предусмотрен интерфейс для управления нашими замками:
Есть три метода:
- AcquireLock: используется для блокировки нашего BranchSession.Хотя это сеанс переданной транзакции ветки, он фактически блокирует ресурсы транзакции ветки и успешно возвращает значение true.
- isLockable: запрос, был ли он заблокирован в соответствии с идентификатором транзакции, идентификатором ресурса и заблокированным ключом.
- cleanAllLocks: очистить все блокировки. Для блокировок мы можем реализовать их локально, или мы можем использовать Redis или MySQL, чтобы помочь нам реализовать их. Официальное значение по умолчанию обеспечивает реализацию локальной глобальной блокировки:Есть две константы, о которых следует помнить при реализации локальных блокировок:
- BUCKET_PER_TABLE: используется для определения количества сегментов для каждой таблицы, чтобы уменьшить конкуренцию при последующей блокировке одной и той же таблицы.
- LOCK_MAP: Эта карта очень сложна с точки зрения определения. Существует много слоев карты внутри и снаружи. Вот таблица для подробного объяснения:
слои | key | value |
---|---|---|
1-LOCK_MAP | идентификатор ресурса (jdbcUrl) | dbLockMap |
2- dbLockMap | tableName (имя таблицы) | tableLockMap |
3- tableLockMap | PK.hashcode%Bucket (хэш-код%bucket значения первичного ключа) | bucketLockMap |
4- bucketLockMap | PK | trascationId |
Видно, что фактическая блокировка находится в карте BucketLockMap. Конкретный метод блокировки здесь относительно прост и не будет подробно останавливаться на нем, главное постепенно найти ведроLockMap, а затем вставить текущий trascationId. Если первичный ключ в настоящее время имеет TranscationId, затем сравните, является ли он самим собой, если нет, то блокировка не работает.
2.6 Rpc
Одним из ключей к обеспечению высокой производительности Seata также является использование Netty в качестве инфраструктуры RPC.Модель потоков с конфигурацией по умолчанию показана на следующем рисунке:
Если принята базовая конфигурация по умолчанию, будет поток Acceptor для обработки клиентского соединения, и будут NIO-потоки с количеством процессоров * 2. В этом потоке не будет тяжелых для бизнеса вещей, а только некоторые более высокие скорости, такие как кодеки, события сердцебиения и регистрации TM. Некоторые трудоемкие бизнес-операции будут переданы в пул бизнес-потоков. По умолчанию пул бизнес-потоков настроен на минимум 100 потоков и максимум 500.
Здесь следует упомянуть механизм сердцебиения Seata, который выполняется с помощью Netty IdleStateHandler следующим образом:
На стороне Севера нет максимального времени простоя для записи и максимального времени простоя для чтения, по умолчанию 15с, если превысит 15с, то линк будет разорван, а ресурсы закрыты.
Шаг 1: Определите, является ли это событием обнаружения простоя чтения.
step2: Если это так, отключите ссылку и закройте ресурс.
2.7 HA-Cluster
В настоящее время HA-Cluster официально не анонсирован, но с помощью некоторого другого промежуточного программного обеспечения и некоторых официальных раскрытий HA-Cluster может быть спроектирован следующим образом:
Конкретный процесс выглядит следующим образом:
Шаг 1. Когда клиент публикует информацию, он гарантирует, что одна и та же транзакция выполняется на одном и том же главном устройстве в соответствии с идентификатором транзакции, и обеспечивает производительность параллельной обработки за счет горизонтального расширения нескольких главных устройств.
Шаг 2: На стороне сервера у ведущего есть несколько ведомых, и данные в ведущем синхронизируются с ведомыми почти в реальном времени, гарантируя, что, когда ведущее устройство выйдет из строя, другие ведомые устройства все еще могут быть использованы.
Разумеется, все вышеперечисленное является догадками, а конкретный дизайн и реализацию придется подождать до версии 0.5. В настоящее время существует Go-версия Seata-Server, также подаренная Seata (все еще в процессе), которая реализует согласованность реплик через raft, а другие детали не слишком ясны.
2.8 Metrics
Этот модуль также является модулем, о конкретной реализации которого не было объявлено. Конечно, он может предоставлять подключаемый порт для доступа к другим сторонним метрикам. Недавно Apache Skywalking обсуждает с командой Seata, как получить к нему доступ.
3.Coordinator Core
Выше мы говорили о множестве основных серверных модулей.Я думаю, что у всех есть обзор реализации Seata.Далее я объясню, как реализована конкретная логика координатора транзакций, чтобы вы могли лучше понять реализацию Seata. .
3.1 Процесс запуска
Метод запуска имеет основной метод в классе Server, который определяет наш процесс запуска:
Шаг 1: Создайте RpcServer, который содержит операции нашей сети и реализует сервер с Netty.
Шаг 2: Проанализируйте номер порта и адрес файла.
Шаг 3: Инициализируйте SessionHoler, самое главное — восстановить наши данные в папке dataDir и пересобрать нашу сессию.
Шаг 4: Создайте CoordinatorDinator, который также является логическим основным кодом нашего координатора транзакций, а затем инициализируйте его.Внутренняя логика инициализации создаст четыре задачи синхронизации:
- retryRollbacking: задача синхронизации повторных откатов используется для повторных попыток неудачных откатов, которые выполняются каждые 5 мс.
- retryCommitting: повторите задачу синхронизации фиксации, которая используется для повторения этих неудачных коммитов, выполняемых каждые 5 мс.
- asyncCommitting: задача синхронизации асинхронной фиксации, используемая для выполнения асинхронной фиксации один раз каждые 10 мс.
- timeoutCheck: определение времени ожидания задачи, используемое для обнаружения задач с истекшим временем ожидания, а затем выполнение логики времени ожидания, которая выполняется каждые 2 мс.
Шаг 5: Инициализация UUIDGenerator. Это также базовый класс для генерации различных идентификаторов (transcationId, branchId).
Шаг 6: Установите локальный IP-адрес и порт прослушивания на XID, инициализируйте rpcServer и дождитесь подключения клиента.
Процесс запуска относительно прост.Ниже я расскажу, как Seata обрабатывает некоторые общие бизнес-логики в структуре распределенных транзакций.
3.2 Глобальная транзакция Начало-Открытие
Отправной точкой распределенной транзакции должно быть открытие глобальной транзакции.Для начала посмотрим, как реализована глобальная транзакция Seata:
Шаг 1: Создайте GloabSession в соответствии с идентификатором приложения, группой транзакций, именем и временем ожидания.Это также упоминалось ранее о том, что такое он и branchSession.
шаг 2: добавьте к нему RootSessionManager для прослушивания некоторых событий.В настоящее время в Seata есть четыре типа прослушивателей (здесь следует объяснить, что все sessionManager реализуют SessionLifecycleListener):
- ROOT_SESSION_MANAGER: самый полный и самый большой, со всеми сессиями.
- ASYNC_COMMITTING_SESSION_MANAGER: используется для управления сеансами, которым необходимо выполнять асинхронные фиксации.
- RETRY_COMMITTING_SESSION_MANAGER: сеанс для управления повторными фиксациями.
- RETRY_ROLLBACKING_SESSION_MANAGER: сеанс, используемый для управления повторными откатами. Поскольку здесь открывается транзакция, другим SessionManager не нужно беспокоиться, нам нужно только добавить RootSessionManager.
Шаг 3: Открыть глобальную сессию
Этот шаг изменит статус на «Начало», запишет время начала и вызовет метод мониторинга onBegin в RootSessionManager, чтобы сохранить сеанс на карте и записать его в наш файл.
Шаг 4: Наконец, верните XID. Этот XID состоит из ip+port+transactionId, что очень важно. Когда TM запрашивает его, ему необходимо передать идентификатор RM, а RM решает, к какому серверу следует обращаться через ХИД.
3.3 BranchRegister - регистрация транзакций филиала
Когда наша глобальная транзакция открывается в TM, наша транзакция филиала RM также должна быть зарегистрирована в нашей глобальной транзакции.Вот как это обрабатывается:
Шаг 1: Получите и проверьте, открыта ли глобальная транзакция через transactionId.
Шаг 2: Создайте новую транзакцию ветки, которая является нашей BranchSession.
Шаг 3: Добавьте глобальную блокировку в транзакцию ветки, здесь используется логика нашего модуля блокировки.
Шаг 4: Добавьте BranchSession, в основном, добавив его в объект globalSession и записав его в наш файл.
Шаг 5: Возвратите branchId, этот идентификатор также очень важен, нам нужно использовать его, чтобы позже откатить нашу транзакцию или обновить статус нашей транзакции ветки.
После того, как транзакция филиала зарегистрирована, также необходимо сообщить, был ли последующий статус транзакции филиала успешным или неудачным.В настоящее время сервер просто сохраняет запись.Цель отчета состоит в том, что даже если транзакция филиала не удалась , если TM по-прежнему настаивает на отправке глобальной транзакции, то при обходе и отправке транзакции ветвления не нужно отправлять транзакцию ветвления, которая не удалась.
3.4 GlobalCommit — глобальная фиксация
Когда выполнение нашей ветки транзакции завершится, наступает очередь нашего менеджера TM-транзакций решать, коммитить или откатываться, если коммит, то дело пойдет по следующей логике:
Шаг 1: Сначала найдите наш файл globalSession. Если он докажет, что это было зафиксировано для Null, то прямая идемпотентная операция возвращает успех.
Шаг 2: Закройте наш GloabSession, чтобы предотвратить повторное появление новой ветки.
Шаг 3: Если статус равен «Начало», долгосрочное доказательство не было отправлено, и изменение его статуса на «Подтверждение» означает отправку.
Шаг 4: Определите, может ли он быть отправлен асинхронно.В настоящее время асинхронно может быть отправлен только режим AT, потому что это делается Undolog. И MT, и TCC должны синхронно обрабатывать представленный код.
Шаг 5: Если это асинхронная отправка, поместите ее непосредственно в наш ASYNC_COMMITTING_SESSION_MANAGER и позвольте ему выполнить наш шаг 6 асинхронно в фоновом потоке.Если это синхронно, то непосредственно выполните наш шаг 6.
Шаг 6: Перейдите через наш BranchSession для отправки, если транзакция ветки не удалась, определите, следует ли повторить попытку в соответствии с различными условиями, асинхронный не должен повторять попытку, потому что он находится в самом диспетчере, пока он не будет успешным, он не будет Всегда повторять попытку, если он отправляется синхронно, он будет помещен в очередь асинхронных повторных попыток для повторной попытки.
3.5 GlobalRollback — глобальный откат
Если наша ТМ решит откатиться глобально, она пойдет по следующей логике:
Эта логика в основном такая же, как и процесс подчинения, который можно рассматривать как обратную ему сторону, и здесь он обсуждаться не будет.
4. Резюме
Наконец, в начале резюме мы выдвинули 4 ключевых момента распределенных транзакций, как их решает Seata:
- Правильная координация: несколько правильных повторных попыток через фоновые запланированные задачи, и платформа мониторинга будет запущена в будущем, и, возможно, можно будет выполнить откат вручную.
- Высокая доступность: Высокая доступность гарантируется благодаря HA-Cluster.
- Высокая производительность: последовательная запись файлов, RPC реализован через netty, Seata может масштабироваться горизонтально в будущем для повышения производительности обработки.
- Высокая масштабируемость: Предоставляет места, которые пользователи могут свободно использовать, такие как конфигурация, обнаружение и регистрация служб, глобальные блокировки и многое другое.
Наконец, я надеюсь, что из этой статьи каждый сможет понять основные принципы проектирования Seata-Server.Конечно, вы также можете представить, как спроектировать сервер, реализующий распределенную транзакцию, самостоятельно?
адрес гитхаба места:GitHub.com/цвет ах/цвет ах…
Наконец, эта статья была включена в JGrowing-Distributed Transactions, всеобъемлющий и отличный маршрут изучения Java, совместно созданный сообществом.Если вы хотите участвовать в обслуживании проектов с открытым исходным кодом, вы можете создать его вместе.Адрес github:GitHub.com/Java растет…Пожалуйста, дайте мне маленькую звезду.
Если вы считаете, что эта статья полезна для вас, то ваше внимание и пересылка - самая большая поддержка для меня, O(∩_∩)O: