Практика RabbitMQ

RabbitMQ
Практика RabbitMQ

I. Обзор

1.1 Предыстория

В последнее время я работаю над платформой интеграции сигналов тревоги, в которой необходимо отправлять сообщения о сигналах тревоги, а типы должны охватывать основные получатели сообщений на текущем рынке, такие как WeChat/Enterprise WeChat/DingTalk/Email/SMS/Telephone, и т. д., который должен использовать MQ, во многих Среди промежуточного программного обеспечения сообщений, после исследования, этот сценарий не требует kafka, как сценарий обработки больших данных, но также требует высокой производительности и механизма подтверждения, надежности данных и активного сообщества, а также поддерживает сохранение сообщений в высокодоступном развертывании промежуточного программного обеспечения и, наконец, выбрал RabbitMQ в качестве промежуточного программного обеспечения приложения.

1.2 Концепция

MQ называется Message Queue, то есть очередь сообщений. MQ — это метод связи между приложениями. Приложения обмениваются данными, читая и записывая сообщения в очередь и из очереди (данные для приложения), не требуя выделенного соединения для их связывания. Обмен сообщениями относится к связи между программами путем отправки данных в сообщениях, а не путем прямого вызова друг друга, что обычно используется для таких методов, как удаленные вызовы процедур. Очередь относится к приложениям, взаимодействующим через очереди. Использование очередей устраняет требование одновременного выполнения принимающих и отправляющих приложений. RabbitMQ — это полная корпоративная система обмена сообщениями многократного использования, основанная на AMQP.

1.3 Функция

  • Разделение приложений: уровень интерфейса на основе данных mq разлагает связанное приложение и реализует этот интерфейс на обеих сторонах, что позволяет независимо модифицировать или расширять процесс обработки на обеих сторонах, если обе стороны соответствуют одним и тем же ограничениям интерфейса.
  • Пик потока: в условиях высокой параллелизма и большого трафика Rabbitmq может снизить нагрузку на экстренный доступ и не будет аварийно завершать работу из-за внезапных требований к нагрузке по тайм-ауту.
  • Асинхронная связь: путем отправки сообщения промежуточному программному обеспечению сообщений асинхронная обработка бизнес-процессов не в реальном времени

1.4 Особенности

  • Надежность: rabritmq использует некоторые механизмы для обеспечения надежности, такие как настойчивость, подтверждение передачи и подтверждение выпуска.
  • Гибкая маршрутизация: маршрутизируйте сообщения через обмены до того, как они попадут в очередь. Для типичных функций маршрутизации RabbitMQ уже предоставляет некоторые встроенные переключатели для реализации. Для более сложных функций маршрутизации вы можете связать несколько бирж вместе или реализовать собственную биржу с помощью механизма подключаемых модулей.
  • Масштабируемость: несколько узлов RabbitMQ могут образовывать кластер, а узлы в кластере можно динамически расширять в соответствии с реальными условиями бизнеса.
  • Высокая доступность: Очереди можно зеркально отразить на компьютерах в кластере, чтобы очереди оставались доступными в случае проблем с некоторыми узлами.
  • Несколько протоколов: RabbitMQ не только изначально поддерживает протокол AMQP, но также поддерживает STOMP, MQTT и другие протоколы промежуточного программного обеспечения сообщений.
  • Многоязычный клиент: RabbitMQ поддерживает почти все распространенные языки, такие как Java, Python, Ruby, PHP, C#, JavaScript и т. д.
  • Интерфейс управления: RabbitMQ предоставляет простой в использовании пользовательский интерфейс, который позволяет пользователям отслеживать и управлять сообщениями, узлами в кластере и многим другим.
  • Механизм подключаемых модулей: RabbitMQ предоставляет множество подключаемых модулей для расширения различными способами, конечно, вы также можете написать свои собственные подключаемые модули.

Две архитектуры

2.1 Схема архитектуры

图片描述
图片描述
图片描述

  • RabbitMQ Server

