предисловие
Студенты, которые использовали MQ, могли столкнуться ссообщения накапливаютсяПроблема. И Фейхао недавно тоже ступил на эту яму, но обнаружил, что результат вызван такой неожиданной причиной.
текст
Той ночью луна была темной, а ветер был сильным, и Фейхао уже собирался отправиться домой, как вдруг получил предупреждающее текстовое сообщение о бомбардировке!«Предупреждение о накоплении сообщений MQ [TOPIC: XXX]»
В сердце Фейхао «Тысячи травяных и грязных лошадей рухнули~». Первой реакцией было: «Что за ерунда?
Поэтому я помчался обратно в компанию, быстро включил компьютер и сел на фон RocketMQ для просмотра (собственная версия RocketMQ с открытым исходным кодом компании).
Держите траву (キ`゚Д゚´)!!!Накопилось более 300 миллионов сообщений? ? ?
Чтобы знать, что есть накопление сообщений и не заботиться об этой проблеме:
Скорость производства производителя >> Скорость обработки потребителя
- Внезапное увеличение скорости производства производителя, например, внезапное увеличение потока производителя.
- Скорость потребления становится медленнее, например, ввод-вывод экземпляра-потребителя сильно заблокирован или отключен.
Вытрите холодный пот с моей головы 😓... Поторопитесь и взгляните на потребительский сервер.
Приложение работает нормально! Серверный диск IO в норме! Сеть нормальная!
Тогда идите на сервер производителя, эй... Трафик тоже нормальный!
Какие? ? ? Будда 😨 ... Заявки производителей и потребителей нормальные, но почему сообщений так много скапливается? Наблюдение за тем, как накапливаются кучи (было бы неплохо, если бы это было количество моих волос), заставило меня еще больше забеспокоиться.
Хотя версия RocketMQ может поддерживать накопление 1 миллиарда сообщений, производительность не будет значительно ухудшаться из-за накопления сообщений, 😰 Но это накопление, очевидно, является нештатной ситуацией.
В RocketMQ есть ошибки, да, это должно быть горшок RocketMQ!
Эта статья закончилась...
Ха-ха, давайте приступим к делу, хоть Толстяк Хао и не может драться с отцом, но, по крайней мере, не может его обмануть 😂
Зайдите на проект потребителя и проверьте логи, эммм... ошибок не обнаружено, логов ошибок нет... вроде все нормально.
Ха... Но разве это потребление не слишком медленное? ? ? Это ненаучно.Потребители настроены с кластером потребления из узлов 3. В соответствии с потребностями бизнеса мощность потребления используется. Открою еще раз потребительскую информацию этой ТЕМЫ
Эй, как ClientId этих трех потребителей может быть одинаковым?
Имея многолетний опыт майнинга, он прямо сказал мне: «Это из-за той же проблемы ClientId брокер путается при раздаче сообщений, так что сообщения не могут нормально проталкиваться потребителям?» Потому что и производитель, и потребитель ведут себя обычно, поэтому я думаю, что проблема может быть в брокере.
Исходя из этого предположения, нам необходимо решить следующие задачи:
- Два потребителя развернуты на разных серверах, почему ClientId одинаковый?
- Будет ли один и тот же ClientId вызывать ошибки рассылки сообщений брокера?
анализ проблемы
Почему ClientId одинаковый? Я предполагаю, что это из-за проблемы с контейнером Docker. Потому что компания недавно приступила к этапу контейнеризации, и как раз потребительские проекты также находятся в списке первой партии этапов контейнеризации.
Любой, кто знаком с Docker, знает, что когда процесс Docker запускается, он создает файл с именемdocker0
виртуальный мост. Контейнеры Docker на хосте будут подключаться к этому виртуальному мосту. Виртуальный мост работает как физический коммутатор, поэтому все контейнеры на узле подключаются к сети уровня 2 через коммутатор. Обычно в Docker есть четыре сетевых режима:
- Режим хоста
- Шаблон контейнера
- Нет режима
- Режим моста
Студенты, которые не разбираются в этих режимах, находят себе маму🤔
Все наши контейнеры используют режим Host, поэтому сеть контейнера точно такая же, как у хоста.
Как видите, первое здесьdocker0
сетевая карта,IP по умолчанию 172.17.0.1. Так что очевидно, что ClientId должен читать всеdocker0
IP-адрес сетевой карты, это проблема, которая может объяснить, почему ClientIds нескольких потребителей одинаковы.
Затем следующий шаг: где установлен clientId? Я изобретательно искал ключевое слово «Docker» в проблемах Github, искал и, конечно же,! Есть еще много единомышленников, которые ступили на вторую яму.После скрининга нашли более надежную.open issue
Видно, что этот брат находится в той же ситуации, что и моя, и его вывод примерно совпадает с моим предположением выше (в это время он очень горд собой), он также упомянул здесь, что clientId находится в ClientConfig в классеbuildMQClientId
метод определен.
исследование исходного кода
Войдите в класс ClientConfig и найдите метод buildMQClientId.
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
Я считаю, что благодаря этому каждый может увидеть правило генерации clientId, которое消费者客户端的IP + "@"+ 实例名称
, очевидно, проблема заключается в получении IP-адреса клиента.
Давайте продолжим смотреть, как он получает IP-адрес клиента.
public class ClientConfig {
...
private String clientIP = RemotingUtil.getLocalAddress();
...
}
public static String getLocalAddress() {
try {
// Traversal Network interface to get the first non-loopback and non-private address
Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
ArrayList<String> ipv4Result = new ArrayList<String>();
ArrayList<String> ipv6Result = new ArrayList<String>();
while (enumeration.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration.nextElement();
final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
while (en.hasMoreElements()) {
final InetAddress address = en.nextElement();
if (!address.isLoopbackAddress()) {
if (address instanceof Inet6Address) {
ipv6Result.add(normalizeHostAddress(address));
} else {
ipv4Result.add(normalizeHostAddress(address));
}
}
}
}
// prefer ipv4
if (!ipv4Result.isEmpty()) {
for (String ip : ipv4Result) {
if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
continue;
}
return ip;
}
return ipv4Result.get(ipv4Result.size() - 1);
} else if (!ipv6Result.isEmpty()) {
return ipv6Result.get(0);
}
//If failed to find,fall back to localhost
final InetAddress localHost = InetAddress.getLocalHost();
return normalizeHostAddress(localHost);
} catch (Exception e) {
log.error("Failed to obtain local address", e);
}
return null;
}
Если у вас есть друг, который пытался получить IP-адрес текущей машины, вам следуетRemotingUtil.getLocalAddress()
Этот инструмент не является незнакомым ~
Проще говоря, это получение IP-адреса сетевой карты текущей машины, но поскольку сетевой режим контейнера принимает режим хоста, это означает, что каждый контейнер и хост находятся в одной сети, поэтому мы также можем видеть Docker. -Сервер в контейнере.созданdocker 0
сетевая карта, так он читаетdocker 0
IP-адрес сетевой карты по умолчанию — 172.17.0.1.
(Я общался с одноклассниками по эксплуатации и обслуживанию. В настоящее время, поскольку это первый этап контейнеризации, он сначала развертывается в простом режиме, а позже будет постепенно заменен на k8s. Каждый pod имеет свой собственный независимый IP, и сеть будет подключена к приемнику.Изоляция хоста и других подов.Эммм....k8s!Звучит офигенно,и я недавно читал книги по этому поводу)
** В это время вы можете быть умным и спросить: «Разве нет другого параметра для имени экземпляра, как это может быть таким же?» ** Не волнуйтесь, мы продолжим читать 👇
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
getInstanceName()
Метод фактически получает непосредственноinstanceName
Это значение параметра, но когда присваивается значение этого параметра? Да, черезchangeInstanceNameToPID()
Если этот метод назначен, этот метод будет вызываться при запуске потребителя.
Логика этого параметра очень проста, при инициализации сначала будет получена переменная окружения.rocketmq.client.name
Есть ли значение, если нет, используйте значение по умолчаниюDEFAULT
.
Затем, когда потребитель запустится, он будет судить, соответствует ли значение этого параметраDEFAULT
, если что звонитеUtilAll.getPid()
.
public static int getPid() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName(); // format: "pid@hostname"
try {
return Integer.parseInt(name.substring(0, name.indexOf('@')));
} catch (Exception e) {
return -1;
}
}
По имени метода мы можем четко знать, что этот метод фактически получает номер процесса. Тогда... почему полученные идентификаторы процессов совпадают?
Умница, возможно, ты уже знаешь ответ, верно 🤨! Тут надо упомянутьТри особенности Docker
- cgroup
- namespace
- unionFS
Правильно, здесь используется технология пространства имен.
Linux Namespace — это функция, предоставляемая ядром Linux, которая может реализовать изоляцию системных ресурсов, таких как: PID, User ID, Network и т. д.
Поскольку все они используют один и тот же базовый образ и запускают один и тот же проект JAVA на самом внешнем уровне, мы можем зайти в контейнер и увидеть, что их номера процессов равны 9.
После ряда остроумных рассуждений и аргументов Фейхао,В сетевом режиме HOST контейнера Docker генерируется тот же clientId!
На данный момент мы решили первую проблему, упомянутую выше!
Следуя темпу Конана Фейхао, мы продолжаем рассуждать о втором вопросе:Один и тот же clientId заставляет брокера распространять сообщение об ошибке?
Потребитель должен использовать clientId в качестве уникального идентификатора клиента-потребителя во время балансировки нагрузки.При доставке сообщения из-за непротиворечивости clientId возникают ошибки распределения нагрузки.
Итак, давайте рассмотрим, как реализована балансировка нагрузки потребителя. Сначала я думал, что балансировкой нагрузки на стороне потребителя занимается Брокер, и Брокер назначает разные Очереди разным Потребителям в соответствии с зарегистрированными Потребителями. Но, просмотрев документ с описанием исходного кода и проведя некоторое исследование исходного кода, я обнаружил, что у меня все еще слишком мало знаний (Хахаха, должны быть некоторые друзья, у которых есть та же идея, что и у меня).
добавимОбщая архитектура RocketMQ
Из-за нехватки места здесь я объясняю только отношения между брокером и потребителем.Если вы не понимаете других ролей, вы можете прочитать статью, которую я написал о введении RocketMQ.
- Потребитель и один из узлов в кластере NameServer (выбирается случайным образом)установить длительное соединение, периодически получать информацию о маршрутизации темы от NameServer.
- Установить длительное соединение с Брокером в соответствии с полученной информацией о маршрутизации Темы, иРегулярно отправляйте пульсацию брокеру.
Когда брокер получает сообщение пульса, он сохраняет информацию о потребителе в переменной локального кеша.consumerTable. Приведенный выше рисунок примерно объясняет структуру хранилища и содержимое ConsumerTable, самое главное, что он кэширует clientId каждого потребителя.
Что касается режима потребления Consumer, то я прямо цитирую объяснение исходного кода
В RocketMQ два режима потребления (Push/Pull) на стороне потребителя основаны на режиме извлечения для получения сообщений, в то время как режим Push является просто инкапсуляцией режима Pull.Суть его в том, что поток получения сообщений извлекает из После извлечения пакета сообщений и отправки их в пул потоков потребления сообщений они продолжают попытки снова получить сообщения с сервера «без остановок». Если сообщение не извлекается, продолжайте извлекать после задержки.
В двух методах потребления, основанных на режиме извлечения (Push/Pull), стороне потребителя необходимо знать, из какой очереди сообщений на стороне брокера следует получать сообщения. Следовательно, необходимо выполнять балансировку нагрузки на стороне Потребителя, то есть, какие Потребители в одной и той же ConsumerGroup распределяются по нескольким MessageQueue на стороне Брокера.
Проще говоря, будь то режим Push или Pull, управление потреблением сообщений находится на Потребителе, поэтому реализация балансировки нагрузки Потребителя находится на стороне Клиента Потребителя..
Глядя на исходный код, можно обнаружить, что RebalanceService завершает поток службы балансировки нагрузки (выполняется каждые 20 секунд), а метод run() потока RebalanceService, наконец, вызывает класс RebalanceImpl.rebalanceByTopic()
метод, который является ядром реализации балансировки нагрузки на стороне потребителя. здесь,rebalanceByTopic()
Метод будет выполнять различную логическую обработку в зависимости от того, является ли тип связи потребителя «широковещательным режимом» или «кластерным режимом». Здесь мы в основном рассматриваем основной поток обработки в кластерном режиме:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
..... // 省略
}
case CLUSTERING: {
// 获取该Topic主题下的消息消费队列集合
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 向 broker 获取消费者的clientId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
// 默认平均分配算法
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
(1) Из переменной локального кешаtopicSubscribeInfoTable
, получить набор очереди потребления сообщений (mqSet) в разделе темы;
(2) Вызывается в соответствии с темой и ConsumerGroup в качестве параметровfindConsumerIdList()
Метод отправляет брокеру для получения содержимого группы потребителей.список идентификаторов клиентов;
(3) Сначала отсортируйте очередь потребления сообщений и идентификатор потребителя в теме, а затем используйтеАлгоритм стратегии распределения очереди сообщений(По умолчанию: средний алгоритм распределения очередей сообщений) и рассчитайте очереди сообщений, которые необходимо извлечь. Алгоритм среднего распределения здесь аналогичен алгоритму пейджинга: отсортируйте все очереди сообщений в том же порядке, что и записи, отсортируйте всех потребителей-потребителей в том же порядке, что и количество страниц, и найдите средний размер, который должен содержать каждая страница и каждая страница. record.range и, наконец, пройтись по всему диапазону, чтобы вычислить записи, которые должны быть выделены текущему потребителю (здесь: MessageQueue).
(4) Затем вызовите метод updateProcessQueueTableInRebalance().Специфический метод заключается в том, чтобы сначала отфильтровать и сравнить назначенный набор очередей сообщений (mqSet) с processQueueTable.
- Красная часть, отмеченная processQueueTable на приведенном выше рисунке, указывает на то, что она не содержит друг друга с назначенным набором очередей сообщений mqSet. Установите для свойства Dropped этих очередей значение true, а затем проверьте, можно ли удалить эти очереди из переменной кэша processQueueTable, которая выполняется здесь.
removeUnnecessaryMessageQueue()
метод, то есть проверить, можно ли получить блокировку текущей очереди обработки потребления каждую 1 с, и вернуть true, если она получена. Если после ожидания в течение 1 с блокировка текущей очереди обработки потребления все еще не получена, вернуть false. Если он возвращает true, удалите соответствующую запись из переменной кэша processQueueTable; - Зеленая часть processQueueTable на приведенном выше рисунке представляет собой пересечение с назначенным набором очередей сообщений mqSet. Определите, истек ли срок действия ProcessQueue. Неважно, находится ли он в режиме извлечения. Если он находится в режиме отправки, установите для свойства Dropped значение true и вызовите
removeUnnecessaryMessageQueue()
метод, попробуйте удалить запись, как указано выше;
Балансировка нагрузки очередей потребления сообщений между разными потребителями в одной группе потребления,Его основная концепция дизайна заключается в том, что очередь потребления сообщений может использоваться только одним потребителем в одной и той же группе потребления одновременно, и потребитель сообщений может использовать несколько очередей сообщений одновременно..
Вышеупомянутая часть взята из документа docs в исходном коде RocketMQ. Я не знаю, понимаете ли вы это. Во всяком случае, я понял это только после того, как прочитал его несколько раз🤔🤔🤔
На самом деле, глядя на рисунок на шаге 3, реализация балансировки нагрузки ясна с первого взгляда.Проще говоря, это выделение одинакового количества очередей потребления для разных потребителей.. Потребители будут генерировать уникальный идентификатор для clientId, но, согласно нашим рассуждениям выше, в контейнере и в режиме сети хоста будет генерироваться непротиворечивый clientId.
Эммм... На данный момент каждый должен быть в состоянии догадаться, что пошло не так.
Верно! Проблема должна быть на шаге 3, как рассчитывается среднее распределение.
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 当前clientId所在的下标
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
Приведенный выше расчет может показаться немного запутанным, но на самом деле, после его понимания, он заключается в вычислении очереди сообщений, выделенной текущим Потребителем, как показано на рисунке в шаге 3 выше.
Предполагая, что в настоящее время есть только один потребитель, наше потребление на самом деле совершенно нормально, потому что все очереди под текущим топиком будут выделены текущему потребителю, и проблемы с балансировкой нагрузки нет.
Предполагая, что в настоящее время есть два потребителя, результат должен быть таким в соответствии с обычным методом расчета. Но потому чтоcidAll
являются двумя дубликатами clientId, поэтому индекс, полученный обоими потребителями, равен 0,Естественно, им выделяется один и тот же MessageQueue. Это может объяснить, почему вы видите логи с потреблением в начале, но скорость потребления очень низкая.
Решение
- Устранение ошибок балансировки нагрузки
Виновник: clientId
После хорошего вывода каждый должен знать, что основной причиной ошибки балансировки нагрузки Consumer является то, что clientId, сгенерированный клиентом Consumer, непротиворечив, поэтому ключом к решению этой проблемы является изменение правила генерации clientId. Выше кратко проанализированы правила генерации clientId из исходного кода, мы можем установить вручнуюrocketmq.client.name
Эта переменная среды создает пользовательский уникальный идентификатор clientId .
Fat Hao добавляет метку времени к исходному pid здесь:
@PostConstruct
public void init() {
System.setProperty("rocketmq.client.name", String.valueOf(UtilAll.getPid()) + "@" + System.currentTimeMillis());
}
- Разрешить накопление сообщений
Наконец-то решил корневую проблему! Хорошо, все готово, просто подключайтесь к сети, более 300 миллионов сообщений, накопленных в очереди, все еще ждут своего использования.
(Можно сказать, что это сиюминутное накопление и сиюминутная прохлада, и накапливалось все время 😭)
Вскоре после того, как он был запущен, эммм... эффект замечательный, и количество накопленных сообщений постепенно уменьшилось. Но пришло другое предупреждение, mongodb предупредил!
Держи траву. . . Чуть не забыл, что потребители будут писать в mongodb после обработки бизнес-сообщений.Теперь вход трафика потребления резко увеличился, и mongodb уже не может с этим справиться. К счастью, исторические новости не важны и могут быть потеряны. Поэтому Feihao решительно ушел в фон, чтобы сбросить точку потребления.Теперь, когда потребление в норме, mongodb тоже в норме. Уф~ Это был промах, и я чуть не спровоцировал еще одну аварию.
Суммировать
- Потребительский клиент RocketMQ будет генерировать уникальный идентификатор clientId, а правило генерации clientId:
客户端IP+客户端进程号
- Развертывание контейнера Docker Если в сетевом режиме используется режим хоста, приложения в контейнере получат IP-адрес моста Docker по умолчанию.
- Балансировка нагрузки RocketMQ на стороне потребителя реализована на стороне клиента. Сторона клиента-потребителя будет кэшировать соответствующую очередь потребления темы. По умолчанию используется средний алгоритм распределения очереди сообщений. Если clientId один и тот же, все клиенты будут размещены в той же очереди, что приводит к аномальному потреблению.
- Для обработки накопления сообщений необходимо провести всестороннюю проверку. Другие предприятия не могут быть затронуты мгновенным входом с высоким трафиком, иначе это вызовет еще одну аварию, например, толстую траншею (если у вас есть лучшее решение для накопления сообщений, оставьте сообщение для предложений)
Это первый раз, когда Feihao пишет статью об онлайн-аварии. Она может быть грубой во многих местах или деталях. Я надеюсь, что все друзья-обезьяны будут более внимательными и дадут больше предложений~
На самом деле, я пережил довольно много несчастных случаев в сети, но каждое резюме - это просто формальность. Надеюсь, что теперь я смогу разобраться в этом в виде статей. Во-первых, это поможет мне с моим будущим резюме и обзор, а также может предоставить каждому информацию.Больше опыта ямы.
Обычные изменения изменят обычные
Я домашний мальчик, молодой человек, который ведет себя сдержанно в Интернете.
Подписывайтесь на официальный аккаунт «Чжай Сяонянь», личный блог 📖edisonz.cn, читайте больше, делитесь статьями