RabbitMQ публикует и подписывает бой — реализует отложенную очередь повторных попыток

Java задняя часть PHP RabbitMQ

RabbitMQ — это очередь сообщений с открытым исходным кодом, разработанная на Erlang. В этой статье предполагается, что читатель имеет базовое представление о том, что такое RabbitMQ.Если вы не знаете, что это такое и для чего его можно использовать, рекомендуется начать с официального сайта.RabbitMQ TutorialsНачните с вводного руководства.

В этой статье объясняется, как использовать RabbitMQ для реализации отложенных повторных попыток и очередей неудачных сообщений для обеспечения надежного потребления сообщений. После сбоя потребления сообщение будет автоматически задержано и повторно доставлено. Когда будет достигнуто определенное количество повторных попыток, сообщение будет доставлено сообщение с ошибкой.Очередь, ожидание ручного вмешательства. Здесь я проведу вас шаг за шагом, чтобы реализовать компонент публикации и подписки с функцией повторной попытки в случае сбоя.После использования этого компонента публикация и подписка на сообщения могут быть реализованы очень просто.При развитии бизнеса разработчики бизнеса могут сосредоточить свои усилия, направленные на реализацию бизнес-логики, не тратя время на понимание некоторых сложных концепций RabbitMQ.

Эта статья будет пересматриваться и обновляться.GITHUBВверхПлан развития программистаProject, добро пожаловать в Star, больше интересного контента, пожалуйстаfollow me.

резюме

Мы реализуем следующие функции

  • В сочетании с режимом темы RabbitMQ и режимом рабочей очереди производитель генерирует сообщения, потребители подписываются по запросу, и после того, как сообщения доставлены в очередь потребителя, несколько рабочих процессов одновременно потребляют сообщения.
  • Объединение RabbitMQMessage TTLиDead Letter ExchangeРеализовать функцию отложенного повтора сообщения
  • После того, как сообщение достигнет максимального количества повторных попыток, оно будет доставлено в очередь отказов, а после ожидания ручного вмешательства для устранения ошибки снова будет добавлено в очередь для использования.

Конкретный процесс показан на рисунке ниже.

xxx

  1. Производитель публикует сообщение на основной бирже
  2. Основной Exchange распределяет сообщение в соответствующую очередь сообщений в соответствии с ключом маршрутизации.
  3. Рабочие процессы нескольких потребителей одновременно потребляют сообщения в очереди, поэтому они используют метод «конкуренции» для борьбы за потребление сообщений.
  4. После того, как сообщение использовано, независимо от успеха или неудачи, сообщение подтверждения потребления ACK должно быть возвращено в очередь, чтобы избежать повторной доставки, вызванной механизмом подтверждения потребления сообщения.В то же время, если сообщение обработано успешно, процесс будет конец, иначе он войдет в стадию повторной попытки
  5. Если количество повторов меньше установленного максимального количества повторов (3 раза), сообщение будет повторно доставлено в очередь повторов Retry Exchange.
  6. Очередь повторных попыток не требует, чтобы потребители подписывались напрямую. Она будет ждать, пока истечет действительное время сообщения, а затем повторно доставить сообщение на биржу недоставленных писем. Здесь мы устанавливаем ее в качестве основной биржи и повторно доставляем сообщение после задержки, так что потребительское сообщение может быть повторно использовано
  7. Если потребление выходит из строя более трех раз, считается, что сообщение не может быть обработано, и сообщение напрямую доставляется в очередь неудачных неудачных обменов.В это время приложение может активировать механизм сигнализации, чтобы уведомить соответствующие ответственные человек, чтобы справиться с этим.
  8. Дождавшись ручного вмешательства (устранения ошибки), повторно доставьте сообщение на основной Exchange, чтобы его можно было повторно использовать.

Техническая реализация

Linus Torvaldsсказал раньше

Talk is cheap. Show me the code

Я реализовал решения, описанные в этой статье, на Java и PHP соответственно.Читатели могут лучше понять, обратившись к коду и основным шагам в этой статье.

Создать обмен

