RabbitMQ — это очередь сообщений с открытым исходным кодом, разработанная на Erlang. В этой статье предполагается, что читатель имеет базовое представление о том, что такое RabbitMQ.Если вы не знаете, что это такое и для чего его можно использовать, рекомендуется начать с официального сайта.RabbitMQ TutorialsНачните с вводного руководства.
В этой статье объясняется, как использовать RabbitMQ для реализации отложенных повторных попыток и очередей неудачных сообщений для обеспечения надежного потребления сообщений. После сбоя потребления сообщение будет автоматически задержано и повторно доставлено. Когда будет достигнуто определенное количество повторных попыток, сообщение будет доставлено сообщение с ошибкой.Очередь, ожидание ручного вмешательства. Здесь я проведу вас шаг за шагом, чтобы реализовать компонент публикации и подписки с функцией повторной попытки в случае сбоя.После использования этого компонента публикация и подписка на сообщения могут быть реализованы очень просто.При развитии бизнеса разработчики бизнеса могут сосредоточить свои усилия, направленные на реализацию бизнес-логики, не тратя время на понимание некоторых сложных концепций RabbitMQ.
Эта статья будет пересматриваться и обновляться.GITHUBВверхПлан развития программистаProject, добро пожаловать в Star, больше интересного контента, пожалуйстаfollow me.
резюме
Мы реализуем следующие функции
- В сочетании с режимом темы RabbitMQ и режимом рабочей очереди производитель генерирует сообщения, потребители подписываются по запросу, и после того, как сообщения доставлены в очередь потребителя, несколько рабочих процессов одновременно потребляют сообщения.
- Объединение RabbitMQMessage TTLиDead Letter ExchangeРеализовать функцию отложенного повтора сообщения
- После того, как сообщение достигнет максимального количества повторных попыток, оно будет доставлено в очередь отказов, а после ожидания ручного вмешательства для устранения ошибки снова будет добавлено в очередь для использования.
Конкретный процесс показан на рисунке ниже.
- Производитель публикует сообщение на основной бирже
- Основной Exchange распределяет сообщение в соответствующую очередь сообщений в соответствии с ключом маршрутизации.
- Рабочие процессы нескольких потребителей одновременно потребляют сообщения в очереди, поэтому они используют метод «конкуренции» для борьбы за потребление сообщений.
- После того, как сообщение использовано, независимо от успеха или неудачи, сообщение подтверждения потребления ACK должно быть возвращено в очередь, чтобы избежать повторной доставки, вызванной механизмом подтверждения потребления сообщения.В то же время, если сообщение обработано успешно, процесс будет конец, иначе он войдет в стадию повторной попытки
- Если количество повторов меньше установленного максимального количества повторов (3 раза), сообщение будет повторно доставлено в очередь повторов Retry Exchange.
- Очередь повторных попыток не требует, чтобы потребители подписывались напрямую. Она будет ждать, пока истечет действительное время сообщения, а затем повторно доставить сообщение на биржу недоставленных писем. Здесь мы устанавливаем ее в качестве основной биржи и повторно доставляем сообщение после задержки, так что потребительское сообщение может быть повторно использовано
- Если потребление выходит из строя более трех раз, считается, что сообщение не может быть обработано, и сообщение напрямую доставляется в очередь неудачных неудачных обменов.В это время приложение может активировать механизм сигнализации, чтобы уведомить соответствующие ответственные человек, чтобы справиться с этим.
- Дождавшись ручного вмешательства (устранения ошибки), повторно доставьте сообщение на основной Exchange, чтобы его можно было повторно использовать.
Техническая реализация
Linus Torvaldsсказал раньше
Talk is cheap. Show me the code
Я реализовал решения, описанные в этой статье, на Java и PHP соответственно.Читатели могут лучше понять, обратившись к коду и основным шагам в этой статье.
Создать обмен
Чтобы реализовать отложенные повторные попытки и хранение сообщений об ошибках, нам нужно создать три Exchange для обработки сообщений.
- masterПервичная биржа, на которую публикуются сообщения при их публикации
- master.retryПовторите попытку обмена, при сбое обработки сообщения (в течение 3 раз) сообщение повторно доставляется на обмен.
- master.failedFailed Exchange: после более чем трех неудачных попыток сообщение доставляется на Exchange.
Все объявления Exchange (declare) должны использовать следующие параметры
параметр | ценность | инструкция |
---|---|---|
exchange | - | Название биржи |
type | topic | Тип обмена |
passive | false | Если Exchange уже существует, верните успех, иначе создайте |
durable | true | Persistent storage Exchange, тут только персистентность самого Exchange, сообщения и очереди нужно указывать отдельно для их персистентности |
no-wait | false | Этот метод должен ответить на подтверждение |
Java-код
// 声明Exchange:主体,失败,重试
channel.exchangeDeclare("master", "topic", true);
channel.exchangeDeclare("master.retry", "topic", true);
channel.exchangeDeclare("master.failed", "topic", true);
PHP-код
// 普通交换机
$this->channel->exchange_declare('master', 'topic', false, true, false);
// 重试交换机
$this->channel->exchange_declare('master.retry', 'topic', false, true, false);
// 失败交换机
$this->channel->exchange_declare('master.failed', 'topic', false, true, false);
В интерфейсе управления RabbitMQ мы видим три созданных Exchange.
выпуск новостей
Когда сообщение опубликовано, используйтеbasic_publish
метод со следующими параметрами
параметр | ценность | инструкция |
---|---|---|
message | - | Опубликованный объект сообщения |
exchange | master | Биржа, на которой было опубликовано сообщение |
routing-key | - | КЛЮЧ маршрутизации, используемый для идентификации типа сообщения. |
mandatory | false | Принуждать ли маршрут, если указана эта опция, если сообщение не подписано, будет возвращена ошибка маршрута недоступна |
immediate | false | Указывает, что делать, если сообщения не могут быть перенаправлены напрямую потребителям. |
При публикации сообщения дляmessage
объект, содержимое которого рекомендуется использоватьзакодированная строка json, и сообщение должно идентифицировать следующие атрибуты
'delivery_mode'=> 2 // 1为非持久化,2为持久化
Java-код
channel.basicPublish(
"master",
routingKey,
MessageProperties.PERSISTENT_BASIC, // delivery_mode
message.getBytes()
);
PHP-код
$msg = new AMQPMessage($message->serialize(), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
$this->channel->basic_publish($msg, 'master', $routingKey);
Подписка на новости
Реализация подписки на сообщения является относительно сложной и требует завершения объявления очереди и привязки очереди и Exchange.
Declare Queue
Для каждой службы, которая подписывается на сообщения, должна быть создана очередь, соответствующая службе, и очередь должна быть привязана к интересующим правилам маршрутизации.После этого, после того, как производитель сообщения доставляет сообщение в Exchange, сообщение будет к правилам маршрутизации в соответствующую очередь для потребления потребителями.
Службе потребителей необходимо объявить три очереди
-
[queue_name]
Имя очереди в формате[服务名称]@订阅服务标识
-
[queue_name]@retry
очередь повторных попыток -
[queue_name]@failed
очередь отказов
订阅服务标识
Это идентификатор классификации собственной подписки клиента, такой как служба центра пользователей (имя службы ucenter), которая содержит две подписки: пользовательскую и корпоративную, где имена очередей двух подписокucenter@user
иucenter@enterprise
, соответствующая очередь повторных попытокucenter@user@retry
иucenter@enterprise@retry
.
При объявлении очередей правила спецификации параметров следующие:
параметр | ценность | инструкция |
---|---|---|
queue | - | имя очереди |
passive | false | Если очередь не существует, она будет создана, а если она существует, то она будет выполнена напрямую. |
durable | true | постоянство очереди |
exclusive | false | Исключительно, если для этого параметра установлено значение true, очередь действительна только для текущего соединения и автоматически удаляется после разрыва соединения. |
no-wait | false | Этот метод требует подтверждения ответа |
auto-delete | false | Удалять ли автоматически, когда больше не используется |
за@retry
Повторная очередь, необходимо указать дополнительные параметры
'x-dead-letter-exchange' => 'master'
'x-message-ttl' => 30 * 1000 // 重试时间设置为30s
Смысл двух полей заголовка здесь в том, что после задержки в очереди на 30 секунд сообщение повторно доставляется в
x-dead-letter-exchange
на соответствующей бирже
Java-код
// 声明监听队列
channel.queueDeclare(
queueName, // 队列名称
true, // durable
false, // exclusive
false, // autoDelete
null // arguments
);
channel.queueDeclare(queueName + "@failed", true, false, false, null);
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", exchangeName());
arguments.put("x-message-ttl", 30 * 1000);
channel.queueDeclare(queueName + "@retry", true, false, false, arguments);
PHP-код
$this->channel->queue_declare($queueName, false, true, false, false, false);
$this->channel->queue_declare($failedQueueName, false, true, false, false, false);
$this->channel->queue_declare(
$retryQueueName, // 队列名称
false, // passive
true, // durable
false, // exclusive
false, // auto_delete
false, // nowait
new AMQPTable([
'x-dead-letter-exchange' => 'master',
'x-message-ttl' => 30 * 1000,
])
);
В интерфейсе управления RabbitMQ вы можете увидеть три созданные нами очереди в разделе Queues.
Глядя на детали очереди, мы можем видетьqueueName@retryРазница между очередью и двумя другими очередями
Bind Exchange & Queue
После создания очереди нужно привязать очередь к Exchange (bind
), различные очереди должны быть привязаны к соответствующему Exchange, созданному ранее.
Queue | Exchange |
---|---|
[queue_name] | master |
[queue_name]@retry | master.retry |
[queue_name]@failed | master.failed |
При привязке необходимо предоставить подписанный КЛЮЧ маршрутизации, который соответствует КЛЮЧУ маршрутизации при публикации сообщения.Разница в том, что подстановочные знаки можно использовать для подписки на несколько типов сообщений одновременно.
параметр | ценность | инструкция |
---|---|---|
queue | - | связанная очередь |
exchange | - | Связанный обмен |
routing-key | - | Правила маршрутизации сообщений с подпиской |
no-wait | false | Этот метод требует подтверждения ответа |
Java-код
// 绑定监听队列到Exchange
channel.queueBind(queueName, "master", routingKey);
channel.queueBind(queueName + "@failed", "master.failed", routingKey);
channel.queueBind(queueName + "@retry", "master.retry", routingKey);
PHP-код
$this->channel->queue_bind($queueName, 'master', $routingKey);
$this->channel->queue_bind($retryQueueName, 'master.retry', $routingKey);
$this->channel->queue_bind($failedQueueName, 'master.failed', $routingKey);
В интерфейсе управления RabbitMQ мы можем видеть отношения привязки между очередью и Exchange и ключом маршрутизации.
Реализация потребления сообщений
использоватьbasic_consume
При потреблении сообщений необходимо обращать внимание на следующие параметры
параметр | ценность | инструкция |
---|---|---|
queue | - | Имя очереди на потребление |
consumer-tag | - | Идентификатор потребителя, оставьте поле пустым |
no_local | false | Если это поле установлено, сервер не будет публиковать сообщение клиенту, который его опубликовал. |
no_ack | false | Требуется ответ подтверждения потребления |
exclusive | false | Эксклюзивный доступ, после установки только текущему потребителю разрешен доступ к очереди |
nowait | false | Этот метод требует подтверждения ответа |
При использовании сообщения потребитель должен получить количество раз, когда сообщение было использовано из сообщения, чтобы определить, не удается ли выполнить повторную попытку обработки сообщения или отправить его в очередь сбоев.
Java-код
protected Long getRetryCount(AMQP.BasicProperties properties) {
Long retryCount = 0L;
try {
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
if (headers.containsKey("x-death")) {
List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
if (deaths.size() > 0) {
Map<String, Object> death = deaths.get(0);
retryCount = (Long) death.get("count");
}
}
}
} catch (Exception e) {}
return retryCount;
}
PHP-код
protected function getRetryCount(AMQPMessage $msg): int
{
$retry = 0;
if ($msg->has('application_headers')) {
$headers = $msg->get('application_headers')->getNativeData();
if (isset($headers['x-death'][0]['count'])) {
$retry = $headers['x-death'][0]['count'];
}
}
return (int)$retry;
}
После того, как потребление сообщения завершено, необходимо отправить сообщение подтверждения потребления на сервер, используяbasic_ack
метод
ack(delivery-tag=消息的delivery-tag标识)
Java-код
// 消息消费处理
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
...
// 注意,由于使用了basicConsume的autoAck特性,因此这里就不需要手动执行
// channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 执行消息消费处理
channel.basicConsume(
queueName,
true, // autoAck
consumer
);
PHP-код
$this->channel->basic_consume(
$queueName,
'', // customer_tag
false, // no_local
false, // no_ack
false, // exclusive
false, // nowait
function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) {
...
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
);
Если при обработке сообщения возникает исключение, сообщение должно быть повторно доставлено на повторную биржу и дождаться следующей повторной попытки.
basic_publish(msg, 'master.retry', routing-key)
ack(delivery-tag) // 不要忘记了应答消费成功消息
Если установлено, что число повторных попыток превышает 3, а обработка по-прежнему завершается неудачно, сообщение следует доставить на отказавший Exchange и дождаться ручной обработки.
basic_publish(msg, 'master.failed', routing-key)
ack(delivery-tag) // 不要忘记了应答消费成功消息
Убедитесь, что вы не забыли сообщение подтверждения, потому что повторная попытка и сбой достигаются за счет повторной доставки сообщения на обмен повторных попыток и сбоев.Если вы забудете подтверждение, сообщение будет повторно доставлено потребителю после тайм-аута или соединения. отключен, иначе, если потребитель все еще не может с этим справиться, это вызовет бесконечный цикл.
Java-код
try {
String message = new String(body, "UTF-8");
// 消息处理函数
handler.handle(message, envelope.getRoutingKey());
} catch (Exception e) {
long retryCount = getRetryCount(properties);
if (retryCount > 3) {
// 重试次数大于3次,则自动加入到失败队列
channel.basicPublish("master.failed", envelope.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, body);
} else {
// 重试次数小于3,则加入到重试队列,30s后再重试
channel.basicPublish("master.retry", envelope.getRoutingKey(), properties, body);
}
}
Повторить неудачные задачи
Если задача повторяется три раза и все равно не удается, она будет доставлена в очередь сбоев.В это время вам необходимо вручную обработать исключение программы.После обработки сообщение необходимо повторно доставить в очередь для обработки. Единственное, что вам нужно сделать здесь, это подписаться на сообщение из очереди отказов, затем после получения сообщения очистить егоapplication_headers
информацию заголовка, а затем повторно доставить ее вmaster
Этот обмен будет делать.
Java-код
channel.basicPublish(
'master',
envelope.getRoutingKey(),
MessageProperties.PERSISTENT_BASIC,
body
);
PHP-код
$msg->set('application_headers', new AMQPTable([]));
$this->channel->basic_publish(
$msg,
'master',
$msg->get('routing_key')
);
как пользоваться
Мы закончили говорить об отношениях между очередью и Exchange, публикацией и подпиской, так каков же эффект от их использования? Здесь мы берем код Java в качестве примера
// 发布消息
Publisher publisher = new Publisher(factory.newConnection(), 'master');
publisher.publish("{\"id\":121, \"name\":\"guanyiyao\"}", "user.create");
// 订阅消息
new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME)
.init("user-monitor", "user.*")
.subscribe((message, routingKey) -> {
// TODO 业务逻辑
System.out.printf(" <%s> message consumed: %s\n", routingKey, message);
}
);
Суммировать
При использовании RabbitMQ способ реализации отложенных повторных попыток и очереди отказов не ограничивается методами, описанными в этой статье.Если у читателей есть лучшее решение для реализации, добро пожаловать в кирпич.Здесь я просто бросаю кирпич. Метод, описанный в этой статье, еще имеет много возможностей для оптимизации, и читатели также могут попробовать улучшить его реализацию.Например, в этой статье используются три Exchagnes.Может ли только одна Exchange реализовать функции, описанные в этой статье.
Эта статья будет пересматриваться и обновляться.GITHUBВверхПлан развития программистаProject, добро пожаловать в Star, больше интересного контента, пожалуйстаfollow me.