Также называется Broker Server, это не грузовик, который доставляет еду, это служба доставки. Изначально RabbitMQ — это не фургон с едой, а служба доставки, ее роль — поддерживать маршрут от производителя к потребителю, чтобы гарантировать, что данные могут быть переданы указанным способом. Хотя эта гарантия не является стопроцентной гарантией, ее достаточно для обычных приложений. Конечно, для коммерческих систем вы можете сделать еще один уровень защиты согласованности данных, чтобы полностью обеспечить согласованность системы.

  • Client P

Также называется Producer, отправителем данных. Создавайте сообщения и публикуйте (отправляйте) их на брокерский сервер (RabbitMQ). Сообщение состоит из двух частей: полезной нагрузки (payload) и метки (label). Полезная нагрузка, как следует из названия, представляет собой передаваемые данные. Метка — это имя обмена или тег, который описывает полезную нагрузку, и RabbitMQ также использует эту метку, чтобы решить, какому потребителю отправить сообщение. AMQP описывает только метку, а RabbitMQ определяет правила использования метки.

  • Client C

Также называется Потребителем, получателем данных. Потребители подключаются к брокерскому серверу (RabbitMQ) и подписываются на очередь. Думайте об очереди как о почтовом ящике с именем. Когда сообщение поступает в почтовый ящик, RabbitMQ отправляет его одному из своих подписчиков, Потребителю. Конечно, одно и то же Сообщение может быть отправлено многим Потребителям. В этом сообщении удалена только полезная нагрузка, метка. Для Потребителя неизвестно, кто отправил эту информацию, то есть сам протокол ее не поддерживает. Другое дело, если полезная нагрузка, отправленная Продюсером, содержит информацию Продюсера.

  • Connection

Это TCP-соединение. И производитель, и потребитель подключены к серверу RabbitMQ через TCP. Позже мы можем увидеть, что начало программы заключается в установлении этого TCP-соединения.

  • Channel

виртуальное соединение. Он устанавливается в вышеупомянутом TCP-соединении. Поток данных осуществляется в канале. То есть общая ситуация такова, что программа начинает устанавливать TCP-соединение, и вторым шагом является установление этого Канала.

Итак, зачем использовать Channel вместо прямого TCP-соединения?

Для ОС установление и закрытие TCP-соединений обходится дорого.Частое установление и закрытие TCP-соединений сильно влияет на производительность системы, а количество TCP-соединений также ограничено, что также ограничивает способность системы справляться с высокой степенью параллелизма. . Однако установка канала в TCP-соединении не требует таких затрат. Для производителя или потребителя несколько каналов могут использоваться одновременно для публикации или получения. Эксперименты показали, что данные 1s могут публиковать 10K пакетов данных. Конечно, для разных аппаратных сред данные разных размеров пакетов однозначно разные, но я просто хочу пояснить, что для обычного Consumer или Producer этого достаточно. Если этого недостаточно, вам следует подумать о том, как улучшить свой дизайн с помощью SPLIT.

2.2 Связанные определения

  • Exchange: Переключатель сообщений, указывающий, какие сообщения в соответствии с правилами направляются в очередь.
  • Очередь: носитель очереди сообщений, каждое сообщение будет помещено в одну или несколько очередей.
  • Binding: Binding, его функция состоит в том, чтобы связать обмен и очередь в соответствии с правилами маршрутизации.
  • Ключ маршрутизации: ключевое слово маршрутизации, обмен доставляет сообщения в соответствии с этим ключевым словом
  • VHost: виртуальный хост, несколько виртуальных хостов могут быть открыты в одном брокере для разделения разрешений для разных пользователей.
  • Производитель: Производитель сообщения — это программа, которая доставляет сообщение.
  • Потребитель: Потребитель сообщений — это программа, которая принимает сообщения.
  • Канал: канал сообщения, в каждом подключении клиента можно установить несколько каналов, каждый канал представляет собой задачу сеанса

Только Exchange, Queue и RoutingKey могут определить уникальную линию от Exchange до Queue.

2.3 Основные понятия

Фабрика соединений, соединение и канал — это самые основные объекты в API, предоставляемом RabbitMQ. Connection — это ссылка на сокет RabbitMQ, которая инкапсулирует некоторую логику, связанную с протоколом сокета. Connection Factory — это производственная фабрика Connection.

