Интервьюер: я должен спросить об очередях сообщений

Java

Последовательная пушка очереди сообщений

  1. Как использовать MQ в проекте?
  2. Зачем использовать очереди сообщений?
  3. Каковы преимущества и недостатки очередей сообщений?
  4. kafka, activemq, rabbitmq, RocketMQ что делать?
  5. Как обеспечить высокую доступность очередей сообщений?
  6. Как сделать так, чтобы сообщения не использовались повторно?
  7. Как обеспечить надежную передачу сообщений?
  8. Как обеспечить порядок сообщений?
  9. Написать проект архитектуры очереди сообщений?

Выбор технологии очереди сообщений

решенная проблема:

  1. разъединение
  2. асинхронный
  3. отсечение пика

Сценарий без подключения системы MQ

  1. Система А генерирует относительно критичные данные Многие системы требуют, чтобы система А отправляла данные Сильная связь (системы B, C, D и E могут иметь разные параметры, и данные какое-то время не нужны, а систему A нужно постоянно модифицированное обслуживание кода)
  2. Системе A также необходимо учитывать, не зависли ли системы B, C, D и E и истекло ли время доступа? Хотите попробовать еще раз?

Разделение сценариев с использованием системы MQ

  1. Чтобы поддерживать этот код, вам не нужно учитывать, был ли вызов успешным или нет, а время сбоя истекло.
  2. Если новой системе нужны данные, их можно использовать непосредственно из MQ.Если системе не нужны эти данные, она может отменить потребление сообщений MQ.
    Описание: Благодаря модели сообщений публикации-подписки MQ (Pub/Sub) система A полностью отделена от других систем.

Сценарии запросов с высокой задержкой без синхронизации MQ

Как правило, интернет-предприятия требуют, чтобы каждый запрос был в пределах 200 мс для непосредственных пользовательских операций, что почти незаметно для пользователей.

Оптимизация производительности интерфейса после использования MQ для асинхронности

Улучшить интерфейс с высокой задержкой

Сцена, где система умирает в пиковый период, когда MQ не используется

В пиковый период происходит 5000 запросов в секунду и 5000 SQL-запросов в секунду выполняется к MySQL (вообще у MySQL почти 2000 запросов в секунду) Если MySQL будет убит, вся система выйдет из строя, и пользователи не смогут использовать систему. Но после пикового периода может быть 50 запросов в секунду без какой-либо нагрузки на всю систему.

Сценарии отсечения пиков с использованием MQ

В MQ записывается 5000 запросов, система A может обрабатывать только до 2000 запросов в секунду (MySQL обрабатывает до 2000 запросов в секунду), система A медленно вытягивает запросы из MQ, вытягивая 2000 запросов в секунду. MQ, каждую секунду приходит 5000 запросов, а уходит только 2000. В результате в пиковый период (21 час) в MQ может быть отставание от сотен тысяч и даже миллионов запросов. Это нормально, т.к. После пикового периода будет 50 запросов в секунду, но система А будет по-прежнему обрабатывать 2000 запросов в секунду. Пока пиковый период позади, Система А быстро устраняет накопившиеся сообщения. (Рассчитывая учетную запись, в MQ задерживается 3000 сообщений в секунду, сообщения 18W будут отправлены в журнал за одну минуту, а 10 миллионов сообщений будут отправлены в журнал за один час. По окончании пикового периода для завершения потребуется около часа. .Избавьтесь от 1000Вт бэклога сообщений)

Проблемы после внедрения MQ в архитектуру

  1. Снижение доступности системы
    MQ может зависнуть, что приведет к сбою всей системы.
  2. Сложность системы увеличивается
    Могут быть отправлены повторяющиеся сообщения, что приведет к вставке повторяющихся данных, потерям сообщений, нарушению порядка сообщений, зависанию систем B, C и D, что приведет к накоплению сообщений MQ и заполнению диска;
  3. Проблемы согласованности
    Первоначально A,B,C,D должны быть выполнены успешно, а затем вернуться, результат A,B,C выполнен успешно, а D не выполнен

Каковы преимущества и недостатки Kafka, ActiveMQ, RabbitMQ, RocketMQ

