Руководство по началу работы с RabbitMQ

задняя часть Docker JavaScript RabbitMQ

Резюме:Использование очереди сообщений RabbitMQ может эффективно улучшить пиковую производительность системы.

Введение в RabbitMQ

RabbitMQдаБрокер сообщений, который поддерживает различные методы асинхронной обработки сообщений, наиболее распространенными из которых являются:

  • Work Queue: Кэшировать сообщения в очередь.По умолчанию несколько рабочих процессов обрабатывают сообщения в очереди по принципу циклического перебора. Каждое сообщение будет назначено только одному работнику.
  • Publish/Subscribe: каждый потребитель, подписавшийся на сообщение, получит сообщение, поэтому каждое сообщение обычно назначается нескольким рабочим процессам, и каждый рабочий процесс обрабатывает сообщение по-своему.

RabbitMQ также поддерживаетRouting,Topics,а такжеRemote procedure calls (RPC)и так далее.

Для разных методов обработки сообщений одно и то же, RabbitMQ — это промежуточный узел между производителями и потребителями сообщений, отвечающий за кэширование и распространение сообщений. RabbitMQ получает сообщения от производителей, кэширует их в памяти и по-разному распределяет среди потребителей. RabbitMQ также может записывать сообщения на диск, чтобы гарантировать сохранение, так что даже в случае неожиданного сбоя RabbitMQ данные сообщения не будут полностью потеряны.

Зачем использовать RabbitMQ?

Самый простой момент заключается в том, что он поддерживает различные методы обработки сообщений, такие как рабочая очередь, и может использоваться в различных бизнес-сценариях. о насFundebugНапример, в настоящее время используется только очередь сообщений RabbitMQ Work Queue.

С помощью очередей сообщений вычислительные задачи, которые не являются срочными, но очень ресурсоемкими, могут быть вставлены в очереди RabbitMQ в виде сообщений, а затем для обработки этих сообщений используется несколько модулей обработки.

Самым большим преимуществом этого является:Улучшенная пропускная способность системы при пиковых нагрузках. Потому что сообщения, которые слишком поздно обработать, кэшируются в RabbitMQ, что предотвращает сбой системы из-за перегрузки из-за большого количества вычислений одновременно. А те сообщения, которые слишком поздно обрабатываются, будут медленно обрабатываться после того, как пик будет пройден.

Еще одно преимущество заключается в том, чторазъединение. Производителю сообщения нужно только отправить сообщение в RabbitMQ.Когда эти сообщения будут обработаны, это не повлияет на производительность ответа производителя.

Реклама: Добро пожаловать в бесплатную пробную версиюFundebug, чтобы вы могли отслеживать ОШИБКИ онлайн-кода и улучшать взаимодействие с пользователем ~

Установите и запустите RabbitMQ.

использоватьDockerЗапустить RabbitMQ так же просто, как выполнить простую команду:

sudo docker run -d --name rabbitmq -h rabbitmq -p 5672:5672 -v /var/lib/rabbitmq:/var/lib/rabbitmq registry.docker-cn.com/library/rabbitmq:3.7

Для друзей, которые не знакомы с Docker, позвольте мне объяснить параметры команды docker:

  • -d: запустить контейнер в фоновом режиме
  • --name rabbitmq: установить имя контейнера rabbitmq
  • -h rabbitmq: установите имя хоста контейнера rabbitmq.Если вы хотите, чтобы данные сообщения RabbitMQ сохранялись на локальном диске, вам нужно установить имя хоста, потому что каталог, в котором RabbitMQ сохраняет данные, является именем хоста.
  • -p 5672:5672: сопоставьте порт 5672 контейнера с портом 5672 локального хоста, чтобы к rabbitmq можно было получить доступ через локальный порт 5672.
  • -v /var/lib/rabbitmq:/var/lib/rabbitmq: Сопоставьте каталог /var/lib/rabbitmq контейнера с каталогом /var/lib/rabbitmq локального хоста, чтобы данные сообщения RabbitMQ можно было сохранить на локальном диске.Даже если контейнер RabbitMQ удален, данные еще есть.

Docker предоставляет официальные образыУскоренный сервис, поэтому образ Rabbit Docker в команде называетсяregistry.docker-cn.com/library/rabbitmq:3.7.

Если вы не знаете Docker, я предлагаю вам изучить его. Если вы не хотите учиться, команда для установки RabbitMQ под Ubuntu 14.04 выглядит следующим образом:

sudo echo "deb http://www.rabbitmq.com/debian testing main" | sudo tee -a /etc/apt/sources.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server