Канал — это самый важный интерфейс, с которым мы имеем дело с RabbitMQ.Большая часть наших бизнес-операций выполняется в интерфейсе канала, включая определение очереди, определение Exchange, привязку очереди и Exchange, публикацию сообщений и т. д.

  • Queue

Queue (очередь) — это внутренний объект RabbitMQ для хранения сообщений, как показано на следующем рисунке.

图片描述

The messages in RabbitMQ can only be stored in the Queue. The producer (P in the figure below) produces the message and finally delivers it to the Queue. The consumer (C in the figure below) can get the message from the Queue and consume Это.

图片描述
Несколько потребителей могут подписаться на одну и ту же очередь.В это время сообщения в очереди будут равномерно распределяться между несколькими потребителями для обработки, вместо того, чтобы каждый потребитель получал все сообщения и обрабатывал их.
图片描述

  • Message acknowledgment

В практических приложениях потребители могут получать сообщения в Queue, но отсутствие обработки приводит к простою (или другим авариям), которые могут вызывать сообщения. Чтобы избежать этого, мы можем попросить потребителей отправить обратно Rabbitmq после потребления, Rabbitmq получает квитанцию ​​​​сообщения (подтверждение сообщения) Удалить сообщение из очереди из очереди.

Если RabbitMQ не получает квитанцию ​​и обнаруживает, что соединение RabbitMQ потребителя отключено, RabbitMQ отправит сообщение другим потребителям (если есть несколько потребителей) для обработки. Здесь нет тайм-аута, и потребитель, обрабатывающий сообщение в течение длительного времени, не приведет к отправке сообщения другим потребителям, если его соединение RabbitMQ не будет отключено.

Здесь возникнет еще одна проблема: если наши разработчики забудут отправить квитанцию ​​в RabbitMQ после обработки бизнес-логики, это приведет к серьезным багам — в Очереди будет накапливаться все больше и больше сообщений. После перезапуска потребителя он будет многократно использовать эти сообщения и многократно выполнять бизнес-логику.

Кроме того, сообщение публикации не имеет ACK.

  • Message durability