характеристика ActiveMQ RabbitMQ RocketMQ Kafka
Производительность одной машины Уровень десять тысяч (около 1 Вт ~ 2 Вт запроса в секунду) 10000 класс сто тысяч сто тысяч
Своевременность мс уровень Микросекундный уровень, это основная особенность rabbitmq, самая низкая задержка мс уровень в течение мс
Доступность Высокая и высокая доступность на основе архитектуры ведущий-ведомый Высокая и высокая доступность на основе архитектуры ведущий-ведомый Очень высокая, распределенная архитектура Очень высокий, Kafka распределен, одни данные имеют несколько копий, несколько машин не работают, данные не будут потеряны, и это не приведет к недоступности
надежность сообщения Низкая вероятность потери данных После настройки оптимизации параметров можно достичь нулевой потери После настройки оптимизации параметров сообщение может быть потеряно.
Резюме преимуществ и недостатков Преимущества: очень зрелый, мощный и используется в большом количестве компаний и проектов в отрасли. Недостатки: Изредка низкая вероятность потери сообщений, а сейчас коммьюнити и отечественных приложений все меньше и меньше, а официальное сообщество все меньше и меньше поддерживает ActiveMQ 5.x, и он действительно в основном основан на развязке и асинхронном использовании, меньше in Используется в крупномасштабных сценариях пропускной способности Преимущества: разработка на языке erlang, очень хорошая производительность, низкая задержка, отличный интерфейс управления, активное сообщество. Недостатки: RabbitMQ имеет более низкую пропускную способность (десятки тысяч отдельных машин), это связано с тем, что его механизм реализации относительно тяжелый. И разработка erlang, сколько внутренних сильных сторон у исследования и настройки исходного кода erlang? Отсутствие контроля, полагаясь на сообщество открытого исходного кода для поддержки и исправления ошибок. Более того, динамическое расширение кластера RabbitMQ будет очень хлопотным, на самом деле это в основном вызвано самим языком erlang, трудно читать исходный код, сложно настраивать и контролировать Преимущества: интерфейс прост и удобен в использовании, гарантируется Али, ежедневная обработка сообщений составляет десятки миллиардов, он может достигать крупномасштабной пропускной способности, производительность также очень хорошая, распределенное расширение также очень удобно, сообщество обслуживание в порядке, надежность и доступность в порядке, Он также может поддерживать большое количество тем и сложных бизнес-сценариев MQ. Исходный код - java, что удобно для компаний для настройки и контроля. Недостатки: сообщество в целом активно, интерфейс не соответствует стандартной спецификации JMS, а некоторая миграция системы требует значительной модификации кода технологии, возможно, что от этой технологии откажутся. Преимущества: Предоставляет меньше основных функций, но улучшает сверхвысокую пропускную способность, задержку на уровне мс, чрезвычайно высокую доступность и надежность и может распределяться произвольно.Kafka лучше всего поддерживает небольшое количество тем, чтобы обеспечить чрезвычайно высокую пропускную способность. Недостатки: может быть повторное потребление сообщений, что повлияет на точность данных.В области больших данных и сбора журналов это влияние можно игнорировать.Это, естественно, подходит для расчета больших данных и сбора журналов в реальном времени.

Рекомендация: Малые и средние компании RabbitMQ Крупные компании: RocketMQ Вычисления больших данных в реальном времени: Kafka

высокая доступность очереди сообщений

Высокая доступность RabbitMQ