Чтобы реализовать отложенные повторные попытки и хранение сообщений об ошибках, нам нужно создать три Exchange для обработки сообщений.

  • masterПервичная биржа, на которую публикуются сообщения при их публикации
  • master.retryПовторите попытку обмена, при сбое обработки сообщения (в течение 3 раз) сообщение повторно доставляется на обмен.
  • master.failedFailed Exchange: после более чем трех неудачных попыток сообщение доставляется на Exchange.

Все объявления Exchange (declare) должны использовать следующие параметры

параметр ценность инструкция
exchange - Название биржи
type topic Тип обмена
passive false Если Exchange уже существует, верните успех, иначе создайте
durable true Persistent storage Exchange, тут только персистентность самого Exchange, сообщения и очереди нужно указывать отдельно для их персистентности
no-wait false Этот метод должен ответить на подтверждение

Java-код

// 声明Exchange:主体,失败,重试
channel.exchangeDeclare("master", "topic", true);
channel.exchangeDeclare("master.retry", "topic", true);
channel.exchangeDeclare("master.failed", "topic", true);

PHP-код

// 普通交换机
$this->channel->exchange_declare('master', 'topic', false, true, false);
// 重试交换机
$this->channel->exchange_declare('master.retry', 'topic', false, true, false);
// 失败交换机
$this->channel->exchange_declare('master.failed', 'topic', false, true, false);

В интерфейсе управления RabbitMQ мы видим три созданных Exchange.

-w539

выпуск новостей

Когда сообщение опубликовано, используйтеbasic_publishметод со следующими параметрами

параметр ценность инструкция
message - Опубликованный объект сообщения
exchange master Биржа, на которой было опубликовано сообщение
routing-key - КЛЮЧ маршрутизации, используемый для идентификации типа сообщения.
mandatory false Принуждать ли маршрут, если указана эта опция, если сообщение не подписано, будет возвращена ошибка маршрута недоступна
immediate false Указывает, что делать, если сообщения не могут быть перенаправлены напрямую потребителям.

При публикации сообщения дляmessageобъект, содержимое которого рекомендуется использоватьзакодированная строка json, и сообщение должно идентифицировать следующие атрибуты

'delivery_mode'=> 2 // 1为非持久化,2为持久化

Java-код

channel.basicPublish(
    "master", 
    routingKey, 
    MessageProperties.PERSISTENT_BASIC, // delivery_mode
    message.getBytes()
);

PHP-код