Если мы хотим потерять сообщение, даже если служба Rabbitmq будет перезапущена, мы можем установить для Queue и Message значение «устойчивое» (Durable), что гарантирует, что наше сообщение Rabbitmq не будет потеряно. Тем не менее, он по-прежнему не может разрешить возникновение событий с небольшой вероятностью потери (например, сервер Rabbitmq получил сообщение производителя, но сервер Rabbitmq выключен, когда у вас еще есть время для сохранения, если нам нужно это событие с небольшой вероятностью Чтобы управлять, мы должны использовать транзакции. Поскольку это лишь краткое введение в Rabbitmq, в нем не будут объясняться транзакции, связанные с RabbitMQ.

  • Prefetch count

Ранее мы упоминали, что если несколько потребителей одновременно подписываются на сообщения в одной и той же очереди, сообщения в очереди будут совместно использоваться несколькими потребителями. В это время, если время обработки каждого сообщения отличается, это может привести к тому, что некоторые потребители будут все время заняты, в то время как другие потребители быстро завершат работу и останутся без дела. Мы можем ограничить количество сообщений, которые Queue отправляет каждому потребителю каждый раз, установив счетчик Prefetch. Например, если мы установим prefetchCount=1, Queue отправляет каждому потребителю сообщение за раз; после того, как потребитель обработает сообщение, Очередь Еще одно сообщение будет отправлено потребителю.

图片描述

  • Exchange

В предыдущем разделе мы видели, что производитель отправляет сообщения в очередь, чего на самом деле никогда не происходит в RabbitMQ. Фактическая ситуация такова, что производитель отправляет сообщение на Exchange (обмен, X на рисунке ниже), а Exchange направляет сообщение в одну или несколько очередей (или отбрасывает).

По какой логике Exchange направляет сообщения в Queue? Это будет рассмотрено в разделе «Привязка».

В RabbitMQ есть четыре типа Exchange, и разные типы имеют разные стратегии маршрутизации, которые будут представлены в разделе «Типы Exchange».

  • Routing Key

Когда производитель отправляет сообщение в Exchange, он обычно указывает ключ маршрутизации, чтобы указать правила маршрутизации для сообщения, и этот ключ маршрутизации необходимо использовать в сочетании с типом обмена и ключом привязки, чтобы наконец вступили в силу.

Когда тип обмена и ключ привязки фиксированы (при обычном использовании это содержимое обычно фиксировано), наш производитель может определить, куда передается сообщение, указав ключ маршрутизации при отправке сообщения на биржу.

Ограничение длины, установленное RabbitMQ для ключа маршрутизации, составляет 255 байт.

  • Binding

RabbitMQ связан с очередью Exchange с помощью привязки, поэтому RabbitMQ будет знать, как правильно направить сообщение в указанную очередь вверх.

图片描述

  • Binding key

При привязке Exchange и Queue обычно указывается ключ привязки. Когда потребители отправляют сообщения в Exchange, они обычно указывают ключ маршрутизации. Когда ключ привязки совпадает с ключом маршрутизации, сообщение будет перенаправлено в соответствующую очередь. Это будет проиллюстрировано практическими примерами в главе «Типы обмена».

При привязке нескольких очередей к одному и тому же Exchange эти привязки позволяют использовать один и тот же ключ привязки.

Ключ Binding действует не во всех случаях. Это зависит от типа Exchange. Например, тип разветвления Exchange будет игнорировать ключ Binding и направлять сообщения во все очереди, привязанные к Exchange.

  • Exchange Types

Обычно в RabbitMQ используются следующие типы обмена: разветвление, прямой, раздел и заголовки (есть также два типа обмена, упомянутые в спецификации AMQP: системный и пользовательский, которые здесь описываться не будут).

  • fanout

Правило маршрутизации биржи типа fanout очень простое, оно будет маршрутизировать все сообщения, отправляемые на биржу, во все привязанные к ней Очереди.

图片描述

Рисунок выше, производитель (P) отправляется на обмен (x) все сообщения, направляемые в очередь две цифры, и, наконец, два потребителя (C1 и C2).

  • direct

图片描述

Правило маршрутизации Exchange прямого типа также очень простое, оно направит сообщение в те очереди, чей ключ привязки точно соответствует ключу маршрутизации.

图片描述

Конфигурация на приведенном выше рисунке взята в качестве примера.Когда мы отправляем сообщение в Exchange с routingKey="error", сообщение будет перенаправлено в Queue1 (amqp.gen-S9b..., что является именем очереди, автоматически сгенерированным by RabbitMQ) и Queue2 (amqp.gen-Agl...); если мы отправим сообщение с Routing Key="info" или routingKey="warning", сообщение будет перенаправлено только в Queue2. Если мы отправим сообщение с другим ключом маршрутизации, сообщение не будет перенаправлено в эти две очереди.

  • topic

Как упоминалось ранее, правило маршрутизации Exchange прямого типа полностью соответствует ключу привязки и ключу маршрутизации, но этот метод строгого соответствия во многих случаях не может удовлетворить фактические потребности бизнеса. Обмен типа темы расширен в правилах сопоставления, он аналогичен обмену прямого типа, он также направляет сообщения в Очередь, которая соответствует ключу привязки и ключу маршрутизации, но правила сопоставления здесь несколько другие.

Ключ маршрутизации — это строка, разделенная точкой «.» (мы будем называть каждую независимую строку, разделенную точкой «.», словом), например, «stock.usd.nyse», «nyse.vmw», «quick.orange .кролик". Ключ привязки, как и ключ маршрутизации, также представляет собой строку, разделенную точками «.».

Ключ привязки может состоять из двух специальных символов ""с "#" для нечеткого сопоставления, где ""Используется для соответствия слову", # "используется для сопоставления нескольких слов (может быть равно нулю).

图片描述
Конфигурация на приведенном выше рисунке является примером: сообщение RoutingKey = "Quick.Orange.Rabbit" будет направляться в Q1 и Q2, сообщение RoutingKey = "lazy.range.fox" будет направляться в Q1, RoutingKey = "lazy.brown. Сообщение «Fox» будет направлено в Q2, сообщение RoutingKey = «lazy.pink.rabbit» будет направлено в Q2 (будет доставлено только в Q2, хотя этот RoutingKey и Q2 совпадают); routingKey = «Quick.Brown.fox», RoutingKey = "Orange", RoutingKey = "Quick.Orange.male.rabbit" сообщения будут отброшены, так как они не соответствуют ни одному BindingKey.

  • headers

Обмен типами заголовков не зависит от правила сопоставления между ключом маршрутизации и ключом привязки для маршрутизации сообщений, а соответствует атрибуту заголовков в содержимом отправленного сообщения.

Укажите набор пар ключ-значение при привязке Queue и Exchange; когда сообщение отправляется в Exchange, RabbitMQ получит заголовки сообщения (также в виде пары ключ-значение) и сравнит, соответствует ли ключ-значение пара точно соответствует очереди и паре ключ-значение, указанной при привязке Exchange. Если есть точное совпадение, сообщение будет направлено в Очередь, в противном случае оно не будет направлено в Очередь.

Этот тип обмена не использовался (но он должен быть очень полезным), поэтому я не буду его представлять.

  • RPC

Сам MQ основан на асинхронной обработке сообщений.В предыдущем примере все производители (P) не будут знать об успехе или неудаче обработки потребителя (C) после отправки сообщения в RabbitMQ (даже если есть потребитель для обработки сообщения). ).Вообще не знаю).

