Некоторые подводные камни и оптимизации при использовании RocketMQ

Архитектура RocketMQ

RocketMQ широко используется в нашем проекте и столкнулся со многими проблемами в процессе его использования. Например, без изоляции нескольких сред, когда несколько версий разрабатываются и тестируются одновременно, взаимные помехи могут быть серьезными. Доставка в RocketMQ может завершиться неудачно, что приведет к потере сообщений. Кроме того, версия RocketMQ с открытым исходным кодом не поддерживает отложенные сообщения с произвольной точностью времени, а только определенный уровень. В процессе использования мы сделали несколько целевых оптимизаций и разобрали эту статью.

Прочитав эту статью, вы освоите эти знания

  • Попытка решения для изоляции нескольких сред RocketMQ
  • «Надежная» схема доставки сообщений на базе RocksDB
  • Реализация отложенных сообщений с произвольной задержкой на основе RocksDB и RocketMQ

Изоляция нескольких сред RocketMQ

Поскольку у нас есть много функциональных требований, которые будут разрабатываться и тестироваться параллельно, существует целых три или четыре набора сред разработки и тестирования Предположим теперь, что у нас одновременно разрабатываются три версии для одной и той же темы, сообщения, сгенерированные средой разработки dev1, могут быть потреблением среды разработки dev3, код потребительской части двух сред может быть несогласованным, в результате чего невозможно завершить тестирование этой части функции.В этом случае разработчик жалкий и часто должен выйти в автономный режим и отключить потребительский конец других сред, чтобы продолжить Тест разработки, как показано на следующем рисунке.

Чтобы решить эту проблему, я хотел вначале поработать над этой темой.Изменив сторону производителя, тема каждой среды единообразно добавляется с суффиксом среды, так что тема_ABC станет темой_ABC_dev1 в среде dev1. Этот метод также может быть решен в теории, но он требует создания большего количества тем, стоимость относительно высока, а количество изменений велико.

Позднее было принято решение выделить независимую очередь RocketMQ для каждой среды.Для простоты каждой среде выделяется только одна очередь, как показано ниже.

Различение по переменным среды

  • На производственной стороне: среда dev1 доставляется в очередь RocketMQ 0, среда dev2 доставляется в очередь 1 и т. д.
  • Со стороны потребителя: среда dev1 извлекает сообщения только из очереди RocketMQ № 0, среда dev2 извлекает сообщения только из очереди № 1 и т. д.

Реализация производственной стороны

Доставка сообщений RocketMQ предоставляет интерфейс MessageQueueSelector для настройки селектора очереди сообщений и указания очереди, в которую должно быть доставлено сообщение.Его определение выглядит следующим образом.

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

Параметр mqs — это все доступные очереди для текущей темы, а возвращаемое значение — это очередь, которая будет доставлена ​​на этот раз. Он имеет следующие классы реализации:

  • SelectMessageQueueByHash: используйте абсолютное значение хэш-кода параметра msg, чтобы получить модуль размера очереди.
  • SelectMessageQueueByRandom: вызовите метод Random.nextInt для получения случайного числа в диапазоне 0~mqs.size()-1.
  • SelectMessageQueueByMachineRoom: реализация пуста

Для нашего сценария обработка здесь упрощена, а очередь напрямую сопоставляется в соответствии с номером среды.Пример кода на производственной стороне показан ниже.

DefaultMQProducer producer = // ...;

final int envIndex = getEnvIndex();
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return mqs.get(envIndex-1); 
    }
}, envIndex);

Таким образом, среда dev1 сопоставляется с 0-й очередью, а среда dev3 сопоставляется со 2-й очередью.

Потребительская реализация

Для стороны потребителя RocketMQ определяет интерфейс стратегии AllocateMessageQueueStrategy, который может реализовать, какие очереди может использовать текущий потребитель. Ниже показано определение интерфейса AllocateMessageQueueStrategy.

public interface AllocateMessageQueueStrategy {

    /**
     * Allocating by consumer id
     *
     * @param consumerGroup 当前 consumer group
     * @param currentCID 当前 consumer id
     * @param mqAll 当前 topic 的所有 queue 列表
     * @param cidAll 当前 consumer group 下所有的 consumer id set 集合
     * @return 根据策略给当前 consumer 分配的 queue 列表
     */
    List<MessageQueue> allocate(
        final String consumerGroup,
        final String currentCID,
        final List<MessageQueue> mqAll,
        final List<String> cidAll
    );

    /**
     * 策略算法名
     */
    String getName();
}

