Обработка блокировки онлайн-очереди RabbitMQ

RabbitMQ

предисловие

   В тот день я вернулся в компанию, чтобы поработать сверхурочно после ужина с коллегами, и тут кто-то из группы @I сказал, что продавцы xxx сказали, что не могут получать push-уведомления, и я сначала подумал, что это ничего. Моей первой реакцией было то, что Jiguang не зарегистрирован, поэтому я попросил службу поддержки уведомить продавца и повторно войти в систему, чтобы попробовать. Откройте предысторию Jiguang, нажмите здесь, чтобы проверить. Позже все больше и больше ответов не поступало, и я знал, что это было непросто.

авария после

   Поскольку большое количество продавцов ответили, что они не могут получить push-уведомления, моей первой реакцией было, не работает ли система push-уведомлений, что привело к отсутствию push-уведомлений. Так что пусть брат по эксплуатации и техническому обслуживанию проверит состояние каждого узла нажимной системы и найдет, что оно нормальное. Итак, я открыл консоль RabbitMQ и посмотрел, и все были в замешательстве. Уже есть десятки тысяч сообщений вreadyСтатус, есть еще сотниunackedНовости.

   Я подумал, что соединение между push-сервисом и MQ было разорвано, что сделало невозможным push-сообщения, поэтому я попросил службу эксплуатации и обслуживания перезапустить push-сервис, перезапустил все push-сервисы и обнаружил, чтоunackedВсе сообщения становятсяready, но это не заняло много времени для сотенunacked, очевидно, что его можно потреблять, а не проводитьackАх.

   В то время я думал, что это была проблема с сетью, из-за которой mq не мог получитьack, пусть брат по эксплуатации и техническому обслуживанию проверит его и обнаружил, что с сетью все в порядке. Это действительно глупо сейчас, есть проблема с сетью, и соединение не может быть подключено. Поскольку несомненно, чтоackвызвало, немедленноack模式по оригиналуmanualизменить наautoСрочный выпуск. После апгрейда всех нод обнаруживается, что пуш в норме.

  Вы думали, что с этим покончено, но это не так. Вскоре выяснилось, что сервис MQ ненормальный.镜像队列, немедленно удалите проблемный MQ из кластера. Просто сделайте сброс и снова присоединитесь к кластеру. Это дело окончено. Сейчас почти 24:00.

   Когда время подошло к 10:00 следующего дня, на стороне эксплуатации и обслуживания снова появился сигнал тревоги, говорящий о том, что в системе выталкивания есть машина, диск почти заполнен, а коэффициент занятости высокий. Моя дорогая написала почти 40G логов со вчерашнего вечера до сегодняшнего дня, и когда я вижу сообщение об ошибке, я сразу понимаю, в чем проблема. скользкая ручкаbugИсправлена ​​аварийная разблокировка.

Я жаловался на волну ELK компании, но это сообщение об ошибке я вообще не собирал, поэтому вовремя не нашел.

Повторение инцидента - блокировка очереди

Конфигурация MQ

spring:
  # 消息队列
  rabbitmq:
    host: 10.0.0.53
    username: guest
    password: guest
    virtual-host: local
    port: 5672
    # 消息发送确认
    publisher-confirm-type: correlated
    # 开启发送失败退回
    publisher-returns: true
    listener:
      simple:
        # 消费端最小并发数
        concurrency: 1
        # 消费端最大并发数
        max-concurrency: 5
        # 一次请求中预处理的消息数量
        prefetch: 2
        # 手动应答
        acknowledge-mode: manual

код проблемы

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                                      @Headers Map<String,Object> headers,
                                      Channel channel) throws Exception {
    // 解密和解析
    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

    try {
        // 模拟推送
        pushMsg(orderDto);
    }catch (Exception e){
        log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
    }finally {
        // 消息签收
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }

}

   вроде нормально. По соглашению с торговой системой данные ордера необходимо предварительно преобразовать.jsonстрока, а затем используйтеAESШифрование, поэтому здесь нужно сначала расшифровать, а затем разобрать. чтобы получить данные заказа.

   Во избежание потери сообщений в торговой системе失败重发Механизм предотвращения потери сообщения, к сожалению, данные заказа не шифруются при повторной отправке. Это приводит к тому, что во время расшифровки в системе push возникает исключение, что делает невозможным продолжение.ack.