RabbitMQ имеет три режима: автономный режим, обычный режим кластера, режим зеркального кластера.

  • Автономный режим
    демонстрационный уровень
  • Обычный кластерный режим (без высокой доступности)
    Метаданные очереди существуют в нескольких экземплярах, но сообщение не существует в нескольких экземплярах каждый раз

    Запустите несколько экземпляров rabbitmq на нескольких машинах, по одному на машину.
    Преимущества: несколько компьютеров могут получать сообщения, что может повысить пропускную способность потребления.
    Недостатки: внутри rabbitmq может генерироваться большое количество передаваемых данных, доступность в принципе не гарантируется, а машина, на которой находится очередь, не работает, и нет возможности ее потреблять.
    нет высокой доступности
  • Режим зеркального кластера (высокая доступность, нераспределенный)

    Метаданные и сообщения очереди будут существовать в нескольких экземплярах.Каждый раз, когда сообщение записывается в очередь, оно автоматически отправляется в очереди нескольких экземпляров для синхронизации сообщений. То есть у каждого узла есть полный образ этой очереди (все данные этой очереди). Если какой-либо узел не работает, другие узлы также содержат полные данные очереди, и другие потребители могут перейти к другим рабочим узлам для потребления данных, и все это в порядке. Недостатки: Не распределяется.Если объем данных этой очереди большой, то емкость этой машины не может быть размещена.
    Как включить режим зеркального кластера: в консоли управления на странице администратора добавьте новую политику для режима зеркального кластера.Если указано, вы можете запросить синхронизацию данных со всеми узлами или запросить синхронизацию с указанным числом узлов, а затем вы снова создаете очередь.При применении этой стратегии данные будут автоматически синхронизированы с другими узлами.
  • Архитектура высокой доступности Kafka

    Процесс брокера — это отдельный процесс, который Kafka запускает на каждой машине. Каждая машина + процесс брокера на машине может рассматриваться как узел в кластере kafka.
    Вы создаете тему, эту тему можно разделить на несколько разделов, каждый раздел может существовать на разных брокерах, и каждый раздел хранит часть данных.
    Это естественная распределенная очередь сообщений, то есть данные топика разбросаны по нескольким машинам, и каждая машина помещает часть данных.
    Реальный смысл распределенного заключается в том, что каждый узел помещает только часть данных, а не полные данные (полные данные — это HA, кластерный механизм).
    До версии Kafka 0.8 не было механизма HA, если какой-то брокер выходит из строя, некоторые данные отсутствуют.
    После Kafka 0.8 предоставляется механизм HA, который является механизмом реплик. Данные каждого раздела будут синхронизированы с другими машинами, чтобы сформировать собственные множественные реплики. Затем все реплики выберут лидера. Затем с этим лидером будут иметь дело производители и потребители, а затем последуют другие реплики. При записи лидер отвечает за синхронизацию данных со всеми фолловерами, а при чтении может напрямую считывать данные на лидере. Если брокер не работает, и он оказывается лидером раздела, то в это время будет избран новый лидер, и все смогут продолжать читать и писать этому новому лидеру.Это так называемая высокая доступность.
    Механизм синхронизации лидера и ведомого:
    При записи данных производитель записывает лидера, затем лидер записывает данные на локальный диск, а затем другие последователи активно извлекают данные из лидера. Как только все фолловеры синхронизируют свои данные, они отправят подтверждение лидеру.После того, как лидер получит подтверждения от всех фолловеров, он вернет производителю сообщение об успешной записи.
    При потреблении оно будет прочитано только лидером, но только когда сообщение было синхронно и успешно возвращено всеми подписчиками, сообщение будет прочитано потребителем.

дублирующиеся данные очереди сообщений

MQ может гарантировать только то, что сообщение не будет потеряно, и не может гарантировать повторную передачу.

Возможные проблемы дублирования потребления на стороне потребителя Kafka

Каждое сообщение имеет смещение, представляющее порядковый номер сообщения. В соответствии с порядком, в котором данные поступают в Kafka, Kafka присваивает смещение каждой части данных, которое представляет порядковый номер данных. Когда потребители потребляют из Kafka, Чтобы потреблять в этом порядке, потребитель отправит смещение, которое должно сообщить kafka, что данные смещения = 153 были использованы; zk записывает сообщение о том, что потребитель в настоящее время потребляет смещение = несколько; система перезапускается. после перезапуска потребители найдут kafka и позволят kafka продолжать передавать мне данные за место, которое я потреблял в прошлый раз. Причина повторяющихся сообщений: (в основном возникает после перезапуска потребителя) Потребитель не отправляет смещение сразу после потребления части данных, а регулярно и регулярно отправляет смещение. Если потребитель готов отправить смещение, но процесс потребителя перезапускается, когда смещение не было отправлено, смещение сообщения, которое было использовано в это время, не отправляется, и kafka не знает, что вы использовали смещение. = 153. данные, в это время kafka отправит вам данные смещения = 152 153 154, в это время сообщение смещения = 152 153 используется повторно

Гарантия идемпотентности повторного потребления MQ