RocketMQ предоставляет следующие встроенные алгоритмы распределения

  • AllocateMessageQueueAveragely: Алгоритм среднего распределения
  • AllocateMessageQueueAveragelyByCircle: Распределить по одному в соответствии с кольцом, состоящим из очередей очередей.
  • AllocateMachineRoomNearby: Алгоритм, основанный на принципе близости к компьютерному залу.
  • AllocateMessageQueueByMachineRoom: на основе алгоритма выделения машинного зала.
  • AllocateMessageQueueConsistentHash: на основе согласованного алгоритма хеширования потребитель хешируется в виртуальное кольцо как узел Node.
  • AllocateMessageQueueByConfig: основанный на алгоритме выделения конфигурации, он не имеет никакого эффекта и может быть расширен в качестве примера.

Для нашего сценария здесь упрощена обработка, а очередь напрямую сопоставляется по номеру окружения.Код на стороне потребителя выглядит следующим образом.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MQConstant.MQ_CONSUMER_GROUP_NAME, null,
        new AllocateMessageQueueStrategy() {
            @Override
            public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
                List<MessageQueue> list = new ArrayList<>();
                list.add(mqAll.get(envIndex-1));
                return list;
            }

            @Override
            public String getName() {
                return "env-based";
            }
        });

Преимущества и недостатки

Реализация этого метода очень проста, количество клиентских изменений очень мало, и нет необходимости модифицировать тему. Если у вас есть фиксированное количество сред, вы можете изменить приведенную выше стратегию, чтобы одна среда могла использовать фиксированное количество очередей, если несколько сред не используют один и тот же интерфейс очередей. Если количество сообщений в среде разработки и тестирования невелико, то не проблема использовать очередь. Производственные онлайн-среды с несколькими компьютерными залами и несколькими средами также могут быть реализованы с использованием аналогичных идей.

На этом введение изоляции нескольких сред завершается.

Потерянные сообщения

RocketMQ сам по себе является сервером. Конечно, будут проблемы, такие как недоступность службы и занятость службы. В частности, все предприятия нашей компании используют RocketMQ. Время от времени будут возникать ненормальные проблемы с доставкой, такие как «система занята, запустить управление потоком на время" и так далее.

Чтобы решить проблему надежности доставки, мы изначально хотели записывать сообщения в постоянное хранилище, такое как базы данных, когда возникали исключения доставки, а затем иметь запланированную задачу для компенсации потребления. Это решение кажется идеальным, но когда RocketMQ недоступен в целом и большое количество сообщений не может быть доставлено, мгновенная нагрузка на запись базы данных будет очень большой, и это решение не было принято.

Позже я подумал об использовании RocksDB, чтобы спасти страну от кривых,

Главный герой

RocksDB – это встроенная система хранения данных типа "ключ-значение", улучшенная Facebook на основе LevelDB, написанной Google Джеффом Дином. В ней реализовано множество оптимизаций, и ее производительность значительно улучшена по сравнению с LevelDB. Хорошо известный базовый механизм хранения TiDB использует RocksDB.

RocksDB — это механизм хранения, основанный на дереве LSM. LSM — это аббревиатура дерева слияния с логарифмической структурой. Что касается основного принципа RocksDB, в этой статье я не буду объяснять его. Я напишу его подробно, когда у меня будет возможность.

Механизм повтора на основе RocksDB

Основная логика заключается в том, что после сбоя доставки сообщение записывается в локальное хранилище RocksDB, а затем существует поток для опроса, есть ли сообщение, и если сообщение есть, оно будет повторено. снова сообщение будет перезаписано в RocksDB Процесс Как показано ниже.

rocksdb retry

С точки зрения реализации ключ, записанный в RocksDB, имеет следующий формат:

expireTime:retryCount:typeName:uuid

Логика генерации expireTime — это текущая метка времени (в секундах) + время задержки доставки Код выглядит следующим образом:

val RETRY_TIME_STEP_ARRAY = arrayOf(
    3, 5, 30, 60, 120, 300, 480, 600, 900, 1800
)

val expire = System.currentTimeMillis() / 1000 + (RETRY_TIME_STEP_ARRAY.getOrNull(retryCount) ?: 10)

Когда доставка сообщения в MQ не удалась, напишите его в RocksDB, эта часть кода выглядит следующим образом.

private fun insert(msg: ByteArray, retryCount: Int, typeName: String) {
    val key = genKey(retryCount, typeName)
    rocksDB.put(mqRetryCFHandler, WRITE_OPTIONS_SYNC, key.toByteArray(), msg)
}

Главный поток отвечает за опрос RocksDB. Если есть запись, она будет извлечена и помещена в очередь блокировки. Основная логика главного потока выглядит следующим образом.

private var lastSeekTime: Long = 0 // 单调递增的值,初始值为当前时间戳(到秒)

private fun loop() {
    val now = // 当前时间戳,到秒
    if (lastSeekTime > now) { // 如果时钟回拨或者还没到处理时间片,睡眠一段时间
        TimeUnit.MILLISECONDS.sleep(400)
        return
    }

    rocksDB.newIterator(mqRetryCFHandler, READ_OPTIONS).use {
        it.seek("$lastSeekTime".toByteArray()) // seek 到以 lastSeekTime 开头的 key 的地方
        while (it.isValid) {
            val value = it.value()
            blockingQueue.put(String(it.key()) to value) // 放入一个固定大小的阻塞队列中
            it.next()
        }
    }
    ++lastSeekTime
}