Тихо жаловался: Когда люди сидят дома, горшок с неба приходит.

Имитировать толчок

push-код

Отправить 3 обычных сообщения

curl http://localhost:8080/sendMsg/3

Отправить 1 неправильное сообщение

curl http://localhost:8080/sendErrorMsg/1

Отправить еще 3 обычных сообщения

curl http://localhost:8080/sendMsg/3

  Журнал наблюдений отправлен.Хоть и есть ошибка,но все же можно нормально запушить. Но RabbitMQ уже появилсяunackedНовости.

Продолжать отправлять 1 неправильное сообщение

curl http://localhost:8080/sendErrorMsg/1

Отправить еще 3 обычных сообщения

curl http://localhost:8080/sendMsg/3

   В это время вы обнаружите, что консоль сообщает об ошибке.Конечно, сообщение об ошибке заключается в том, что расшифровка не удалась, но обычное сообщение не потребляется.В это время очередь фактически заблокирована.

отRabbitMQКонсоль также может видеть, что три только что отправленных сообщения находятся вreadyгосударство. В это время, если новости будут поступать постоянно, они будут накапливаться в команде и не могут быть употреблены.

Отправить еще 3 обычных сообщения

curl http://localhost:8080/sendMsg/3

Проанализируйте причины

   Как упоминалось выше, это произошло потому, что не былоackвызвать блокировку в очереди. Так вот вопрос, почему это? На самом деле этоRabbitMQзащитный механизм. Предотвратить ввод большого количества сообщений при резком увеличении количества сообщений.consumerвызванныйconsumerвремя простоя.

  RabbitMQ предоставляет функцию QOS (обеспечение качества обслуживания), которая ограничивает максимальное количество неподтвержденных сообщений, которые потребители на канале могут хранить на основании неподтвержденных автоматически сообщений. может быть установленPrefetchCountвыполнить.

  Пример: это можно понять какconsumerБуферный контейнер добавляется впереди, и максимальное количество сообщений, которое может содержать контейнер, равноPrefetchCount. Если контейнер не полныйRabbitMQСообщение будет доставлено в контейнер, и если он заполнен, то не будет доставлено. когдаconsumerв сообщенииackЭто сообщение будет удалено в будущем, чтобы разместить новое сообщение.

listener:
  simple:
    # 消费端最小并发数
    concurrency: 1
    # 消费端最大并发数
    max-concurrency: 5
    # 一次处理的消息数量
    prefetch: 2
    # 手动应答
    acknowledge-mode: manual

Параметр предварительной выборки — PrefetchCount.

   Обнаружено с помощью вышеуказанной конфигурацииprefetchУ меня только 2 настроены, иconcurrencyНастроено только 1, поэтому, когда я отправляю 2 сообщения об ошибках, эти 2 сообщения не были получены из-за сбоя дешифрования.ack. Заполните буфер, на этот разRabbitMQподумай об этомconsumerЕсли у него нет потребляемой мощности, он не будет продолжать отправлять ему сообщения, поэтому очередь блокируется.

Определите, существует ли угроза блокировки очереди.

когдаackрежимmanualи появился в сетиunackedНовости, не паникуйте в это время. Поскольку QOS является ограниченным каналомchannelМаксимальное количество неподтвержденных товаров, которые потребитель использует . так что позвольтеunackedКоличество может быть передано черезchannelCount * prefetchCount * 节点数量предполагаемый.

channlCountотconcurrency,max-concurrencyрешил.

  • min = concurrency * prefetch * 节点数量
  • max = max-concurrency * prefetch * 节点数量

Из этого можно сделать вывод

  • unacked_msg_count < minОчередь не блокируется. но требует своевременного леченияunackedНовости.
  • unacked_msg_count >= minМожет произойти блокировка.
  • unacked_msg_count >= maxОчередь должна быть заблокирована.

