Используйте очередь недоставленных сообщений RabbitMQ для запланированных задач

Node.js Redis Linux RabbitMQ

Выполнение временных задач в процессе разработки является очень распространенным бизнес-сценарием.На уровне кода Node.js может использовать базовый синтаксис setTimeout, setInerval или использоватьnode-scheduleЭти похожие библиотеки могут частично решить эту задачу.В сторонних сервисах вы можете использовать Redis Keyspace Notification или собственный crontab Linux для выполнения запланированных задач. В качестве промежуточного программного обеспечения для сообщений RabbitMQ также может использовать свою очередь недоставленных сообщений для выполнения задач по времени.

В этой статье в качестве демонстрационного языка используется Node.js, а RabbitMQ работает с использованиемamqplib.

очередь недоставленных сообщений

В RabbitMQ есть биржа под названием DLX, полное название Dead-Letter-Exchange, которую можно назвать обменом мертвых писем. Когда сообщение становится мертвым письмом в очереди, оно будет повторно отправлено на другой обмен, которым является DLX, а очередь, связанная с DLX, называется очередью недоставленных сообщений. Сообщение становится мертвой буквой, как правило, в следующих ситуациях:

  • Сообщение отклонено, а для параметра requeue установлено значение false
  • срок действия сообщения истек
  • очередь достигает максимальной длины

DLX тоже обычный обмен, который ничем не отличается от общего обмена, его можно указать на любой очереди, по сути это установка свойств определенной очереди. Когда в этой очереди есть недоставленное письмо, RabbitMQ автоматически повторно опубликует сообщение в заданном DLX, а затем перенаправит его в другую очередь, очередь недоставленных сообщений. Чтобы добавить DLX в очередь, вам нужно установить егоdeadLetterExchangeа такжеdeadLetterRoutingKeyпараметр,deadLetterRoutingKeyНеобязательный параметр, указывающий ключ маршрутизации, указанный для DLX. Если он не указан, будет использоваться ключ маршрутизации исходной очереди.

const amqp = require('amqplib');

const myNormalEx = 'my_normal_exchange';
const myNormalQueue = 'my_normal_queue';
const myDeadLetterEx = 'my_dead_letter_exchange';
const myDeadLetterRoutingKey = 'my_dead_letter_routing_key';
let connection, channel;
amqp.connect('amqp://localhost')
  .then((conn) => {
    connection = conn;
    return conn.createChannel();
  })
  .then((ch) => {
    channel = ch;
    ch.assertExchange(myNormalEx, 'direct', { durable: false });
    return ch.assertQueue(myNormalQueue, {
      exclusive: false,
      deadLetterExchange: myDeadLetterEx,
      deadLetterRoutingKey: myDeadLetterRoutingKey,
    });
  })
  .then((ok) => {
    channel.bindQueue(ok.queue, myNormalEx);
    channel.sendToQueue(ok.queue, Buffer.from('hello'));
    setTimeout(function () { connection.close(); process.exit(0) }, 500);
  })
  .catch(console.error);

Приведенный выше код сначала объявляет обменmyNormalEx, затем объявляет очередьmyNormalQueue, установив егоdeadLetterExchangeпараметр, для которого был добавлен DLX. Итак, когда очередьmyNormalQueueКогда есть новости, которые становятся мертвой буквой, они будут опубликованы наmyDeadLetterExвходить.

Срок действия (TTL)

В RabbitMQ вы можете установить время истечения срока действия для сообщений и очередей. Когда время истечения задается в свойствах очереди, все сообщения в очереди имеют одинаковое время истечения. При установке отдельных сроков действия для сообщений TTL может быть разным для каждого сообщения. Если оба метода используются вместе, TTL сообщения будет меньшим из двух. Как только время жизни сообщения в очереди превысит установленное значение TTL, оно станет «мертвым сообщением», и потребители больше не смогут получать сообщение.

Установка TTL для каждого сообщения устанавливается при отправке сообщенияexpirationпараметр в миллисекундах.

const amqp = require('amqplib');

const myNormalEx = 'my_normal_exchange';
const myNormalQueue = 'my_normal_queue';
const myDeadLetterEx = 'my_dead_letter_exchange';
const myDeadLetterRoutingKey = 'my_dead_letter_routing_key';
let connection, channel;
amqp.connect('amqp://localhost')
  .then((conn) => {
    connection = conn;
    return conn.createChannel();
  })
  .then((ch) => {
    channel = ch;
    ch.assertExchange(myNormalEx, 'direct', { durable: false });
    return ch.assertQueue(myNormalQueue, {
      exclusive: false,
      deadLetterExchange: myDeadLetterEx,
      deadLetterRoutingKey: myDeadLetterRoutingKey,
    });
})
  .then((ok) => {
    channel.bindQueue(ok.queue, myNormalEx);
    channel.sendToQueue(ok.queue, Buffer.from('hello'), { expiration: '4000'});
    setTimeout(function () { connection.close(); process.exit(0) }, 500);
  })
  .catch(console.error);

Когда приведенный выше код отправляет сообщение в очередь, он передает{ expiration: '4000'}Время истечения этого сообщения установлено на 4 секунды, а срок действия сообщения истекает через 4 секунды. Это сообщение не обязательно будет отброшено или введено в мертвое письмо через 4 секунды. Только когда оно будет использовано, будет оцениваться, является ли срок его действия истек. Если срок его действия еще не истек, он будет использован потребителями. Если срок его действия истек, он будет удален или станет мертвой буквой.

задача на время

Потому что сообщение в очереди станет мертвым письмом по истечении срока его действия, и мертвое письмо будет опубликовано в DLX очереди, в которой находится сообщение.Таким образом, задав время истечения срока действия для сообщения, а затем используя очередь, связанную с DLX очереди, в которой находится сообщение, достигается цель определения времени задачи.Проще говоря, когда есть очередь queue1, ее DLX — deadEx1, а deadEx1 привязан к очереди deadQueue1.Когда сообщение в очереди 1 становится недоставленным письмом из-за истечения срока действия, оно будет опубликовано в deadEx1 через очередь потребления deadQueue1 Сообщение в очереди1 эквивалентно использованию недоставленного сообщения из-за истечения срока действия в очереди1.

Код для использования очереди недоставленных сообщений выглядит следующим образом:

const amqp = require('amqplib');

const myDeadLetterEx = 'my_dead_letter_exchange';
const myDeadLetterQueue = 'my_dead_letter_queue';
const myDeadLetterRoutingKey = 'my_dead_letter_routing_key';
let channel;
amqp.connect('amqp://localhost')
.then((conn) => {
  return conn.createChannel();
})
.then((ch) => {
  channel = ch;
  ch.assertExchange(myDeadLetterEx, 'direct', { durable: false });
  return ch.assertQueue(myDeadLetterQueue, { exclusive: false });
})
.then((ok) => {
  channel.bindQueue(ok.queue, myDeadLetterEx, myDeadLetterRoutingKey);
  channel.consume(ok.queue, (msg) => {
    console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
  }, { noAck: true})
})
.catch(console.error);

Здесь следует отметить, что если объявленныйmyDeadLetterExявляется прямым типом, то необходимо указать BindingKey при привязке очереди, то есть здесьmyDeadLetterRoutingKey, если не указать Bindingkey, нужноmyDeadLetterExОбъявлен как тип разветвления.