Рабочий поток отвечает за повторную доставку сообщения, код показан ниже.

private fun startConsume() {
    repeat(THREAD_NUM) {
        thread {
            while (true) {
                val list = drain() // 批量从 blockingQueue 中取数据
                list.forEach {
                    try {
                        val typeName = getTypeName(it.first)
                        val handler = getHandler(typeName) ?: return@forEach
                        val success = handler.handler(it.second)
                        // 如果不成功,则重新写入 RocksDB
                        if (!success) {
                            val currentRetryCount = getRetryCountFromKey(it.first) + 1
                            val maxRetryCount = handler.retryCount
                            if (currentRetryCount >= RETRY_TIME_STEP_ARRAY_SIZE || currentRetryCount >= maxRetryCount) {
                                val msgString = getStringFromBytes(it.second)
                                logger.info("send reach limit, retry count:$currentRetryCount,default count:$RETRY_TIME_STEP_ARRAY_SIZE,custom count:$maxRetryCount, msg: $msgString")
                                exceptionHandle.handler("retry $currentRetryCount fail,msg:$msgString")
                                return@forEach
                            }
                            insert(it.second, currentRetryCount, typeName)
                        }
                    } catch (ex: Throwable) {
                        exceptionHandle.handler("key: $it.first ,error: ${ex.message}")
                        Thread.sleep(30)
                    }
                }
            }
        }
    }
}

Благодаря описанному выше преобразованию мы успешно избежали нескольких кратковременных сбоев RocketMQ за последние полгода.Ни одно сообщение не было потеряно, все повторные попытки были успешными, аномалии в данных не возникали.

Преимущества и недостатки

Преимущество этого решения в том, что оно очень легкое, скорость записи и чтения локальной базы данных RocksDB чрезвычайно высока, а в экстремальных сценариях производительность практически не влияет. Но есть и недостаток, который следует учитывать, потому что он не реализован в централизованном хранилище, таком как MySQL, Если проект развернут в контейнере Docker, после перезапуска контейнера эта часть данных повторных попыток все равно будет потеряна. Используя это решение, невозможно гарантировать 100% потерю данных.Учитывая, что сбои mq случаются нечасто, это также возможная мера для достижения баланса между производительностью и потерей данных.

Дизайн произвольного задержанного сообщения на основе RocksDB

После завершения вышеупомянутого решения «надежной доставки» было получено другое решение, использующее RocksDB для реализации отложенной очереди сообщений с произвольной задержкой, и его проектные цели преследуют три цели:

  • Поддержка произвольной задержки
  • Используйте существующую инфраструктуру
  • Он должен иметь возможность накапливаться бесконечно, а эффективность запросов на запись должна быть высокой.

Итак, на основе RocksDB мы реализовали внутренний проект под названием Rock-DMQ, источник названия — RocksDB для Delay MQ. Принцип его реализации также очень прост, как показано на следующем рисунке.

При доставке задержанного сообщения возьмите в качестве примера тему «cancel_order», логика реализации всего задержанного сообщения следующая.

1. При изменении стороны Producer тема, фактически доставленная в RocketMQ, не является этой, а заменяется унифицированной темой с именем dmq_inner_topic, а исходная тема преобразуется в часть тела.

2. Проект Rock-DMQ будет использовать специальную тему dmq_inner_topic.

3. После потребления сообщения dmq_inner_topic проект Rock-DMQ запишет его в локальную RocksDB, а ключом будет время истечения в качестве префикса (это важнее)

4. Проект Rock-DMQ использует метод реализации, аналогичный содержанию второй части статьи, и периодически опрашивает RocksDB, чтобы узнать, есть ли новости с истекшим сроком действия.

5. Если есть сообщение с истекшим сроком действия, проект Rock-DMQ доставит сообщение в RocketMQ.

6. Первоначальный потребитель, подписавшийся на эту тему, может использовать это сообщение.

Благодаря этой реализации можно поддерживать сообщения о задержке на любое количество секунд, а также лучше повторно использовать существующие технические компоненты без каких-либо изменений в самом RocketMQ и лучше поддерживать горизонтальную масштабируемость.

Основной код был представлен во второй части и не будет повторяться здесь.Если кому-то интересно, вы можете рассмотреть возможность выпуска полного исходного кода позже.

резюме

Вышеизложенное — практика посадки и записи о заполнении ям RocketMQ на нашей стороне. Эти решения все еще находятся в стадии быстрой итеративной оптимизации. Если у вас есть идеи получше, вы можете связаться со мной. Если у вас есть какие-либо вопросы, вы можете отсканировать QR-код ниже, чтобы подписаться на меня Общедоступный номер, чтобы связаться со мной.