RocketMQ широко используется в нашем проекте и столкнулся со многими проблемами в процессе его использования. Например, без изоляции нескольких сред, когда несколько версий разрабатываются и тестируются одновременно, взаимные помехи могут быть серьезными. Доставка в RocketMQ может завершиться неудачно, что приведет к потере сообщений. Кроме того, версия RocketMQ с открытым исходным кодом не поддерживает отложенные сообщения с произвольной точностью времени, а только определенный уровень. В процессе использования мы сделали несколько целевых оптимизаций и разобрали эту статью.
Прочитав эту статью, вы освоите эти знания
- Попытка решения для изоляции нескольких сред RocketMQ
- «Надежная» схема доставки сообщений на базе RocksDB
- Реализация отложенных сообщений с произвольной задержкой на основе RocksDB и RocketMQ
Изоляция нескольких сред RocketMQ
Поскольку у нас есть много функциональных требований, которые будут разрабатываться и тестироваться параллельно, существует целых три или четыре набора сред разработки и тестирования Предположим теперь, что у нас одновременно разрабатываются три версии для одной и той же темы, сообщения, сгенерированные средой разработки dev1, могут быть потреблением среды разработки dev3, код потребительской части двух сред может быть несогласованным, в результате чего невозможно завершить тестирование этой части функции.В этом случае разработчик жалкий и часто должен выйти в автономный режим и отключить потребительский конец других сред, чтобы продолжить Тест разработки, как показано на следующем рисунке.
Чтобы решить эту проблему, я хотел вначале поработать над этой темой.Изменив сторону производителя, тема каждой среды единообразно добавляется с суффиксом среды, так что тема_ABC станет темой_ABC_dev1 в среде dev1. Этот метод также может быть решен в теории, но он требует создания большего количества тем, стоимость относительно высока, а количество изменений велико.
Позднее было принято решение выделить независимую очередь RocketMQ для каждой среды.Для простоты каждой среде выделяется только одна очередь, как показано ниже.
Различение по переменным среды
- На производственной стороне: среда dev1 доставляется в очередь RocketMQ 0, среда dev2 доставляется в очередь 1 и т. д.
- Со стороны потребителя: среда dev1 извлекает сообщения только из очереди RocketMQ № 0, среда dev2 извлекает сообщения только из очереди № 1 и т. д.
Реализация производственной стороны
Доставка сообщений RocketMQ предоставляет интерфейс MessageQueueSelector для настройки селектора очереди сообщений и указания очереди, в которую должно быть доставлено сообщение.Его определение выглядит следующим образом.
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
Параметр mqs — это все доступные очереди для текущей темы, а возвращаемое значение — это очередь, которая будет доставлена на этот раз. Он имеет следующие классы реализации:
- SelectMessageQueueByHash: используйте абсолютное значение хэш-кода параметра msg, чтобы получить модуль размера очереди.
- SelectMessageQueueByRandom: вызовите метод Random.nextInt для получения случайного числа в диапазоне 0~mqs.size()-1.
- SelectMessageQueueByMachineRoom: реализация пуста
Для нашего сценария обработка здесь упрощена, а очередь напрямую сопоставляется в соответствии с номером среды.Пример кода на производственной стороне показан ниже.
DefaultMQProducer producer = // ...;
final int envIndex = getEnvIndex();
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(envIndex-1);
}
}, envIndex);
Таким образом, среда dev1 сопоставляется с 0-й очередью, а среда dev3 сопоставляется со 2-й очередью.
Потребительская реализация
Для стороны потребителя RocketMQ определяет интерфейс стратегии AllocateMessageQueueStrategy, который может реализовать, какие очереди может использовать текущий потребитель. Ниже показано определение интерфейса AllocateMessageQueueStrategy.
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
*
* @param consumerGroup 当前 consumer group
* @param currentCID 当前 consumer id
* @param mqAll 当前 topic 的所有 queue 列表
* @param cidAll 当前 consumer group 下所有的 consumer id set 集合
* @return 根据策略给当前 consumer 分配的 queue 列表
*/
List<MessageQueue> allocate(
final String consumerGroup,
final String currentCID,
final List<MessageQueue> mqAll,
final List<String> cidAll
);
/**
* 策略算法名
*/
String getName();
}
RocketMQ предоставляет следующие встроенные алгоритмы распределения
- AllocateMessageQueueAveragely: Алгоритм среднего распределения
- AllocateMessageQueueAveragelyByCircle: Распределить по одному в соответствии с кольцом, состоящим из очередей очередей.
- AllocateMachineRoomNearby: Алгоритм, основанный на принципе близости к компьютерному залу.
- AllocateMessageQueueByMachineRoom: на основе алгоритма выделения машинного зала.
- AllocateMessageQueueConsistentHash: на основе согласованного алгоритма хеширования потребитель хешируется в виртуальное кольцо как узел Node.
- AllocateMessageQueueByConfig: основанный на алгоритме выделения конфигурации, он не имеет никакого эффекта и может быть расширен в качестве примера.
Для нашего сценария здесь упрощена обработка, а очередь напрямую сопоставляется по номеру окружения.Код на стороне потребителя выглядит следующим образом.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MQConstant.MQ_CONSUMER_GROUP_NAME, null,
new AllocateMessageQueueStrategy() {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
List<MessageQueue> list = new ArrayList<>();
list.add(mqAll.get(envIndex-1));
return list;
}
@Override
public String getName() {
return "env-based";
}
});
Преимущества и недостатки
Реализация этого метода очень проста, количество клиентских изменений очень мало, и нет необходимости модифицировать тему. Если у вас есть фиксированное количество сред, вы можете изменить приведенную выше стратегию, чтобы одна среда могла использовать фиксированное количество очередей, если несколько сред не используют один и тот же интерфейс очередей. Если количество сообщений в среде разработки и тестирования невелико, то не проблема использовать очередь. Производственные онлайн-среды с несколькими компьютерными залами и несколькими средами также могут быть реализованы с использованием аналогичных идей.
На этом введение изоляции нескольких сред завершается.
Потерянные сообщения
RocketMQ сам по себе является сервером. Конечно, будут проблемы, такие как недоступность службы и занятость службы. В частности, все предприятия нашей компании используют RocketMQ. Время от времени будут возникать ненормальные проблемы с доставкой, такие как «система занята, запустить управление потоком на время" и так далее.
Чтобы решить проблему надежности доставки, мы изначально хотели записывать сообщения в постоянное хранилище, такое как базы данных, когда возникали исключения доставки, а затем иметь запланированную задачу для компенсации потребления. Это решение кажется идеальным, но когда RocketMQ недоступен в целом и большое количество сообщений не может быть доставлено, мгновенная нагрузка на запись базы данных будет очень большой, и это решение не было принято.
Позже я подумал об использовании RocksDB, чтобы спасти страну от кривых,
Главный герой
RocksDB – это встроенная система хранения данных типа "ключ-значение", улучшенная Facebook на основе LevelDB, написанной Google Джеффом Дином. В ней реализовано множество оптимизаций, и ее производительность значительно улучшена по сравнению с LevelDB. Хорошо известный базовый механизм хранения TiDB использует RocksDB.RocksDB — это механизм хранения, основанный на дереве LSM. LSM — это аббревиатура дерева слияния с логарифмической структурой. Что касается основного принципа RocksDB, в этой статье я не буду объяснять его. Я напишу его подробно, когда у меня будет возможность.
Механизм повтора на основе RocksDB
Основная логика заключается в том, что после сбоя доставки сообщение записывается в локальное хранилище RocksDB, а затем существует поток для опроса, есть ли сообщение, и если сообщение есть, оно будет повторено. снова сообщение будет перезаписано в RocksDB Процесс Как показано ниже.
С точки зрения реализации ключ, записанный в RocksDB, имеет следующий формат:
expireTime:retryCount:typeName:uuid
Логика генерации expireTime — это текущая метка времени (в секундах) + время задержки доставки Код выглядит следующим образом:
val RETRY_TIME_STEP_ARRAY = arrayOf(
3, 5, 30, 60, 120, 300, 480, 600, 900, 1800
)
val expire = System.currentTimeMillis() / 1000 + (RETRY_TIME_STEP_ARRAY.getOrNull(retryCount) ?: 10)
Когда доставка сообщения в MQ не удалась, напишите его в RocksDB, эта часть кода выглядит следующим образом.
private fun insert(msg: ByteArray, retryCount: Int, typeName: String) {
val key = genKey(retryCount, typeName)
rocksDB.put(mqRetryCFHandler, WRITE_OPTIONS_SYNC, key.toByteArray(), msg)
}
Главный поток отвечает за опрос RocksDB. Если есть запись, она будет извлечена и помещена в очередь блокировки. Основная логика главного потока выглядит следующим образом.
private var lastSeekTime: Long = 0 // 单调递增的值,初始值为当前时间戳(到秒)
private fun loop() {
val now = // 当前时间戳,到秒
if (lastSeekTime > now) { // 如果时钟回拨或者还没到处理时间片,睡眠一段时间
TimeUnit.MILLISECONDS.sleep(400)
return
}
rocksDB.newIterator(mqRetryCFHandler, READ_OPTIONS).use {
it.seek("$lastSeekTime".toByteArray()) // seek 到以 lastSeekTime 开头的 key 的地方
while (it.isValid) {
val value = it.value()
blockingQueue.put(String(it.key()) to value) // 放入一个固定大小的阻塞队列中
it.next()
}
}
++lastSeekTime
}
Рабочий поток отвечает за повторную доставку сообщения, код показан ниже.
private fun startConsume() {
repeat(THREAD_NUM) {
thread {
while (true) {
val list = drain() // 批量从 blockingQueue 中取数据
list.forEach {
try {
val typeName = getTypeName(it.first)
val handler = getHandler(typeName) ?: return@forEach
val success = handler.handler(it.second)
// 如果不成功,则重新写入 RocksDB
if (!success) {
val currentRetryCount = getRetryCountFromKey(it.first) + 1
val maxRetryCount = handler.retryCount
if (currentRetryCount >= RETRY_TIME_STEP_ARRAY_SIZE || currentRetryCount >= maxRetryCount) {
val msgString = getStringFromBytes(it.second)
logger.info("send reach limit, retry count:$currentRetryCount,default count:$RETRY_TIME_STEP_ARRAY_SIZE,custom count:$maxRetryCount, msg: $msgString")
exceptionHandle.handler("retry $currentRetryCount fail,msg:$msgString")
return@forEach
}
insert(it.second, currentRetryCount, typeName)
}
} catch (ex: Throwable) {
exceptionHandle.handler("key: $it.first ,error: ${ex.message}")
Thread.sleep(30)
}
}
}
}
}
}
Благодаря описанному выше преобразованию мы успешно избежали нескольких кратковременных сбоев RocketMQ за последние полгода.Ни одно сообщение не было потеряно, все повторные попытки были успешными, аномалии в данных не возникали.
Преимущества и недостатки
Преимущество этого решения в том, что оно очень легкое, скорость записи и чтения локальной базы данных RocksDB чрезвычайно высока, а в экстремальных сценариях производительность практически не влияет. Но есть и недостаток, который следует учитывать, потому что он не реализован в централизованном хранилище, таком как MySQL, Если проект развернут в контейнере Docker, после перезапуска контейнера эта часть данных повторных попыток все равно будет потеряна. Используя это решение, невозможно гарантировать 100% потерю данных.Учитывая, что сбои mq случаются нечасто, это также возможная мера для достижения баланса между производительностью и потерей данных.
Дизайн произвольного задержанного сообщения на основе RocksDB
После завершения вышеупомянутого решения «надежной доставки» было получено другое решение, использующее RocksDB для реализации отложенной очереди сообщений с произвольной задержкой, и его проектные цели преследуют три цели:
- Поддержка произвольной задержки
- Используйте существующую инфраструктуру
- Он должен иметь возможность накапливаться бесконечно, а эффективность запросов на запись должна быть высокой.
Итак, на основе RocksDB мы реализовали внутренний проект под названием Rock-DMQ, источник названия — RocksDB для Delay MQ. Принцип его реализации также очень прост, как показано на следующем рисунке.
При доставке задержанного сообщения возьмите в качестве примера тему «cancel_order», логика реализации всего задержанного сообщения следующая.
1. При изменении стороны Producer тема, фактически доставленная в RocketMQ, не является этой, а заменяется унифицированной темой с именем dmq_inner_topic, а исходная тема преобразуется в часть тела.
2. Проект Rock-DMQ будет использовать специальную тему dmq_inner_topic.
3. После потребления сообщения dmq_inner_topic проект Rock-DMQ запишет его в локальную RocksDB, а ключом будет время истечения в качестве префикса (это важнее)
4. Проект Rock-DMQ использует метод реализации, аналогичный содержанию второй части статьи, и периодически опрашивает RocksDB, чтобы узнать, есть ли новости с истекшим сроком действия.
5. Если есть сообщение с истекшим сроком действия, проект Rock-DMQ доставит сообщение в RocketMQ.
6. Первоначальный потребитель, подписавшийся на эту тему, может использовать это сообщение.
Благодаря этой реализации можно поддерживать сообщения о задержке на любое количество секунд, а также лучше повторно использовать существующие технические компоненты без каких-либо изменений в самом RocketMQ и лучше поддерживать горизонтальную масштабируемость.
Основной код был представлен во второй части и не будет повторяться здесь.Если кому-то интересно, вы можете рассмотреть возможность выпуска полного исходного кода позже.
резюме
Вышеизложенное — практика посадки и записи о заполнении ям RocketMQ на нашей стороне. Эти решения все еще находятся в стадии быстрой итеративной оптимизации. Если у вас есть идеи получше, вы можете связаться со мной. Если у вас есть какие-либо вопросы, вы можете отсканировать QR-код ниже, чтобы подписаться на меня Общедоступный номер, чтобы связаться со мной.