Первый: Введение в RabbitMQ
Очередь RabbitMQ разработана и реализована на языке Erlang на основе протокола AMQP и поддерживает несколько типов клиентов, таких как Java, Ruby, Go, PHP и т. д. Другими популярными промежуточными программами очереди сообщений являются RocketMQ, ActiveMQ, Kafka и так далее. Последующие действия должны протестировать RabbitMQ, если служба не установлена, переместитеУстановка службы RabbitMQ
Таким образом, три самые важные функции MQ — это асинхронность, отсечение пиков и развязка, как показано на следующем рисунке. Остальные сложные понятия из сценария не скопируются, да и писать тут ерунду все равно. Самая простая сводка — это контейнер для хранения данных, который аналогичен базе данных MySQL, базе данных Redis и т. д. Отличие состоит в том, что собственные особенности реализации определяют сценарий приложения.
Два: модель протокола AMQP
Его можно просто понимать как набор стандартных протоколов для передачи сообщений, таких как протокол HTTP и протокол HTTPS, которые имеют свои собственные соответствующие правила. На следующем рисунке показана базовая модель протокола AMQP.
серийный номер | компоненты | описывать |
---|---|---|
1 | Publisher | Производитель, клиент приложения используется для отправки сообщений на сервер |
2 | Connection | Connection, как производителям, так и клиентам-потребителям требуется Connection для подключения к серверу приложений сообщений. Затраты на создание и уничтожение соединения слишком высоки, что приводит к легкому логическому каналу соединения. |
3 | Channel | Облегченные логические соединения, также известные как каналы. Каналы полностью изолированы и потокобезопасны |
4 | Broker | Основная часть службы приложения сообщений, код написания на стороне клиента не будет задействован, что эквивалентно логической концепции |
5 | Virtual Host | Эквивалентно пространству имен, каждый пользователь может работать в области виртуального хоста, выделенной им самим, когда есть несколько пользователей. |
6 | Exchange | Обмен сообщениями, производитель напрямую не связан с очередью, и сообщение пересылается через обмен |
7 | Binding | Отношение привязки, отношение привязки между обменом сообщениями и очередью, путем сравнения с RoutingKey, содержащимся в сообщении, доставленном производителем, чтобы узнать, в какую очередь привязки направляется сообщение. |
8 | Queue | Очередь, где сообщение было сохранено в последний раз |
9 | Consumer | Потребители потребляют напрямую, связываясь с очередями, что отличается от производителей. |
Третий: процесс взаимодействия AMQP
Процесс создания и уничтожения соединения по протоколу HTTP делится на трехстороннее рукопожатие и четырехстороннюю волну, так как же организован процесс взаимодействия между производителем, клиентом-потребителем и сервером приложений сообщений в протоколе AMQP? Команды создания и уничтожения соединений производителя, потребителя и сервера согласованы, но команды, участвующие в промежуточном логическом потоке, определенно различаются. Таким образом, весь процесс производства-потребления делится на три части: создание и уничтожение соединения, создание сообщений и потребление сообщений.
3.1 Создание и уничтожение соединения
Создание и уничтожение соединения включает в себя связь четырех модулей.Сначала создается и открывается соединение, затем создается и открывается канал, и, наконец, канал закрывается и соединение закрывается. Из процесса можно лучше понять, что канал канала представляет собой логическое соединение, основанное на соединении соединения, и должно зависеть от соединения соединения. Остальные шаги являются командами — подтвердите формат команды, чтобы обеспечить нормальную связь между клиентом и сервером.
3.2 Создание сообщений
Процесс создания сообщения показан на рисунке ниже, дело в том, что сервер не отвечает на эту операцию. Итак, вот ссылка, которая вызывает потерю сообщений.Когда сетевое соединение между сервером и клиентом колеблется, а другие проблемы приводят к недоступности и закрытию соединения, передаваемые сообщения могут быть потеряны.
3.3 Использование сообщений
Ключевым моментом потребления сообщения является механизм подтверждения, то есть после того, как сообщение будет отправлено с сервера клиенту, оно не будет удалено сразу, а будет изменен флаг состояния сообщения, а соответствующая логика будет выполнена после ожидание результата подтверждения от клиента. Разумеется, могут быть разные результаты подтверждения обратной связи, а также различные операции, о которых будет рассказано в последующих статьях.
Четыре: базовая работа клиента RabbitMQ
Как упоминалось ранее, RabbitMQ поддерживает многоязычные клиенты, поскольку автор занимается разработкой на Java, в следующем примере будет использоваться Java. Клиенты на других языках программирования могут ссылаться наОфициальный сайт RabbitMQ, если вы протестируете демонстрацию функции RabbitMQ непосредственно в проекте J2EE, вы можете ввести следующие зависимости
<!--RabbitMQ依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
4.1 Создайте канал подключения
@SneakyThrows
public static Channel createChannel(){
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ服务应用信息
// 服务默认端口5672、安装启动后会默认有个账号guest、密码guest
factory.setHost("RabbitMQ应用服务安装IP地址");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 实例化连接
Connection connection = factory.newConnection();
// 获取信道实例
Channel channel = connection.createChannel();
return channel;
}
4.2 Push-сообщения
Обратите внимание, что это только самое основное push-сообщение для обмена, а затем маршрутизация сообщения в соответствии с routingKey и Binding.Что касается некоторых обменов или очередей, участвующих в коде, характерные атрибуты сообщения будут объяснены позже. Конечно, обмен — это не только разветвление, которое появляется в коде, множество обменов предоставляют различные функции маршрутизации сообщений.
@SneakyThrows
public static void publishMessage(Channel channel,byte[] message){
// 实例化消息服务组件
String exchangeName = "exchangeName";
String queueName = "queueName";
String binding = "binding";
String routingKey = binding;
// 交换器持久化、自动删除
boolean durable = true , autoDelete = false;
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,durable,autoDelete,null);
// 独占队列
boolean exclusive = false;
channel.queueDeclare(queueName,durable,exclusive,autoDelete,null);
// 交换器、队列绑定
channel.queueBind(queueName,exchangeName,binding);
// 发送生产消息
channel.basicPublish(exchangeName,routingKey,null,message);
}
4.3 Использование сообщений
Сообщение о потреблении здесь фактически включает механизм подтверждения, который был предложен в предыдущей статье и будет подробно объяснен позже. Обратите внимание, что потребление сообщений использует класс DefaultConsumer, который реализует интерфейс Consumer, а интерфейс Consumer предоставляет ряд методов обработки сообщений, то есть для обработки сообщений в RabbitMQ либо реализуйте интерфейс Consumer, либо наследуйте класс DefaultConsumer, чтобы переписать интерфейс. логика. Переписанный метод handleDelivery() в следующем коде можно понять, взглянув на команду процесса взаимодействия потребления сообщений RabbitMQ.
@SneakyThrows
public static void consumeMessage(Channel channel){
String queueName = "queueName";
String consumerTag = "consumerTag";
// 消息消费实例
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
// 处理消息
}
};
// 自动消息确认
boolean autoAck = true;
channel.basicConsume(queueName,autoAck,consumerTag,consumer);
}
4.4 Разрушение канала связи
Здесь нужно обратить внимание на порядок закрытия, и это не значит, что неправильно закрывать Соединение напрямую, но если в соединении есть другие каналы, то не разрешается закрывать Соединение, иначе другие коды, использующие Канал вызовет исключение. То есть закрытие Соединения равносильно закрытию всех Каналов, а Каналы — это логические соединения, которые зависят от Соединений.
@SneakyThrows
public static void destroyConnection(Connection connection,Channel channel){
channel.close();
connection.close();
}