Но в реальном сценарии приложения нам, вероятно, потребуется некоторая синхронная обработка, и нам нужно дождаться, пока сервер завершит обработку моего сообщения, прежде чем переходить к следующему шагу. Это эквивалентно RPC (удаленный вызов процедур). RPC также поддерживается в RabbitMQ.

图片描述

  • Механизм реализации RPC в RabbitMQ таков:

Когда клиент отправляет запрос (сообщение), он устанавливает два значения в свойствах сообщения (Свойства сообщения, в протоколе AMQP определены 14 свойств, эти свойства будут отправлены вместе с сообщением) answerTo (имя очереди, используется, чтобы сообщить, что после того, как сервер завершит обработку, он отправит сообщение уведомления в эту очередь) и CorrelationId (идентификационный номер этого запроса, этот атрибут должен быть возвращен после того, как сервер завершит обработку, и клиент будет знать, какой запрос был успешно выполнен на основе этого идентификатора или выполнение не удалось). После того, как сервер получит сообщение и обработает его, он сгенерирует ответное сообщение в очередь, указанную в replyTo, с атрибутом корреляции. Клиент ранее подписался на Queue, указанную в replyTo.После получения ответного сообщения от сервера, он анализирует, какой запрос был выполнен по атрибуту корреляцииId, и выполняет последующую бизнес-обработку в соответствии с результатом выполнения.

2.4 Детали

2.4.1 Используйте ACK для подтверждения правильной доставки сообщения

По умолчанию, если Сообщение было правильно получено Потребителем, Сообщение будет удалено из Очереди. Конечно, одно и то же Сообщение может быть отправлено многим Потребителям.

Если очередь не подписана ни одним потребителем, при поступлении данных они будут кэшированы и не будут удалены. Когда есть Потребитель, эти данные будут отправлены этому Потребителю немедленно. Когда данные правильно получены Потребителем, данные удаляются из Очереди.

Итак, каков правильный чек? через акк. Каждое сообщение должно быть подтверждено (acknowledgment, ACK). Мы можем отображать ACK в программе, или мы можем автоматически ACK. Если есть данные, которые не подтверждены ACK, то сервер RabbitMQ отправит эту информацию следующему потребителю.

Если в APP есть ошибка и он забывает ACK, тогда сервер RabbitMQ не будет отправлять ему данные, потому что сервер считает, что потребитель имеет ограниченную вычислительную мощность. Более того, механизм ACK может играть роль в текущем троттлинге (Benefitto throttling): отправка ACK после того, как Потребитель обработает данные, или даже отправка ACK после дополнительной задержки эффективно сбалансирует нагрузку Потребителя.

