Ямы, внесенные rabbitmq, чтобы гарантировать, что сообщения не будут потеряны, еще слишком молоды.

задняя часть Архитектура программист
Ямы, внесенные rabbitmq, чтобы гарантировать, что сообщения не будут потеряны, еще слишком молоды.

«Это второй день моего участия в первом испытании обновления 2022 года. Подробности о мероприятии см.: Первое испытание обновления 2022 года».

предисловие

  • Ранее мы упоминали, как гарантировать, что сообщения rabbitmq не будут потеряны. анализируется с трех точек зрения. Это отправитель, rabbitmq и потребитель.
  • В то время заинтересованные потребители лишь кратко представили их. Сегодня мы анализируем питы, принесенные потребителями, подтверждающие потребление из сценария использования.

Отправить сообщение

  • Здесь мы продолжаем использовать предыдущую логику отправки.
 public Map<String, Object> sendMessage(Map<String, Object> params) throws UnsupportedEncodingException {
     Map<String, Object> resultMap = new HashMap<String, Object>(){
         {
             put("code", 200);
         }
     };
     String msg = "";
     Integer index = 0;
     if (params.containsKey("msg")) {
         msg = params.get("msg").toString();
     }
     if (params.containsKey("index")) {
         index = Integer.valueOf(params.get("index").toString());
     }
     if (index != 0) {
         //这里开始模拟异常出现。消息将会丢失
         int i = 1 / 0;
     }
     Map<String, Object> map = new HashMap<>();
     map.put("msg", msg);
     Message message= MessageBuilder.withBody(JSON.toJSONString(map).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON)
             .build();
     CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
     rabbitTemplate.convertAndSend(RabbitConfig.TOPICEXCHANGE, "zxh", message,data);
     return resultMap;
 }
  • Прежде всего, мы будем держать исключения при отправке, это для проверки работы подтверждения отправки сообщения перед этим. Тут автор поленился и не удалил. На этот раз мы всегда будем обеспечивать точность доставки сообщения при вызове интерфейса. Потому что мы ориентируемся на потребителей
  • Поскольку у rabbitmq есть три механизма подтвержденияacknowledge-mode; ручной, автоматический, нет; ручной означает, что нам нужно подтвердить вручную, автоматический отмечает сообщение об автоматическом подтверждении, нет означает отсутствие действий
 @RabbitListener(queues = RabbitConfig.QUEUEFIRST)
 @Async("asyncExecutor")
 public void handler(Message msg, Channel channel) {
     //channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
     byte[] body = msg.getBody();
     String messages = new String(body);
     JSONObject json = (JSONObject) JSONObject.parse(messages);
     if ("1".equals(json.getString("msg"))) {
         try {
             channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
         } catch (IOException e) {
             e.printStackTrace();
         }
     }
     if ("2".equals(json.getString("msg"))) {
         throw new RuntimeException("异常。。。。。");
     }
     log.info(RabbitConfig.QUEUEFIRST+"队列中消费的信息:"+msg);
 }
  • При получении сообщения мы обрабатываем его в соответствии с отправленным сообщением.Когда тело сообщения равно 1, мы подтвердим сообщение.Если тело сообщения равно 2, будет выброшено исключение, то есть никакого подтверждения сообщения выполнено не будет. Пока наше сообщение не подтверждено, rabbitmq сохранит сообщение и попытается снова отправить его потребителю.

описание сцены

  • Среди ночи мне вдруг позвонили и сказали, что онлайн-данные другие. Мы говорили о потреблении mq выше, умные читатели должны знать, что эта проблема должна заключаться в потреблении mq. А мне сначала было скучно. Проект находится в сети уже три дня, почему в это время функция работает ненормально?

  • Итак, я открыл проект и начал онлайн-позиционирование.Я сначала посмотрел лог и обнаружил, что логика обработки проекта mq сообщает о сумасшедших ошибках. Мое сердце начало тайно радоваться, что так легко удалось найти проблему. Однако по мере усугубления проблемы выясняется, что ошибка не является основной причиной онлайн-сбоя. Потому что феномен онлайн заключается в том, что данные не могут быть синхронизированы. Ключом к синхронизации является отслеживание сообщений mq для достижения синхронизации. Но проблема сейчас в том, что сообщение вообще не может быть получено через лог, а при входе на фоновую страницу mq видно, что все связанные очереди там заблокированы.

  • Здесь мы кратко резюмируем:

    • сумасшедшая ошибка логики обработки mq
    • mq не может принимать другие данные