Идемпотентность: данные или запрос повторяются для вас много раз, и вы должны убедиться, что соответствующие данные не изменятся и не смогут ошибиться. Идеи:

  1. Чтобы получить данные для записи в базу, сначала проверьте первичный ключ, если данные есть, не вставляйте их и выполните обновление
  2. Если вы пишете redis, то проблем нет, все равно он устанавливается каждый раз, естественная идемпотентность
  3. Производитель отправляет сообщение с глобальным уникальным идентификатором. После того, как потребитель получит сообщение, сначала зарегистрируйтесь в Redis в соответствии с этим идентификатором. Если оно не было потреблено ранее, оно будет обработано и записано в Redis. , если оно было потреблено , он не будет обработан.
  4. Уникальный ключ на основе базы данных

Убедитесь, что сообщения MQ не потеряны

MQ передает самые основные сообщения, такие как: реклама биллинговой системы, пользователь нажимает на рекламу, и доллар вычитается.Если сообщение потеряно во время вычета, деньги будут продолжать уменьшаться, а накопление будет складываться, что большое дело для компании.

Возможные проблемы потери данных с RabbitMQ

  1. Когда производитель пишет сообщение, сообщение не доходит до rabbitmq и теряется при передаче по сети. Или сообщение пришло в rabbitmq, но произошла внутренняя ошибка и оно не сохранилось.
  2. После того, как RabbitMQ получает сообщение, оно временно сохраняется в памяти хоста, в результате потребитель не успевает потреблять, и RabbitMQ зависает сам по себе, что приводит к потере данных, временно хранящихся в памяти.
  3. Потребитель потребляет это потребление, но прежде чем оно может быть обработано, он вешает трубку. RabbitMQ считает, что потребитель закончил обработку.

Решение проблемы 1: механизм транзакций: (обычно не используется, синхронный, производитель будет синхронно блокироваться и ждать, пока вы добьетесь успеха или потерпите неудачу при отправке сообщения. Это приведет к падению пропускной способности сообщения, отправленного производителем)

    channel.txSelect
try {
    //发送消息
} catch(Exception e){
    channel.txRollback;
    //再次重试发送这条消息
} 
    channel.txCommit;

Механизм подтверждения: (этот механизм обычно используется, асинхронный режим не блокируется, и пропускная способность будет относительно высокой)

  1. Сначала установите канал для подтверждения режима
  2. отправить сообщение в rabbitmq
  3. Не беспокойтесь об этом после отправки сообщения
  4. Если rabbitmq получит это сообщение, он вызовет локальный интерфейс вашего производителя, чтобы уведомить вас о том, что я получил это сообщение.
  5. Если rabbitmq сообщит об ошибке при получении сообщения, он перезвонит вашему интерфейсу, чтобы сообщить вам, что получение сообщения не удалось, и вы можете повторно отправить его.
public void ack(String messageId){

}

public void nack(String messageId){
    //再次重发一次这个消息
}

Решение проблемы 2. Сохранение на диск

  1. При создании очереди сделайте ее постоянной, чтобы rabbitmq мог гарантировать, что метаданные очереди будут постоянными, но данные в очереди не будут постоянными.
  2. При отправке сообщения установите для deliveryMode значение 2 и сделайте сообщение постоянным, тогда rabbitmq сохранит сообщение на диск. 2 постоянства должны быть установлены одновременно.
  3. Сохранение может быть объединено с механизмом подтверждения на стороне производителя.Только после того, как сообщение будет сохранено на диске, производитель будет уведомлен о подтверждении, поэтому даже до того, как оно будет сохранено на диске, rabbitmq зависает, данные теряются, и production Если пользователь не получил акк, вы также можете повторно отправить его самостоятельно.
    Недостатки: Может быть небольшая вероятность потери данных. Сообщение только что записывается в rabbitmq, но еще не сохранено на диске. К сожалению, rabbitmq зависает, что приведет к потере небольшого количества данных в памяти.

Проблема 3 Решение: Причина: Потребитель включил механизм autoAck (сообщение потребляется, оно все еще обрабатывается и еще не обработано. В это время потребитель автоматически выполняет autoAck, уведомляя rabbitmq о том, что сообщение было потребляется, а не в это время.Совпадение, система-потребитель не работает, сообщение потеряно и еще не обработано, а rabbitmq думает, что сообщение было обработано) Решение: отключить autoAck, и после обработки сообщения само по себе, отправьте подтверждение на rabbitmq. Если оно выйдет из строя до того, как оно будет обработано в это время, rabbitmq не получит сообщение подтверждения, которое вы отправили в это время, и тогда rabbitmq перераспределит сообщение другим потребителям для обработки.