Тут нужно хорошо разбираться.

Подход

   На самом деле способ обработки очень простой, методы расшифровки и разбора занесите вtry catchРешается таким образом, что независимо от того, будет ли расшифровка нормальной или нет, сообщение будет подписано. Если есть ошибка, журнал ошибок будет выведен на рассмотрение разработчика.

Для этого нужна система мониторинга логов, чтобы вовремя оповещать.

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                                      @Headers Map<String,Object> headers,
                                      Channel channel) throws Exception {
    try {

        // 解密和解析
        String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
        OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

        // 模拟推送
        pushMsg(orderDto);
    }catch (Exception e){
        log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
    }finally {
        // 消息签收
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }

}

Примечания

  unackedновости вconsumerПосле отключения (перезапуска) он автоматически вернется в начало очереди.

Авария повторяется - использование диска стремительно растет

   Сначала я не знал, что с кодом что-то не так, просто думал, что это просто не сделаноackТак будетackрежим изменен наautoАвтоматически, срочно обновил, так что сообщение будет подписано независимо от того, нормальное оно или нет, так что проблема действительно была решена на тот момент.

   На самом деле, оглядываясь назад, это была очень опасная операция.ackрежим изменен наautoАвтоматически это сделает QOS неэффективным. Будет поток новостейconsumerв результате чегоconsumerВремя простоя может быть связано с тем, что ночью было мало транзакций, а в push-системе было несколько узлов, так что проблем не было.

код проблемы

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                                      @Headers Map<String,Object> headers,
                                      Channel channel) throws Exception {
    // 解密和解析
    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

    try {

        // 模拟推送
        pushMsg(orderDto);
    }catch (Exception e){
        log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
    }finally {
        // 消息签收
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }

}

конфигурационный файл

listener:
  simple:
    # 消费端最小并发数
    concurrency: 1
    # 消费端最大并发数
    max-concurrency: 5
    # 一次处理的消息数量
    prefetch: 2
    # 手动应答
    acknowledge-mode: auto

  Поскольку механизм ретрансляции торговой системы в то время был неизвестен, и не было ошибки в шифровании данных при ретрансляции, небольшое количество некорректных сообщений все же было отправлено.

Отправить 1 неправильное сообщение

curl http://localhost:8080/sendErrorMsg/1

причина

  RabbitMQКогда прослушиватель сообщений ненормальный,consumerбудетrabbitmq serverОтправитьBasic.Reject, указывая, что сообщение отклонено, потому чтоSpringдефолтrequeue-rejectedнастроен какtrue, сообщение повторно помещается в очередь, затемrabbitmq serverповторная доставка. Это эквивалентно бесконечному циклу, поэтому консоль лихорадочно очищает журнал ошибок, вызывая резкое увеличение использования диска.

Решение

будетdefault-requeue-rejected: falseВот и все.

Суммировать

  • Лично не рекомендуется использовать автоматическое подтверждение в производственной среде, иначе QOS не вступит в силу.
  • При использовании ручных подтверждений нужно быть очень осторожным с получением сообщения.
  • На самом деле, при сбросе проблемного MQ не проблема удалить неправильное сообщение, что равносильно потере сообщения.
try {
    // 业务逻辑。
}catch (Exception e){
    // 输出错误日志。
}finally {
    // 消息签收。
}

использованная литература

кодовый адрес

https://gitee.com/huangxunhui/rabbitmq_accdient.git

конец

   Не паникуйте, если кто-то скажет вам столкнуться с онлайн-аварией, если только это не супербосс, побывавший в битвах. В противном случае это чушь собачья, ты даешь ему попробовать и смотришь, не помутится ли его разум и не вспотеет.

  Если вы считаете, что это полезно для вас, вы можете прокомментировать и поставить лайк, или вы можете перейти на мою домашнюю страницу, чтобы увидеть, может быть, есть статья, которая вам нравится, вы также можете просто подписаться на нее, спасибо.