$msg = new AMQPMessage($message->serialize(), [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);

$this->channel->basic_publish($msg, 'master', $routingKey);

Подписка на новости

Реализация подписки на сообщения является относительно сложной и требует завершения объявления очереди и привязки очереди и Exchange.

Declare Queue

Для каждой службы, которая подписывается на сообщения, должна быть создана очередь, соответствующая службе, и очередь должна быть привязана к интересующим правилам маршрутизации.После этого, после того, как производитель сообщения доставляет сообщение в Exchange, сообщение будет к правилам маршрутизации в соответствующую очередь для потребления потребителями.

Службе потребителей необходимо объявить три очереди

  • [queue_name]Имя очереди в формате[服务名称]@订阅服务标识
  • [queue_name]@retryочередь повторных попыток
  • [queue_name]@failedочередь отказов

订阅服务标识Это идентификатор классификации собственной подписки клиента, такой как служба центра пользователей (имя службы ucenter), которая содержит две подписки: пользовательскую и корпоративную, где имена очередей двух подписокucenter@userиucenter@enterprise, соответствующая очередь повторных попытокucenter@user@retryиucenter@enterprise@retry.

При объявлении очередей правила спецификации параметров следующие:

параметр ценность инструкция
queue - имя очереди
passive false Если очередь не существует, она будет создана, а если она существует, то она будет выполнена напрямую.
durable true постоянство очереди
exclusive false Исключительно, если для этого параметра установлено значение true, очередь действительна только для текущего соединения и автоматически удаляется после разрыва соединения.
no-wait false Этот метод требует подтверждения ответа
auto-delete false Удалять ли автоматически, когда больше не используется

за@retryПовторная очередь, необходимо указать дополнительные параметры

'x-dead-letter-exchange' => 'master'
'x-message-ttl'          => 30 * 1000 // 重试时间设置为30s

Смысл двух полей заголовка здесь в том, что после задержки в очереди на 30 секунд сообщение повторно доставляется вx-dead-letter-exchangeна соответствующей бирже

Java-код

// 声明监听队列
channel.queueDeclare(
    queueName, // 队列名称
    true,      // durable
    false,     // exclusive
    false,     // autoDelete
    null       // arguments
);
channel.queueDeclare(queueName + "@failed", true, false, false, null);

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", exchangeName());
arguments.put("x-message-ttl", 30 * 1000);
channel.queueDeclare(queueName + "@retry", true, false, false, arguments);

PHP-код

$this->channel->queue_declare($queueName, false, true, false, false, false);
$this->channel->queue_declare($failedQueueName, false, true, false, false, false);
$this->channel->queue_declare(
    $retryQueueName, // 队列名称
    false,           // passive
    true,            // durable
    false,           // exclusive
    false,           // auto_delete
    false,           // nowait
    new AMQPTable([
        'x-dead-letter-exchange' => 'master',
        'x-message-ttl'          => 30 * 1000,
    ])
);

В интерфейсе управления RabbitMQ вы можете увидеть три созданные нами очереди в разделе Queues.

Глядя на детали очереди, мы можем видетьqueueName@retryРазница между очередью и двумя другими очередями

-w486

Bind Exchange & Queue

После создания очереди нужно привязать очередь к Exchange (bind), различные очереди должны быть привязаны к соответствующему Exchange, созданному ранее.

Queue Exchange
[queue_name] master
[queue_name]@retry master.retry
[queue_name]@failed master.failed

При привязке необходимо предоставить подписанный КЛЮЧ маршрутизации, который соответствует КЛЮЧУ маршрутизации при публикации сообщения.Разница в том, что подстановочные знаки можно использовать для подписки на несколько типов сообщений одновременно.

параметр ценность инструкция
queue - связанная очередь
exchange - Связанный обмен
routing-key - Правила маршрутизации сообщений с подпиской
no-wait false Этот метод требует подтверждения ответа

Java-код

// 绑定监听队列到Exchange
channel.queueBind(queueName, "master", routingKey);
channel.queueBind(queueName + "@failed", "master.failed", routingKey);
channel.queueBind(queueName + "@retry", "master.retry", routingKey);

PHP-код

$this->channel->queue_bind($queueName, 'master', $routingKey);
$this->channel->queue_bind($retryQueueName, 'master.retry', $routingKey);
$this->channel->queue_bind($failedQueueName, 'master.failed', $routingKey);

В интерфейсе управления RabbitMQ мы можем видеть отношения привязки между очередью и Exchange и ключом маршрутизации.

-w361

-w405

-w399

Реализация потребления сообщений

использоватьbasic_consumeПри потреблении сообщений необходимо обращать внимание на следующие параметры

параметр ценность инструкция
queue - Имя очереди на потребление
consumer-tag - Идентификатор потребителя, оставьте поле пустым
no_local false Если это поле установлено, сервер не будет публиковать сообщение клиенту, который его опубликовал.
no_ack false Требуется ответ подтверждения потребления
exclusive false Эксклюзивный доступ, после установки только текущему потребителю разрешен доступ к очереди
nowait false Этот метод требует подтверждения ответа

При использовании сообщения потребитель должен получить количество раз, когда сообщение было использовано из сообщения, чтобы определить, не удается ли выполнить повторную попытку обработки сообщения или отправить его в очередь сбоев.

Java-код

protected Long getRetryCount(AMQP.BasicProperties properties) {
	Long retryCount = 0L;
	try {
		Map<String, Object> headers = properties.getHeaders();
		if (headers != null) {
			if (headers.containsKey("x-death")) {
				List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
				if (deaths.size() > 0) {
					Map<String, Object> death = deaths.get(0);
					retryCount = (Long) death.get("count");
				}
			}
		}
	} catch (Exception e) {}

	return retryCount;
}

PHP-код

protected function getRetryCount(AMQPMessage $msg): int
{
	$retry = 0;
	if ($msg->has('application_headers')) {
		$headers = $msg->get('application_headers')->getNativeData();
		if (isset($headers['x-death'][0]['count'])) {
			$retry = $headers['x-death'][0]['count'];
		}
	}

	return (int)$retry;
}

После того, как потребление сообщения завершено, необходимо отправить сообщение подтверждения потребления на сервер, используяbasic_ackметод

ack(delivery-tag=消息的delivery-tag标识)

Java-код

// 消息消费处理
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        ...
        // 注意,由于使用了basicConsume的autoAck特性,因此这里就不需要手动执行
        // channel.basicAck(envelope.getDeliveryTag(), false);
    }
};
// 执行消息消费处理
channel.basicConsume(
    queueName, 
    true, // autoAck
    consumer
);