Запустите RabbitMQ:

sudo service rabbitmq-server start

Пример кода очереди сообщений

Ниже мы используемNode.jsРеализуйте простую очередь сообщений.

Производитель сообщения:sender.js

const amqp = require("amqplib");

const queue = "demo";

async function sendMessage(message)
{
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();
    await channel.assertQueue(queue);
    await channel.sendToQueue(queue, new Buffer(message),
    {
        // RabbitMQ关闭时,消息会被保存到磁盘
        persistent: true
    });
}


setInterval(function()
{
    sendMessage("Hello, Fundebug!");
}, 1000)
  • В отправителе продолжайте отправлять «Hello, Fundebug!» в очередь сообщений.

Потребитель сообщения:receiver.js

const amqp = require("amqplib");

const queue = "demo";

async function receiveMessage()
{
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();
    await channel.assertQueue(queue);
    await channel.consume(queue, function(message)
    {
        console.log(message.content.toString());
        channel.ack(message);
    });
}

receiveMessage();
  • В приемнике прочитайте сообщение из очереди сообщений и распечатайте его.

мы использовалиamqplibМодуль, используемый для связи с RabbitMQ, для получения подробной информации о конкретном интерфейсе вы можете просмотретьДокументация.

вызовsendToQueue, установите для постоянного свойства значение true, чтобы при закрытии RabbitMQ сообщение сохранялось на диск. Проверить это просто:

  • закрыть приемник
  • Запустите отправителя и отправьте сообщение RabbitMQ.
  • Перезапустите RabbitMQ (sudo docker перезапустит rabbitmq)
  • Запустите приемник, и вы обнаружите, что он может получать сообщения, отправленные отправителем, до перезапуска RabbitMQ.

Поскольку контейнер RabbitMQ сохраняет каталог данных (/var/lib/rabbitmq) как том данных на локальном хосте, даже если контейнер RabbitMQ удаляется (sudo docker rm -f rabbitmq) и перезапускается, эффект тот же. .

Кроме того, этот код использует новейший метод написания асинхронного кода Node.js:Async/Await, поэтому он очень лаконичен, и заинтересованные студенты могут его понять.

Принцип работы этой демонстрации очень прост:

  • Запуск контейнера RabbitMQ
sudo ./start_rabbitmq.sh
  • Отправить сообщение
node ./sender.js
  • получить сообщение
node ./receiver.js

На стороне получателя вы можете видеть, что постоянно печатается «Hello, Fundebug!».

Адрес репозитория кода:Fundebug/rabbitmq-demo

Пример кода автоматического переподключения

В производственной среде RabbitMQ неизбежно перезапустится, например, при замене дисков или серверов, и выйдет из строя из-за чрезмерной нагрузки. Поскольку RabbitMQ может записывать сообщения на диск, данные «безопасны». Однако в коде должен быть реализован механизм автоматического переподключения, иначе приложение Node.js рухнет при остановке RabbitMQ. Вот пример кода автоматического переподключения для справки:

Производитель сообщения:sender_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

// 连接RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.sendToQueue(queue, new Buffer("Hello, Fundebug!"),
        {
            // RabbitMQ重启时,消息会被保存到磁盘
            persistent: true
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

Потребитель сообщения:receiver_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

// 连接RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.consume(queue, async function(message)
        {
            console.log(message.content.toString());
            channel.ack(message);
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

Таким образом, даже если RabbitMQ перезапустится, отправитель и получатель смогут автоматически переподключиться к RabbitMQ. Если вы хотите отслеживать ошибки RabbitMQ, вы можете использовать нашFundebugjs, когда соединение вызывает событие «ошибка» или «закрытие», сигнал тревоги отправляется как можно скорее, чтобы разработчики могли вовремя обнаруживать ошибки и устранять их.

Ссылаться на

О Фундебаге

FundebugСосредоточьтесь на JavaScript, апплете WeChat, мини-игре WeChat, апплете Alipay, React Native, Node.js и мониторинге ошибок онлайн-приложений Java в режиме реального времени. С момента официального запуска Double Eleven в 2016 году Fundebug обработал в общей сложности более 1 миллиарда ошибок, а платными клиентами являются Google, 360, Kingsoft, People.com и многие другие бренды. приветствую всехБесплатная пробная версия!

Уведомление об авторских правах

Пожалуйста, указывайте автора при перепечаткеFundebugИ адрес этой статьи:
ошибка blog.fun.com/2018/04/20/…