Руководство по использованию RabbitMQ (5) Как обеспечить использование 99,99% сообщений в очереди?

RabbitMQ

1. Обзор предыдущей ситуации

Руководство по использованию RabbitMQ (1) Настройка установки среды RabbitMQ и пример Hello World

Руководство по использованию RabbitMQ (2) Управление пользователями RabbitMQ, управление ролями и настройки разрешений

Руководство по использованию RabbitMQ (3) Как обеспечить успешную отправку 99,99% сообщений?

Руководство по использованию RabbitMQ (4) Как гарантировать, что 99,99% сообщений не будут потеряны из-за сохраняемости?

До сих пор мы можем гарантировать, что сообщение успешно отправлено производителем на сервер RabbitMQ, и что сообщение не будет потеряно после ненормального состояния сервера RabbitMQ (перезагрузка, простои и т. д.) Возможно, вы считаете, что сообщение должно быть в безопасности сейчас? На самом деле это недостаточно безопасно, если не верите, то читайте дальше.

2. Резюме этой статьи

На самом деле, есть еще один сценарий для рассмотрения:Когда потребитель получает сообщение, бизнес-логика не была обработана, и потребитель вешает трубку, поэтому сообщение также теряется?, Например, когда пользователь размещает заказ, центр заказов отправляет сообщение в очередь в RabbitMQ, а центр поинтов получает это сообщение и намеревается добавить 20 баллов пользователю, разместившему заказ, но баллы не были успешно добавлен, и центр точек зависает сам по себе, вызывая проблемы с данными.

Так как же решить эту проблему?

Чтобы гарантировать, что сообщения будут успешно обработаны потребителями, RabbitMQ предоставляетподтверждение сообщения, В этой статье в основном объясняется, как использовать механизм подтверждения сообщений в RabbitMQ, чтобы убедиться, что сообщения успешно используются потребителями, и избежать потери сообщений, вызванной внезапным простоем потребителя.

3. Включить явный режим подтверждения

в блоге 1Руководство по использованию RabbitMQ (1) Настройка установки среды RabbitMQ и пример Hello World, наш код для запуска потребителя выглядит так:

// 创建队列消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received Message '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

Дело в том, чтоchannel.basicConsume(QUEUE_NAME, true, consumer);Второй параметр метода, давайте сначала посмотрим на исходный код basicConsume():

public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
    return this.basicConsume(queue, autoAck, "", callback);
}

Параметр autoAck здесь относится к тому, подтверждается ли оно автоматически.Если для него установлено значение true, RabbitMQ автоматически установит отправленное сообщение как подтверждение, а затем удалит его из памяти (или с диска), независимо от того, получит ли потребитель сообщение и обработает ли его. это успешно; если установлено значение false, RabbitMQ будет ждать, пока потребитель явно ответит на сигнал подтверждения, прежде чем удалять из памяти (или с диска).

Рекомендуется установить для autoAck значение false, чтобы у потребителей было достаточно времени для обработки сообщений, не беспокоясь о потере сообщений, вызванной простоем потребителя во время обработки сообщений.

В этот момент сообщение в очереди разделено на две части:

  1. Ожидание доставки сообщения потребителю (часть Ready на рисунке ниже)
  2. Он был доставлен потребителю, но не получил подтверждающий сигнал от потребителя (Неподготовленная часть на рисунке ниже)

Если RabbitMQ не получил сигнал подтверждения от потребителя, а потребитель, получающий сообщение, был отключен, RabbitMQ организует повторную постановку сообщения в очередь и ожидание доставки следующему потребителю. быть первоначальным потребителем.

RabbitMQ не устанавливает время истечения срока действия для неподтвержденных сообщений.Единственным основанием для принятия решения о необходимости повторной доставки сообщения потребителям является то, было ли разорвано соединение потребителя, которое потребляет сообщение.Причина такого дизайна в том, что RabbitMQ позволяет потребителям для использования сообщения. Сообщения могут занимать много времени.

Для простоты понимания возьмем конкретный пример, и будем использовать DurableProducer выше со слов производителя:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 创建一个Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 发送消息
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

Затем создайте новый потребительский класс AckConsumer:

package com.zwwhnly.springbootaction.rabbitmq.ack;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AckConsumer {
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 创建队列消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                int result = 1 / 0;
                System.out.println("Received Message '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

Сначала мы устанавливаем параметр autoAck в значение true, то есть автоматическое подтверждение, и намеренно пишем исключение при потреблении сообщений, затем запускаем клиент-производитель для записи сообщения в очередь, а затем запускаем клиент-потребитель и обнаруживаем, что сообщение не потребляется успешно, но исчез:

Затем мы устанавливаем для autoAck значение false:

channel.basicConsume(QUEUE_NAME, false, consumer);

Запустите клиент производителя еще раз, чтобы написать сообщение в очередь, а затем запустить клиент Потребителя. В это время, хотя потребительский клиент все еще имеет код исключения кода, сообщение все еще находится в очереди:

Затем мы удаляем код исключения в клиенте-потребителе, перезапускаем клиент-потребитель и обнаруживаем, что потребление сообщения прошло успешно, но сообщение не было подтверждено:

Вручную остановите клиент-потребитель и обнаружите, что сообщение снова находится в состоянии «Готово» и готово к повторной доставке:

Причина, по которой сообщение потребляется, но все еще находится в состоянии Unacked, заключается в том, что мы не добавили в код явный код подтверждения:

String message = new String(body, "UTF-8");
//int result = 1 / 0;
System.out.println("Received Message '" + message + "'");

long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);

deliveryTag можно рассматривать как номер сообщения, который представляет собой 64-битное целое число.

На данный момент запустите потребительский клиент и обнаружите, что сообщение успешно потребляется и удалена из очереди:

4. Исходный код и ссылка

Адрес источника:GitHub.com/Где находится Ухань/SPR…, добро пожаловать на скачивание.

«Практическое руководство RabbitMQ» Чжу Чжунхуа