идемпотентный выбор и округление
Гарантией семантики, связанной с идемпотентностью, на распределенных платформах является наше вечное стремление создавать безопасные и надежные системы. Как наиболее предпочтительную схему реализации асинхронности и развязки, я часто думаю о том, какое разделение испытывали разработчики Rocket MQ?
Хорошо известно, что реализацию концепции потребления сообщений очередями сообщений можно условно разделить на три ситуации:
- At most once
- At least once
- Exactly once
Перевод:
- Потратьте хотя бы раз
- провести хотя бы раз
- Гарантированно потратить один раз
Очевидно, что если вы используете хотя бы один раз, это неизбежно приведет к потере сообщений; по крайней мере один раз потребление потребует более высоких требований к нашей бизнес-системе, и когда потребление будет хорошим, системе MQ придется нести большую цену. Rocket MQ выбирает хотя бы раз без колебаний. Смелое предоставление идемпотентной гарантии разработчикам не только отражает беспомощность автора по поводу противоречия между производительностью и функциями MQ, но и отражает доверие большинства разработчиков.
Обзор текущего состояния потребления
Хотя приведенный выше аргумент является объективным и верным, он неизбежно пессимистичен.Согласно приведенному выше пониманию, наша бизнес-система должна полагаться на них, но мы всегда должны остерегаться их, потому что, если не быть осторожным, могут возникнуть ошибки. Существование любви и страха.
Читая это, автор, кажется, описывает его как непослушного ребенка, но на самом деле это немного тяжеловато, потому что, согласно моему пониманию чтения исходного кода, бизнес-система не является ненормальной, а физическая операционная среда, в которой находится MQ, относительно здоров. Трудно повторять употребление несколько раз.
Идемпотентность RocketMQ часто нарушается ненормальной логикой бизнес-системы, сети или неопределенной операционной среды. В подавляющем большинстве случаев нет никаких сомнений в том, что он все еще хороший мальчик.
Согласно нашему простому пониманию системы сообщений, процесс потребления сообщений удовлетворяет следующим правилам:
- Хотя потребление не будет строго соответствовать порядку доставки, но в целом сохраняется тенденция «первым пришел, первым вышел».
- Сообщение должно точно фиксировать текущее состояние потребления
- Всегда есть роль, ответственная за статистику и постоянные компенсации потребления.
Эмпирически посмотрим, какие усилия приложили авторы для плавного потребления и управления прогрессом.
⚠️Примечание: модель последовательного потребления Rocket MQ строго гарантирует порядок.
OffsetStore
После того, как сообщение использовано, оно теряет право оставаться в очереди ProcessQueue.ProcessQueue удалит сообщение и вернет текущее минимальное смещение в таблицу хода выполнения сообщения. Несложно представить, что если этот прогресс потребления не сохраняется, то перезапускать потребление каждый раз при его запуске заведомо недопустимо, но как сохранять и где сохранять?
Rocket MQ поддерживает два режима подписки:
-
Кластерный режим потребления: режим потребления по умолчанию, все сообщения должны быть обработаны только один раз любым потребителем в одной группе, и все разделяют смещение потребления в теме подписки.
-
Режим широковещательного потребления: поведение потребления каждого потребителя полностью независимо, и все сообщения в теме подписки должны потребляться всеми потребителями в группе.
Согласно характеристикам двух моделей потребления, легко обнаружить, что их нелегко обобщить.Идеальная реализация состоит в том, чтобы разделить ее на две стратегии: одну централизованную для управления брокерами и одну децентрализованную для управления потребителями. За сопутствующие вопросы отвечает интерфейс OffsetStore, и исходный код подтверждает нашу догадку.Давайте посмотрим на определение интерфейса OffsetStore:
public interface OffsetStore {
/**
* 从消息进度存储文件加载消息进度到内存
*/
void load() throws MQClientException;
/**
* Get offset from local storage
* @return The fetched offset
*/
long readOffset(MessageQueue mq, ReadOffsetType type);
/**
* Remove offset
*/
void removeOffset(MessageQueue mq);
Map<MessageQueue, Long> cloneOffsetTable(String topic);
/**
* 更新内存中的消息进度
* Update the offset,store it in memory
*/
void updateOffset(MessageQueue mq, long offset, boolean increaseOnly);
/**
* 保留所有偏移量,可能在本地存储或远程服务器
* Persist all offsets,may be in local storage or remote name server
*/
void persistAll(Set<MessageQueue> mqs);
/**
* 保留指定消息队列偏移量,可能在本地存储或远程服务器
* Persist the offset,may be in local storage or remote name server
*/
void persist(MessageQueue mq);
/**
* 更新存储在 Broker 端的消息消费进度,使用集群模式
*/
void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)
throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
}
По сравнению с исходным кодом расположение методов было изменено мной на обратное, и я поместил те, на которые нужно обратить внимание, в конце.
⚠️Примечание: Если нет опыта чтения исходников Rocket MQ, то ProcessQueue кажется немного неудобным, можно понимать ta как носитель сообщения на стороне Consumer, и некий перехват физической очереди. Автор определяет ta следующим образом: Снимок потребления очереди
LocalFileOffsetStore
В широковещательном режиме ход сообщения сохраняется на стороне потребителя, а файлы помещаются в настраиваемый фиксированный каталог в соответствии с соглашением.Путь к файлу следующий:
public class LocalFileOffsetStore implements OffsetStore {
/**
* 存储文件夹路径可定制
*/
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets"
);
/**
* 构造方法拼接出了文件的完整路径
*/
public LocalFileOffsetStore(MQClientInstance mQClientFactory,
String groupName) {
this.mQClientFactory = mQClientFactory;
this.groupName = groupName;
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator +
"offsets.json";
}
}
По умолчанию папка ".rocketmq_offsets" создается на один уровень ниже пути пользователя. Обратите внимание, что здесь есть детали. Папка начинается с ".", который является скрытым файлом в системе Linux. Вам нужно добавить - параметр, который необходимо отобразить. Для простоты понимания на изображении ниже показан путь к папке и путь к постоянному файлу Offset.
В широковещательном режиме Consumer#start() вызовет OffsetStore.load() для загрузки прогресса потребления.Принцип заключается в том, чтобы прочитать соответствующий файл после объединения полного пути к файлу в соответствии с соглашением, а затем сериализовать его в OffsetSerializeWrapper. объект:
public class OffsetSerializeWrapper extends RemotingSerializable {
/* 详细记录每个队列当前消费进度 */
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable
= new ConcurrentHashMap<>();
}
Предположим, у нас есть тема подписки на услугу «SMS_prod» для отправки коротких сообщений, тогда сформированный Json выглядит следующим образом: Обратите внимание, что свойство offsetTable также является Json, а ключ — это объект MessageQueue, а значение — число, представляющее смещение .
{
"offsetTable": {
{
"topic": "SMS_prod",
"brokerName": "broker0"
"queueId": 0
}: 100,
{
"topic": "SMS_prod",
"brokerName": "broker0"
"queueId": 1
}: 100,
}
}
Поскольку вы можете загружать ключевую информацию в указанный файл, естественно, существует соответствующий механизм, отвечающий за запись. Помните упомянутый выше метод persistAll?
public void persistAll(Set<MessageQueue> mqs) {
/* 构造OffsetSerializeWrapper对象 */
OffsetSerializeWrapper offsetSerializeWrapper =
new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
}
}
/* 将offsetSerializeWrapper对象序列化 */
String jsonString = offsetSerializeWrapper.toJson(true);
/* 将序列化好的offsetSerializeWrapper写入文件 */
MixAll.string2File(jsonString, this.storePath);
}
Операции, связанные с offsets.json, инкапсулированы в классе инструментов MixAll:
- MixAll.file2String: прочитать файл
- MixAll.string2File: запись сериализованного объекта в файл.
RemoteBrokerOffsetStore
Поскольку смещение поддерживается на стороне посредника, метод загрузки в этой реализации является просто оператором. Метод построения не требует вычисления пути к файлу и является особенно простым, а свойства offsetTable у них одинаковые. Давайте сосредоточимся на том, как сохранить прогресс потребления сообщений в режиме потребления кластера.
public void persistAll(Set<MessageQueue> mqs) {
HashSet<MessageQueue> unusedMQ = new HashSet<>();
for (Map.Entry<MessageQueue, AtomicLong> entry : offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
this.updateConsumeOffsetToBroker(mq, offset.get());
} else {
unusedMQ.add(mq);
}
}
}
if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
}
}
}
Без дальнейших исследований мы сможем найти как минимум два отличия:
- Степень детализации отличается: широковещательный режим напрямую сохраняет сразу всю таблицу offsetTable, а кластерный режим уточняется до начального уровня.
- Методы вызова различаются: широковещательный режим заключается в непосредственном вызове внутренней JVM для записи файла, тогда как кластерный режим требует участия вызовов RPC.
Здесь необходимо подчеркнуть, что файлы offset.json, сгенерированные этими двумя, также отличаются, я проанализирую его ниже, а также покажу вам процесс RPC.
RPC调用栈:
RemoteBrokerOffsetStore#persistAll()
-> RemoteBrokerOffsetStore#updateConsumeOffsetToBroker()
组装好RPC请求头UpdateConsumerOffsetRequestHeader对象
-> MQClientAPIImpl#updateConsumerOffsetOneway()
组装好RPC请求对象RemotingCommand
-> NettyRemotingClient#invokeSync()
发起RPC调用
更新偏移量的RPC调用类型是RequestCode.UPDATE_CONSUMER_OFFSET
顺着这个枚举来看看Broker端的相关处理:
ConsumerManageProcessor.updateConsumerOffset()
-> ConsumerOffsetManager.commitOffset()
Отслеживание исходного кода показало, что на самом деле каждый раз, когда Потребитель делает вызов RPC, чтобы сообщить о своем прогрессе потребления, Брокер не сохраняет его сразу после его получения, а напрямую обновляет его в памяти.
private void commitOffset(String clientHost, String key,
int queueId, long offset) {
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = offsetTable.get(key);
if (Objects.isNull(map)) {
map = new ConcurrentHashMap<>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
}
}
TOPIC_GROUP_SEPARATOR — это определенная константа: «@», мы упоминали ранее, что два jsons немного отличаются, ключ offsetTable становится объединенной строкой, левая сторона строки — TopicName, правая сторона — ConsumeGroupName с @ в средняя символическая ссылка. Для удобства понимания я также показываю этот json:
/**
* 注意一下这个key:%RETRY%ConsumeGroup
* 笔者后期会有专门文章分析
*/
{
"offsetTable": {
"Topic@ConsumeGroup":{
0: 38,
1: 37,
2: 50,
3: 10
},
"%RETRY%ConsumeGroup": {
0: 0
}
}
}
Упорство
Между этими двумя механизмами сохранения файлов нет большой разницы: запланированная задача запускается или потребитель запускается вручную перед обычным завершением работы и выполнением shotdown().
Задача синхронизации в широковещательном режиме определена в MQClientInstance, и объект MQClientInstance запускает задачу синхронизации при вызове start() после создания экземпляра. Конфигурация поддержки временного интервала запланированной задачи по умолчанию составляет 5000 мс, а выполнение начинается с задержкой в 10000 мс.
public void start() throws MQClientException {
this.scheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
},
1000 * 10,
this.clientConfig.getPersistConsumerOffsetInterval(),
TimeUnit.MILLISECONDS
);
}
В определении задачи синхронизации в кластерном режиме BrokerController объект BrokerController будет выполнять ряд действий по инициализации после создания экземпляра, а функция initialize() запускает задачу синхронизации. Конфигурация поддержки временного интервала запланированной задачи по умолчанию составляет 5000 мс, а выполнение начинается с задержкой в 10000 мс.
public boolean initialize() throws CloneNotSupportedException {
this.scheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
},
1000 * 10,
this.brokerConfig.getFlushConsumerOffsetInterval(),
TimeUnit.MILLISECONDS
);
}
Повторное потребление
После столь долгого анализа принципа я хочу подчеркнуть, что причина повторного потребления в рамках нормального использования должна быть связана с отчетами о смещении и постоянством.
- В процессе потребления кластера потребитель неожиданно аварийно завершает работу, и о смещении не сообщается, что приводит к повторному потреблению.
- Брокер неожиданно завершает работу во время использования кластера, а смещение не сохраняет последнее смещение, что приводит к повторному потреблению.
- Во время широковещательного потребления Потребитель неожиданно дает сбой, и смещение не сохраняется в локальном файле, что приводит к повторному потреблению.
- Файл offset.json случайно повреждается или удаляется, а прогресс теряется, что приводит к повторному потреблению.
- Файл offset.json был изменен, и прогресс неточен, что приводит к повторному потреблению.
Другой причиной является то, что разработчик вернул неверный флаг ACK, из-за чего Rocket ошибочно оценил, что потребление не удалось, что привело к повторному потреблению, вызванному логикой повтора.
Если эта статья была вам полезна, попросите лайк👍
Последнее предложение видят только Ди Лиеба и Ву Янзу.