предисловие
В тот день я вернулся в компанию, чтобы поработать сверхурочно после ужина с коллегами, и тут кто-то из группы @I сказал, что продавцы xxx сказали, что не могут получать push-уведомления, и я сначала подумал, что это ничего. Моей первой реакцией было то, что Jiguang не зарегистрирован, поэтому я попросил службу поддержки уведомить продавца и повторно войти в систему, чтобы попробовать. Откройте предысторию Jiguang, нажмите здесь, чтобы проверить. Позже все больше и больше ответов не поступало, и я знал, что это было непросто.
авария после
Поскольку большое количество продавцов ответили, что они не могут получить push-уведомления, моей первой реакцией было, не работает ли система push-уведомлений, что привело к отсутствию push-уведомлений. Так что пусть брат по эксплуатации и техническому обслуживанию проверит состояние каждого узла нажимной системы и найдет, что оно нормальное. Итак, я открыл консоль RabbitMQ и посмотрел, и все были в замешательстве. Уже есть десятки тысяч сообщений вready
Статус, есть еще сотниunacked
Новости.
Я подумал, что соединение между push-сервисом и MQ было разорвано, что сделало невозможным push-сообщения, поэтому я попросил службу эксплуатации и обслуживания перезапустить push-сервис, перезапустил все push-сервисы и обнаружил, чтоunacked
Все сообщения становятсяready
, но это не заняло много времени для сотенunacked
, очевидно, что его можно потреблять, а не проводитьack
Ах.
В то время я думал, что это была проблема с сетью, из-за которой mq не мог получитьack
, пусть брат по эксплуатации и техническому обслуживанию проверит его и обнаружил, что с сетью все в порядке. Это действительно глупо сейчас, есть проблема с сетью, и соединение не может быть подключено. Поскольку несомненно, чтоack
вызвало, немедленноack模式
по оригиналуmanual
изменить наauto
Срочный выпуск. После апгрейда всех нод обнаруживается, что пуш в норме.
Вы думали, что с этим покончено, но это не так. Вскоре выяснилось, что сервис MQ ненормальный.镜像队列
, немедленно удалите проблемный MQ из кластера. Просто сделайте сброс и снова присоединитесь к кластеру. Это дело окончено. Сейчас почти 24:00.
Когда время подошло к 10:00 следующего дня, на стороне эксплуатации и обслуживания снова появился сигнал тревоги, говорящий о том, что в системе выталкивания есть машина, диск почти заполнен, а коэффициент занятости высокий. Моя дорогая написала почти 40G логов со вчерашнего вечера до сегодняшнего дня, и когда я вижу сообщение об ошибке, я сразу понимаю, в чем проблема. скользкая ручкаbug
Исправлена аварийная разблокировка.
Я жаловался на волну ELK компании, но это сообщение об ошибке я вообще не собирал, поэтому вовремя не нашел.
Повторение инцидента - блокировка очереди
Конфигурация MQ
spring:
# 消息队列
rabbitmq:
host: 10.0.0.53
username: guest
password: guest
virtual-host: local
port: 5672
# 消息发送确认
publisher-confirm-type: correlated
# 开启发送失败退回
publisher-returns: true
listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次请求中预处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: manual
код проблемы
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
вроде нормально. По соглашению с торговой системой данные ордера необходимо предварительно преобразовать.json
строка, а затем используйтеAES
Шифрование, поэтому здесь нужно сначала расшифровать, а затем разобрать. чтобы получить данные заказа.
Во избежание потери сообщений в торговой системе失败重发
Механизм предотвращения потери сообщения, к сожалению, данные заказа не шифруются при повторной отправке. Это приводит к тому, что во время расшифровки в системе push возникает исключение, что делает невозможным продолжение.ack
.
Тихо жаловался: Когда люди сидят дома, горшок с неба приходит.
Имитировать толчок
Отправить 3 обычных сообщения
curl http://localhost:8080/sendMsg/3
Отправить 1 неправильное сообщение
curl http://localhost:8080/sendErrorMsg/1
Отправить еще 3 обычных сообщения
curl http://localhost:8080/sendMsg/3
Журнал наблюдений отправлен.Хоть и есть ошибка,но все же можно нормально запушить. Но RabbitMQ уже появилсяunacked
Новости.
Продолжать отправлять 1 неправильное сообщение
curl http://localhost:8080/sendErrorMsg/1
Отправить еще 3 обычных сообщения
curl http://localhost:8080/sendMsg/3
В это время вы обнаружите, что консоль сообщает об ошибке.Конечно, сообщение об ошибке заключается в том, что расшифровка не удалась, но обычное сообщение не потребляется.В это время очередь фактически заблокирована.
отRabbitMQ
Консоль также может видеть, что три только что отправленных сообщения находятся вready
государство. В это время, если новости будут поступать постоянно, они будут накапливаться в команде и не могут быть употреблены.
Отправить еще 3 обычных сообщения
curl http://localhost:8080/sendMsg/3
Проанализируйте причины
Как упоминалось выше, это произошло потому, что не былоack
вызвать блокировку в очереди. Так вот вопрос, почему это? На самом деле этоRabbitMQ
защитный механизм. Предотвратить ввод большого количества сообщений при резком увеличении количества сообщений.consumer
вызванныйconsumer
время простоя.
RabbitMQ предоставляет функцию QOS (обеспечение качества обслуживания), которая ограничивает максимальное количество неподтвержденных сообщений, которые потребители на канале могут хранить на основании неподтвержденных автоматически сообщений. может быть установленPrefetchCount
выполнить.
Пример: это можно понять какconsumer
Буферный контейнер добавляется впереди, и максимальное количество сообщений, которое может содержать контейнер, равноPrefetchCount
. Если контейнер не полныйRabbitMQ
Сообщение будет доставлено в контейнер, и если он заполнен, то не будет доставлено. когдаconsumer
в сообщенииack
Это сообщение будет удалено в будущем, чтобы разместить новое сообщение.
listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: manual
Параметр предварительной выборки — PrefetchCount.
Обнаружено с помощью вышеуказанной конфигурацииprefetch
У меня только 2 настроены, иconcurrency
Настроено только 1, поэтому, когда я отправляю 2 сообщения об ошибках, эти 2 сообщения не были получены из-за сбоя дешифрования.ack
. Заполните буфер, на этот разRabbitMQ
подумай об этомconsumer
Если у него нет потребляемой мощности, он не будет продолжать отправлять ему сообщения, поэтому очередь блокируется.
Определите, существует ли угроза блокировки очереди.
когдаack
режимmanual
и появился в сетиunacked
Новости, не паникуйте в это время. Поскольку QOS является ограниченным каналомchannel
Максимальное количество неподтвержденных товаров, которые потребитель использует . так что позвольтеunacked
Количество может быть передано черезchannelCount * prefetchCount * 节点数量
предполагаемый.
channlCount
отconcurrency
,max-concurrency
решил.
-
min
=concurrency * prefetch * 节点数量
-
max
=max-concurrency * prefetch * 节点数量
Из этого можно сделать вывод
-
unacked_msg_count
<min
Очередь не блокируется. но требует своевременного леченияunacked
Новости. -
unacked_msg_count
>=min
Может произойти блокировка. -
unacked_msg_count
>=max
Очередь должна быть заблокирована.
Тут нужно хорошо разбираться.
Подход
На самом деле способ обработки очень простой, методы расшифровки и разбора занесите вtry catch
Решается таким образом, что независимо от того, будет ли расшифровка нормальной или нет, сообщение будет подписано. Если есть ошибка, журнал ошибок будет выведен на рассмотрение разработчика.
Для этого нужна система мониторинга логов, чтобы вовремя оповещать.
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
try {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
Примечания
unacked
новости вconsumer
После отключения (перезапуска) он автоматически вернется в начало очереди.
Авария повторяется - использование диска стремительно растет
Сначала я не знал, что с кодом что-то не так, просто думал, что это просто не сделаноack
Так будетack
режим изменен наauto
Автоматически, срочно обновил, так что сообщение будет подписано независимо от того, нормальное оно или нет, так что проблема действительно была решена на тот момент.
На самом деле, оглядываясь назад, это была очень опасная операция.ack
режим изменен наauto
Автоматически это сделает QOS неэффективным. Будет поток новостейconsumer
в результате чегоconsumer
Время простоя может быть связано с тем, что ночью было мало транзакций, а в push-системе было несколько узлов, так что проблем не было.
код проблемы
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
конфигурационный файл
listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: auto
Поскольку механизм ретрансляции торговой системы в то время был неизвестен, и не было ошибки в шифровании данных при ретрансляции, небольшое количество некорректных сообщений все же было отправлено.
Отправить 1 неправильное сообщение
curl http://localhost:8080/sendErrorMsg/1
причина
RabbitMQ
Когда прослушиватель сообщений ненормальный,consumer
будетrabbitmq server
ОтправитьBasic.Reject
, указывая, что сообщение отклонено, потому чтоSpring
дефолтrequeue-rejected
настроен какtrue
, сообщение повторно помещается в очередь, затемrabbitmq server
повторная доставка. Это эквивалентно бесконечному циклу, поэтому консоль лихорадочно очищает журнал ошибок, вызывая резкое увеличение использования диска.
Решение
будетdefault-requeue-rejected: false
Вот и все.
Суммировать
- Лично не рекомендуется использовать автоматическое подтверждение в производственной среде, иначе QOS не вступит в силу.
- При использовании ручных подтверждений нужно быть очень осторожным с получением сообщения.
- На самом деле, при сбросе проблемного MQ не проблема удалить неправильное сообщение, что равносильно потере сообщения.
try {
// 业务逻辑。
}catch (Exception e){
// 输出错误日志。
}finally {
// 消息签收。
}
использованная литература
кодовый адрес
https://gitee.com/huangxunhui/rabbitmq_accdient.git
конец
Не паникуйте, если кто-то скажет вам столкнуться с онлайн-аварией, если только это не супербосс, побывавший в битвах. В противном случае это чушь собачья, ты даешь ему попробовать и смотришь, не помутится ли его разум и не вспотеет.
Если вы считаете, что это полезно для вас, вы можете прокомментировать и поставить лайк, или вы можете перейти на мою домашнюю страницу, чтобы увидеть, может быть, есть статья, которая вам нравится, вы также можете просто подписаться на нее, спасибо.