PHP-код

$this->channel->basic_consume(
    $queueName,
    '',    // customer_tag
    false, // no_local
    false, // no_ack
    false, // exclusive
    false, // nowait
    function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) {
        ...
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
);

Если при обработке сообщения возникает исключение, сообщение должно быть повторно доставлено на повторную биржу и дождаться следующей повторной попытки.

basic_publish(msg, 'master.retry', routing-key)
ack(delivery-tag) // 不要忘记了应答消费成功消息

Если установлено, что число повторных попыток превышает 3, а обработка по-прежнему завершается неудачно, сообщение следует доставить на отказавший Exchange и дождаться ручной обработки.

basic_publish(msg, 'master.failed', routing-key)
ack(delivery-tag) // 不要忘记了应答消费成功消息

Убедитесь, что вы не забыли сообщение подтверждения, потому что повторная попытка и сбой достигаются за счет повторной доставки сообщения на обмен повторных попыток и сбоев.Если вы забудете подтверждение, сообщение будет повторно доставлено потребителю после тайм-аута или соединения. отключен, иначе, если потребитель все еще не может с этим справиться, это вызовет бесконечный цикл.

Java-код

try {
    String message = new String(body, "UTF-8");
    // 消息处理函数
    handler.handle(message, envelope.getRoutingKey());

} catch (Exception e) {
    long retryCount = getRetryCount(properties);
    if (retryCount > 3) {
        // 重试次数大于3次,则自动加入到失败队列
        channel.basicPublish("master.failed", envelope.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, body);
    } else {
        // 重试次数小于3,则加入到重试队列,30s后再重试
        channel.basicPublish("master.retry", envelope.getRoutingKey(), properties, body);
    }
}

Повторить неудачные задачи

Если задача повторяется три раза и все равно не удается, она будет доставлена ​​в очередь сбоев.В это время вам необходимо вручную обработать исключение программы.После обработки сообщение необходимо повторно доставить в очередь для обработки. Единственное, что вам нужно сделать здесь, это подписаться на сообщение из очереди отказов, затем после получения сообщения очистить егоapplication_headersинформацию заголовка, а затем повторно доставить ее вmasterЭтот обмен будет делать.

Java-код

channel.basicPublish(
    'master', 
    envelope.getRoutingKey(),
    MessageProperties.PERSISTENT_BASIC,
    body
);

PHP-код

$msg->set('application_headers', new AMQPTable([]));
$this->channel->basic_publish(
    $msg,
    'master',
    $msg->get('routing_key')
);

как пользоваться

Мы закончили говорить об отношениях между очередью и Exchange, публикацией и подпиской, так каков же эффект от их использования? Здесь мы берем код Java в качестве примера

// 发布消息
Publisher publisher = new Publisher(factory.newConnection(), 'master');
publisher.publish("{\"id\":121, \"name\":\"guanyiyao\"}", "user.create");

// 订阅消息
new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME)
    .init("user-monitor", "user.*")
    .subscribe((message, routingKey) -> {
        // TODO 业务逻辑
        System.out.printf("    <%s> message consumed: %s\n", routingKey, message);
    }
);

Суммировать

При использовании RabbitMQ способ реализации отложенных повторных попыток и очереди отказов не ограничивается методами, описанными в этой статье.Если у читателей есть лучшее решение для реализации, добро пожаловать в кирпич.Здесь я просто бросаю кирпич. Метод, описанный в этой статье, еще имеет много возможностей для оптимизации, и читатели также могут попробовать улучшить его реализацию.Например, в этой статье используются три Exchagnes.Может ли только одна Exchange реализовать функции, описанные в этой статье.

Эта статья будет пересматриваться и обновляться.GITHUBВверхПлан развития программистаProject, добро пожаловать в Star, больше интересного контента, пожалуйстаfollow me.