Конечно, для практических примеров, например, мы можем объединить некоторые данные, такие как данные в течение 4 секунд после объединения, а затем получить данные после 4 секунд сна. Особенно при мониторинге состояния системы мы не хотим, чтобы все состояния передавались в режиме реального времени, а имели определенную задержку. Это уменьшает количество операций ввода-вывода, и конечный пользователь этого не чувствует.

2.4.2 Reject a message

Есть два способа: первый Reject позволяет серверу RabbitMQ отправить сообщение следующему потребителю. Второй — немедленно удалить сообщение из очереди.

2.4.3 Creating a queue

И Consumer, и Procuder могут создавать очереди через queue.declare. Для канала Потребитель не может объявлять очередь, но подписываться на другие очереди. Конечно, вы также можете создавать частные очереди. Таким образом, только само приложение может использовать эту очередь. Очередь также может быть удалена автоматически.Очередь, помеченная как автоудаляемая, будет автоматически удалена после отписки последнего Потребителя. Так что, если вы создадите существующую очередь? Тогда не будет никакого воздействия. Следует отметить, что влияния нет, то есть если параметры второго создания отличаются от первого раза, то хотя операция прошла успешно, атрибуты очереди не будут изменены.

Итак, кто должен нести ответственность за создание этой очереди? Это потребитель или производитель?

Если очередь не существует, конечно, Потребитель не получит никакого сообщения. Тогда сообщение о публикации производителя будет удалено. Поэтому, чтобы не потерять данные, и Consumer, и Producer пытаются создать очередь! В любом случае, несмотря ни на что, этот интерфейс не будет проблемой.

Очередь справляется с балансировкой нагрузки идеально. Для нескольких потребителей RabbitMQ использует циклический метод для сбалансированной отправки различным потребителям.

2.4.4 Exchanges

Как видно из диаграммы архитектуры, Procuder опубликует сообщение в Exchange. Затем «ключи маршрутизации», RABBITMQ найдет, что это сообщение должно поместить в положение, какую очередь. Эта очередь связана с ключами маршрутизации. Существует три типа обменов: прямой, велосипед, тема. Каждый внедрит другой алгоритм маршрутизации (алгоритм маршрутизации).

  • Прямой обмен: ** Если ключ маршрутизации совпадает, то сообщение будет доставлено в соответствующую очередь. На самом деле, когда очередь создается, она автоматически связывает обмен с именем очереди в качестве ключа маршрутизации.
  • Разветвленный обмен:Будет транслироваться в отвечающую очередь.
  • **Обмен темами.** Сопоставление ключей с образцом, например ab, может быть передано во все очереди ab.

2.4.5 Virtual hosts

Каждый виртуальный хост, по сути, является сервером RabbitMQ со своей собственной очередью, обменом, правилами bings и т. д. Это гарантирует, что вы можете использовать RabbitMQ в нескольких разных приложениях.

2.4.6 Почему бы не использовать канал подключения tcp

  • Есть три рукопожатия и четыре волны создания и разрушения TCP, что слишком дорого
  • TCP-ссылки операционной системы ограничены.Если используются TCP-ссылки, тысячи ссылок в секунду в периоды пиковой нагрузки будут напрасной тратой ресурсов.
  • Принцип канала Один процесс имеет один канал, а несколько процессов и несколько каналов совместно используют одну TCP-ссылку.Одна TCP-ссылка может поддерживать неограниченное количество каналов без узких мест в производительности.

Три развертывания

Этот документ установлен и развернут для centos7

3.1 Установить Эрланг

# 配置yum源
cat > /etc/yum.repos.d/erlang.repo << EOF
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
EOF

3.2 Настройка источника yum

cat > /etc/yum.repos.d/rabbitmq.repo <<EOF
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF

rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

3.3 Служба rabbitmq

yum -y install rabbitmq-server
chkconfig rabbitmq-server on