Анализ проблемы

  • Поскольку это вызвано тремя днями после выхода в интернет, я уверен, что бизнес-логика в норме, иначе он вообще не пройдет тест. Итак, о чем я должен думать, так это о том, почему пара очередей заблокирована? На этот раз запуск внес небольшое изменение в mq, что увеличило ручное подтверждение сообщения.
 Unexpected exception occurred invoking async method: public void xxxxxxxxxxxxxxxx(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)
 java.lang.IllegalStateException: Channel closed; cannot ack/nack
  • Это один из онлайн-отчетов об ошибках.Еще одна ошибка в том, что ошибка в бизнесе не имеет к нам никакого отношения. Из приведенного выше сообщения об ошибке мы можем извлечь следующее

    • async exception
    • channel cloesed , cannot ack/nack
  • Во-первых, при асинхронной обработке возникает исключение, что приводит к асинхронному исключению; из-за ошибки асинхронной обработки при достижении подтверждения не появится подтверждение.

  • Вот решение блоггера для not ack,Не применимо для нашего сценария

  • Исключение возникает, когда мы отправляем msg=2. На самом деле с этим очень легко справиться.Создать совместимый процесс, в котором mq принимает сообщения, означает глобально перехватывать исключения.

  • Здесь мы как раз имеем дело с первой проблемой выше - mq обрабатывает сумасшедшие ошибки; очевидно, это не решает нашу фундаментальную проблему - почему сообщения mq перегружаются.

пакетные настройки mq

В случае включения подтверждения потребитель может асинхронно подтвердить полученное сообщение в соответствии с потребностями бизнеса.

Однако в процессе фактического использования из-за ограниченных возможностей обработки потребителя после получения определенного количества сообщений от rabbitmq есть надежда, что RabbitMQ больше не будет помещать сообщения в очередь и сможет обрабатывать больше сообщений. сообщения) перед получением сообщений из очереди. В этом сценарии мы можем добиться этого эффекта, установив prefetch_count в сигнале basic.qos.

image-20211025173519681.png

  • Мы видим, что значение prefetch_count по умолчанию для mq равно 250. На этот раз этот параметр также является основной причиной перегрузки mq.
  • То есть, когда мы сталкиваемся с сообщением об ошибке и нет действительного подтверждения сообщения, канал между нашим потребителем и mq будет занимать сообщение. Зная, что 250 фрагментов данных заполнены неподтвержденными сообщениями, обычные данные не будут доставлены потребителю. Потому что prefetch_count — это верхний предел для потребителей.
  • Это создает бесконечный цикл. 250 сообщений, которые мы не можем подтвердить, не могут быть использованы, и мы не можем получить новые.
  • Вот почему нет проблем при первом запуске системы, потому что вначале мы можем принять, что сообщение не подтверждено, не влияя на то, чтобы мы обрабатывали правильные данные, но эффективность немного медленнее. Медленно все больше и больше ошибочных данных с течением времени приводит к нашей возможной перегрузке.

Решение

  • Избавьтесь от ненормального бизнеса и перехватите исключение, чтобы обеспечить подтверждение сообщения
  • Для подтверждения сообщения лучше всего использовать автоматическое подтверждение или сначала поставить подтверждение сообщения.

Суммировать

  • Когда RabbitMQ хочет доставить сообщение в очереди получателю, он просматривает список получателей в очереди, выбирает подходящего получателя и затем доставляет сообщение. Одним из оснований для выбора потребителей является проверка того, достигает ли количество неподтвержденных сообщений на канале, соответствующем потребителю, установленного числа prefetch_count.Если количество неподтвержденных сообщений достигает числа prefetch_count, это не соответствует требованиям. Когда подходящий потребитель выбран, последующий обход прерывается