Резюме:Использование очереди сообщений RabbitMQ может эффективно улучшить пиковую производительность системы.
Введение в RabbitMQ
RabbitMQдаБрокер сообщений, который поддерживает различные методы асинхронной обработки сообщений, наиболее распространенными из которых являются:
- Work Queue: Кэшировать сообщения в очередь.По умолчанию несколько рабочих процессов обрабатывают сообщения в очереди по принципу циклического перебора. Каждое сообщение будет назначено только одному работнику.
- Publish/Subscribe: каждый потребитель, подписавшийся на сообщение, получит сообщение, поэтому каждое сообщение обычно назначается нескольким рабочим процессам, и каждый рабочий процесс обрабатывает сообщение по-своему.
RabbitMQ также поддерживаетRouting,Topics,а такжеRemote procedure calls (RPC)и так далее.
Для разных методов обработки сообщений одно и то же, RabbitMQ — это промежуточный узел между производителями и потребителями сообщений, отвечающий за кэширование и распространение сообщений. RabbitMQ получает сообщения от производителей, кэширует их в памяти и по-разному распределяет среди потребителей. RabbitMQ также может записывать сообщения на диск, чтобы гарантировать сохранение, так что даже в случае неожиданного сбоя RabbitMQ данные сообщения не будут полностью потеряны.
Зачем использовать RabbitMQ?
Самый простой момент заключается в том, что он поддерживает различные методы обработки сообщений, такие как рабочая очередь, и может использоваться в различных бизнес-сценариях. о насFundebugНапример, в настоящее время используется только очередь сообщений RabbitMQ Work Queue.
С помощью очередей сообщений вычислительные задачи, которые не являются срочными, но очень ресурсоемкими, могут быть вставлены в очереди RabbitMQ в виде сообщений, а затем для обработки этих сообщений используется несколько модулей обработки.
Самым большим преимуществом этого является:Улучшенная пропускная способность системы при пиковых нагрузках. Потому что сообщения, которые слишком поздно обработать, кэшируются в RabbitMQ, что предотвращает сбой системы из-за перегрузки из-за большого количества вычислений одновременно. А те сообщения, которые слишком поздно обрабатываются, будут медленно обрабатываться после того, как пик будет пройден.
Еще одно преимущество заключается в том, чторазъединение. Производителю сообщения нужно только отправить сообщение в RabbitMQ.Когда эти сообщения будут обработаны, это не повлияет на производительность ответа производителя.
Реклама: Добро пожаловать в бесплатную пробную версиюFundebug, чтобы вы могли отслеживать ОШИБКИ онлайн-кода и улучшать взаимодействие с пользователем ~
Установите и запустите RabbitMQ.
использоватьDockerЗапустить RabbitMQ так же просто, как выполнить простую команду:
sudo docker run -d --name rabbitmq -h rabbitmq -p 5672:5672 -v /var/lib/rabbitmq:/var/lib/rabbitmq registry.docker-cn.com/library/rabbitmq:3.7
Для друзей, которые не знакомы с Docker, позвольте мне объяснить параметры команды docker:
- -d: запустить контейнер в фоновом режиме
- --name rabbitmq: установить имя контейнера rabbitmq
- -h rabbitmq: установите имя хоста контейнера rabbitmq.Если вы хотите, чтобы данные сообщения RabbitMQ сохранялись на локальном диске, вам нужно установить имя хоста, потому что каталог, в котором RabbitMQ сохраняет данные, является именем хоста.
- -p 5672:5672: сопоставьте порт 5672 контейнера с портом 5672 локального хоста, чтобы к rabbitmq можно было получить доступ через локальный порт 5672.
- -v /var/lib/rabbitmq:/var/lib/rabbitmq: Сопоставьте каталог /var/lib/rabbitmq контейнера с каталогом /var/lib/rabbitmq локального хоста, чтобы данные сообщения RabbitMQ можно было сохранить на локальном диске.Даже если контейнер RabbitMQ удален, данные еще есть.
Docker предоставляет официальные образыУскоренный сервис, поэтому образ Rabbit Docker в команде называетсяregistry.docker-cn.com/library/rabbitmq:3.7.
Если вы не знаете Docker, я предлагаю вам изучить его. Если вы не хотите учиться, команда для установки RabbitMQ под Ubuntu 14.04 выглядит следующим образом:
sudo echo "deb http://www.rabbitmq.com/debian testing main" | sudo tee -a /etc/apt/sources.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server
Запустите RabbitMQ:
sudo service rabbitmq-server start
Пример кода очереди сообщений
Ниже мы используемNode.jsРеализуйте простую очередь сообщений.
Производитель сообщения:sender.js
const amqp = require("amqplib");
const queue = "demo";
async function sendMessage(message)
{
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
await channel.assertQueue(queue);
await channel.sendToQueue(queue, new Buffer(message),
{
// RabbitMQ关闭时,消息会被保存到磁盘
persistent: true
});
}
setInterval(function()
{
sendMessage("Hello, Fundebug!");
}, 1000)
- В отправителе продолжайте отправлять «Hello, Fundebug!» в очередь сообщений.
Потребитель сообщения:receiver.js
const amqp = require("amqplib");
const queue = "demo";
async function receiveMessage()
{
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
await channel.assertQueue(queue);
await channel.consume(queue, function(message)
{
console.log(message.content.toString());
channel.ack(message);
});
}
receiveMessage();
- В приемнике прочитайте сообщение из очереди сообщений и распечатайте его.
мы использовалиamqplibМодуль, используемый для связи с RabbitMQ, для получения подробной информации о конкретном интерфейсе вы можете просмотретьДокументация.
вызовsendToQueue, установите для постоянного свойства значение true, чтобы при закрытии RabbitMQ сообщение сохранялось на диск. Проверить это просто:
- закрыть приемник
- Запустите отправителя и отправьте сообщение RabbitMQ.
- Перезапустите RabbitMQ (sudo docker перезапустит rabbitmq)
- Запустите приемник, и вы обнаружите, что он может получать сообщения, отправленные отправителем, до перезапуска RabbitMQ.
Поскольку контейнер RabbitMQ сохраняет каталог данных (/var/lib/rabbitmq) как том данных на локальном хосте, даже если контейнер RabbitMQ удаляется (sudo docker rm -f rabbitmq) и перезапускается, эффект тот же. .
Кроме того, этот код использует новейший метод написания асинхронного кода Node.js:Async/Await, поэтому он очень лаконичен, и заинтересованные студенты могут его понять.
Принцип работы этой демонстрации очень прост:
- Запуск контейнера RabbitMQ
sudo ./start_rabbitmq.sh
- Отправить сообщение
node ./sender.js
- получить сообщение
node ./receiver.js
На стороне получателя вы можете видеть, что постоянно печатается «Hello, Fundebug!».
Адрес репозитория кода:Fundebug/rabbitmq-demo
Пример кода автоматического переподключения
В производственной среде RabbitMQ неизбежно перезапустится, например, при замене дисков или серверов, и выйдет из строя из-за чрезмерной нагрузки. Поскольку RabbitMQ может записывать сообщения на диск, данные «безопасны». Однако в коде должен быть реализован механизм автоматического переподключения, иначе приложение Node.js рухнет при остановке RabbitMQ. Вот пример кода автоматического переподключения для справки:
Производитель сообщения:sender_reconnect.js
const amqp = require("amqplib");
const queue = "demo";
var connection;
// 连接RabbitMQ
async function connectRabbitMQ()
{
try
{
connection = await amqp.connect("amqp://localhost");
console.info("connect to RabbitMQ success");
const channel = await connection.createChannel();
await channel.assertQueue(queue);
await channel.sendToQueue(queue, new Buffer("Hello, Fundebug!"),
{
// RabbitMQ重启时,消息会被保存到磁盘
persistent: true
});
connection.on("error", function(err)
{
console.log(err);
setTimeout(connectRabbitMQ, 10000);
});
connection.on("close", function()
{
console.error("connection to RabbitQM closed!");
setTimeout(connectRabbitMQ, 10000);
});
}
catch (err)
{
console.error(err);
setTimeout(connectRabbitMQ, 10000);
}
}
connectRabbitMQ();
Потребитель сообщения:receiver_reconnect.js
const amqp = require("amqplib");
const queue = "demo";
var connection;
// 连接RabbitMQ
async function connectRabbitMQ()
{
try
{
connection = await amqp.connect("amqp://localhost");
console.info("connect to RabbitMQ success");
const channel = await connection.createChannel();
await channel.assertQueue(queue);
await channel.consume(queue, async function(message)
{
console.log(message.content.toString());
channel.ack(message);
});
connection.on("error", function(err)
{
console.log(err);
setTimeout(connectRabbitMQ, 10000);
});
connection.on("close", function()
{
console.error("connection to RabbitQM closed!");
setTimeout(connectRabbitMQ, 10000);
});
}
catch (err)
{
console.error(err);
setTimeout(connectRabbitMQ, 10000);
}
}
connectRabbitMQ();
Таким образом, даже если RabbitMQ перезапустится, отправитель и получатель смогут автоматически переподключиться к RabbitMQ. Если вы хотите отслеживать ошибки RabbitMQ, вы можете использовать нашFundebugjs, когда соединение вызывает событие «ошибка» или «закрытие», сигнал тревоги отправляется как можно скорее, чтобы разработчики могли вовремя обнаруживать ошибки и устранять их.
Ссылаться на
- AMQP library (RabbitMQ) - async/await
- Документация RbbitMQ: рабочая очередь (JavaScript)
- Won't persist data
- How to build reconnect logic for amqplib
О Фундебаге
FundebugСосредоточьтесь на JavaScript, апплете WeChat, мини-игре WeChat, апплете Alipay, React Native, Node.js и мониторинге ошибок онлайн-приложений Java в режиме реального времени. С момента официального запуска Double Eleven в 2016 году Fundebug обработал в общей сложности более 1 миллиарда ошибок, а платными клиентами являются Google, 360, Kingsoft, People.com и многие другие бренды. приветствую всехБесплатная пробная версия!
Уведомление об авторских правах
Пожалуйста, указывайте автора при перепечаткеFundebugИ адрес этой статьи:
ошибка blog.fun.com/2018/04/20/…