Возможные проблемы с потерей данных в Kafka

  1. Потребитель теряет данные
    Причина: после того, как потребитель потребляет сообщение, автоматически отправляется смещение. Kafka думает, что вы уже использовали сообщение. В результате потребитель вешает трубку, и сообщение теряется.
    Пример: после того, как потребитель потребляет данные, он записывает их в очередь в памяти для кэширования, сообщение автоматически отправляется в смещение, и система перезапускается.В результате данные в очереди в памяти, которые еще не обработанные, будут потеряны.
    Решение: Kafka автоматически отправит смещение, поэтому, если автоматическая отправка смещения отключена и вы вручную отправляете смещение после обработки, вы можете гарантировать, что данные не будут потеряны. Но в это время еще будет повторное потребление.Например, сразу после завершения обработки зачет не был отправлен, а результат завис.В это время он обязательно повторится один раз, и это идемпотент.
  2. Кафка сбрасывает сообщения
    Причина: При падении брокера в кафке, а затем переизбрании лидера партиции, у других фолловеров просто есть какие-то данные, которые в это время не синхронизированы, в результате лидер в это время зависает, а после избрания ведомый в качестве ведущего, предыдущий ведомый теряется Несинхронизированные данные в ведущем.
    Пример: Лидерная машина Кафки не работает, после переключения фолловера на лидера обнаруживается, что данные потеряны
    Решение: (гарантируйте, что данные не будут потеряны при сбое лидера на стороне брокера kafka или при переключении лидера)
  3. Установите replication.factor для темы, это значение должно быть больше 1, чтобы гарантировать, что каждый раздел должен иметь как минимум 2 реплики.
  4. Установите параметр min.insync.replicas на сервере kafka.Это значение должно быть больше 1. Это необходимо для того, чтобы лидер, по крайней мере, осознавал, что есть хотя бы один ведомый, который все еще находится в контакте с самим собой и не остается позади, чтобы убедиться, что лидер завис и есть еще один ведомый.Убедитесь, что хотя бы один ведомый может поддерживать нормальную синхронизацию данных с лидером.
  5. Установите acks = all на стороне производителя, что требует, чтобы каждый фрагмент данных был записан во все реплики, прежде чем его можно будет считать успешным. В противном случае производитель будет все время повторять попытки.В это время установите retries = MAX (большое значение повтора), требуя, чтобы после сбоя записи он зависал здесь (во избежание потери сообщения).
  6. производитель кафки теряет сообщение
    По схеме 2 ставим ack=all, не потеряется. Это потребует, чтобы лидер получил сообщение, и только после того, как все последователи синхронизируются с сообщением, запись будет считаться успешной. Если это условие не выполняется, производитель будет бесконечно повторять попытки.

порядок очереди сообщений

Справочная информация: в системе синхронизации бинарных журналов mysql добавление, удаление и изменение части данных в mysql соответствует добавлению, удалению и изменению 3 бинарных журналов, затем эти 3 бинарных журнала отправляются в MQ, и они выполняются последовательно, когда они По крайней мере, порядок должен быть гарантирован, иначе последовательность становится удалить, изменить, добавить. Ежедневные данные синхронизации достигают сотен миллионов, mysql->mysql, таких как команда больших данных, должна синхронизировать базу данных mysql для выполнения различных сложных операций с данными бизнес-системы компании. Сцены:

  1. rabbitmq, одна очередь, несколько потребителей, это не очевидное нарушение
  2. кафка, топик, партиция, потребитель, внутренняя многопоточность, это не лажа

Сообщения RabbitMQ не по порядку

Как RabbitMQ гарантирует порядок сообщений

Данные, необходимые для гарантии заказа, помещаются в ту же очередь

Сообщения Кафки не по порядку

Данные, записываемые в раздел, должны быть упорядочены. Производитель может указать ключ при записи, например, идентификатор заказа в качестве ключа, тогда данные, связанные с заказом, будут распределены по разделу, и данные в этом разделе должны быть в порядке. Раздел в Kafka может использоваться только одним потребителем. Когда потребители извлекают данные из раздела, они должны быть в порядке.