# 更改rabbitmq数据和日志存储目录
# 创建数据和日志目录
mkdir -pv /data/rabbitmq/mnesia
mkdir -pv /data/rabbitmq/log
chown rabbitmq.rabbitmq /data/rabbitmq/* -R

# 创建配置文件
cat >/etc/rabbitmq/rabbitmq-env.conf <<EOF
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia
RABBITMQ_LOG_BASE=/data/rabbitmq/log
EOF


systemctl status rabbitmq-server

# 检查本地cli工具是否认证成功
sudo rabbitmq-diagnostics ping

# 打印应用启用的组件,tcp检讨,内存使用,告警等等。
sudo rabbitmq-diagnostics status

# 打印节点有效的配置
sudo rabbitmq-diagnostics environment

# 本地节点监控检查
sudo rabbitmq-diagnostics node_health_check

# 添加用户
rabbitmqctl add_user xuel xuelpwd
rabbitmqctl list_users
Listing users ...
user	tags
xuel	[xuel]
guest	[administrator]

# 角色定义
none  最小权限角色
management 管理员角色
policymaker   决策者
monitoring  监控
administrator  超级管理员 


[root@VM_0_12_centos ~]# rabbitmqctl set_user_tags xuel administrator
Setting tags for user "xuel" to [administrator] ...
[root@VM_0_12_centos ~]# rabbitmqctl list_users
Listing users ...
user	tags
xuel	[administrator]
guest	[administrator]


#查看所有的队列:
rabbitmqctl list_queues

# 新增虚拟主机:
rabbitmqctl add_vhost  vhost_name
# 将新虚拟主机授权给新用户:
rabbitmqctl set_permissions -p vhost_name username '.*' '.*' '.*'

3.4 Конфигурация

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app

3.5 Плагины

rabbitmq-plugins enable rabbitmq_management

# rabbitmq 为了安全guest用户只能localhost访问,开启guest/guest登陆
cat > /etc/rabbitmq/rabbitmq.config <<EOF
[{rabbit, [{loopback_users, []}]}].
EOF

systemctl restart rabbitmq-server

# 页面访问http://ip:15672

четыре использования

Так как стек технологий python, вот простой пример использования в python rabbitmq

  • Introduction

Поскольку AMQP — это двунаправленный протокол RPC, в котором клиенты могут отправлять запросы на серверы, а серверы могут отправлять запросы клиентам, Pika реализует или расширяет цикл ввода-вывода в каждом из своих адаптеров асинхронного соединения. Эти циклы ввода-вывода представляют собой методы блокирования циклов и прослушивания событий. Каждый асинхронный адаптер следует одному и тому же стандарту для вызова цикла ввода-вывода. Цикл ввода-вывода создается при создании адаптера соединения. Чтобы запустить цикл ввода-вывода для любого данного адаптера, вызовите метод connection.ioloop.start().

  • install
pip install pika
  • demo
  1. We start by creating our connection object, then starting our event loop.
  2. When we are connected, the on_connected method is called. In that method we create a channel.
  3. When the channel is created, the on_channel_open method is called. In that method we declare a queue.
  4. When the queue is declared successfully, on_queue_declared is called. In that method we call channel.basic_consume telling it to call the handle_delivery for each message RabbitMQ delivers to us.
  5. When RabbitMQ has a message to send us, it calls the handle_delivery method passing the AMQP Method frame, Header frame, and Body.
import pika

# Create a global channel variable to hold our channel object in
channel = None

# Step #2
def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    # Open a channel
    connection.channel(on_open_callback=on_channel_open)

# Step #3
def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared)

# Step #4
def on_queue_declared(frame):
    """Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
    channel.basic_consume('test', handle_delivery)

# Step #5
def handle_delivery(channel, method, header, body):
    """Called when we receive a message from RabbitMQ"""
    print(body)

# Step #1: Connect to RabbitMQ using the default parameters
parameters = pika.ConnectionParameters()
connection = pika.SelectConnection(parameters, on_open_callback=on_connected)

try:
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()

Дизайн службы отправки пяти сообщений

图片描述

Большинство потребителей и производителей работают в кластере K8S. Для сервисов отправки сообщений производители отправляют сообщения с Burting_key, используйте подтверждение для подтверждения, Exchange использует прямой режим, и соответствующий Bind_key отправляется в соответствующую очередь, в соединении к Британской очереди. , Начните несколько каналов, каждый из которых соответствует собственным нескольким потребителям для улучшения параллелизма.

Ссылка на ссылку