Kafka гарантирует порядок сообщений

Если потребитель потребляет и обрабатывает в одном потоке, если обработка занимает много времени, обработка сообщения занимает десятки миллисекунд, а за одну секунду можно обработать только десятки фрагментов данных — такая пропускная способность слишком мала. Для параллельной обработки необходимо использовать многопоточность.Потребитель в стресс-тесте — это 4-ядерная одиночная машина 8G, 32 потока и может обрабатывать не более тысячи сообщений в секунду.

Задержка и истечение очереди сообщений

Есть проблема на стороне потребителя, нет потребления или очень медленное потребление. Тут есть подводный камень: диск вашего кластера очереди сообщений почти заполнен, и его никто не потребляет, что делать? После задержки в несколько часов rabbitmq устанавливает время истечения срока действия сообщения и исчезает, что мне делать? Например: после каждого потребления приходится писать mysql, но mysql зависает, а зависание потребителя не движется. То, от чего потребитель зависит локально, зависает, вызывая зависание потребителя. Потребление не обрабатывалось в течение длительного времени, что привело к переполнению mq. Сценарий: десятки миллионов фрагментов данных хранятся в MQ в течение семи или восьми часов.

Быстрая обработка накопившихся сообщений

Один потребитель — 1000 в секунду, 3 потребителя в секунду — 3000, одна минута — 18 Вт, а для восстановления более 1000 Вт потребуется час.

шаг:

  1. Сначала устраните проблему потребителя, чтобы убедиться, что он возобновил скорость потребления, а затем остановите всех существующих потребителей.
  2. Создайте новую тему, раздел в 10 раз больше исходного, и временно установите исходный в 10 раз или в 20 раз больше очередей
  3. Затем напишите временную программу-потребитель, которая распределяет данные.Эта программа развертывается для использования невыполненных данных.После потребления она не выполняет трудоемкую обработку, а напрямую опрашивает и записывает в 10 раз больше очередей, которые были временно установлены.
  4. Затем временно реквизируйте 10-кратное количество машин для развертывания потребителей, и каждая партия потребителей потребляет временную очередь данных.
  5. Этот подход эквивалентен временному расширению ресурсов очереди и потребительских ресурсов в 10 раз при нормальной скорости в 10 раз.
  6. После быстрого использования невыполненных данных восстановите исходную архитектуру развертывания и повторно используйте исходный компьютер-потребитель для обработки сообщений.
    Раньше для завершения 3 потребителей требовался 1 час, но теперь 30 временных потребителей могут быть завершены за 10 минут.

Если используется rabbitmq и установлено время истечения, если отставание потребления в очереди превышает определенное время, оно будет очищено rabbitmq, и данные будут потеряны напрямую. В это время начните писать программу, узнайте потерянную партию данных, а затем заново залейте ее в mq, чтобы восполнить потерянные в течение дня данные.

Если в mq есть отставание сообщений и оно давно не обрабатывалось, то mq почти заполнен, вы временно пишете программу, получаете доступ к данным для потребления, записываете их во временный mq, а потом пускаете других потребителей медленно потреблять или потреблять один.Выбросьте один, не используйте его, быстро потребляйте все сообщения, а затем добавляйте данные ночью.

Как спроектировать промежуточную архитектуру очереди сообщений

  1. MQ должен поддерживать масштабируемость и быстрое расширение. Спроектируйте распределенный MQ, брокер-> тема-> раздел, поместите машину в каждый раздел и сохраните часть данных. Если ресурсов сейчас не хватает, добавьте в тему раздел, затем сделайте миграцию данных и добавьте машины.
  2. Данные mq помещаются на диск, чтобы предотвратить зависание процесса и потерю данных.Записывайте последовательно, чтобы не было накладных расходов на адресацию при случайном чтении и записи на диске, а производительность последовательного чтения и записи на диск очень высоко Это идея кафки.
  3. mq высокая доступность. Несколько копий->лидер и последователь->брокер вешает трубку, чтобы переизбрать лидера для предоставления услуг внешнему миру
  4. Данные поддержки 0 потеряны.

Эта статья опубликована в блогеOpenWriteвыпуск!