Это 6-й день моего участия в августовском испытании обновлений.Подробности о мероприятии:Испытание августовского обновления
Статьи по Теме
Краткое изложение серии RabbitMQ:Серия RabbitMQ
предисловие
-
Что такое очередь сообщений?
- Сообщение (Message) относится к данным, передаваемым между приложениями. Сообщения могут быть очень простыми, например, содержать только текстовые строки, или более сложными, возможно, содержащими встроенные объекты.
- Очередь сообщений (Message Queue) — это метод связи между приложениями.После того как сообщение отправлено, оно может быть немедленно возвращено, а система сообщений может обеспечить надежную доставку сообщения. Издателю сообщения нужно только опубликовать сообщение в MQ, и ему все равно, кто его получит, а потребителю сообщения нужно только получить сообщение от MQ, независимо от того, кто его публикует. Таким образом, ни издатель, ни пользователь не должны знать о существовании друг друга.
-
Зачем использовать очереди сообщений?
- Основная причина в том, что в среде с высокой степенью параллелизма запросы часто блокируются из-за отсутствия времени на синхронную обработку, например, в MySQL одновременно поступает большое количество запросов на вставку, обновление и другие запросы, что напрямую приводит к бесчисленные блокировки строк и таблиц, и даже окончательный запрос будет заблокирован.Накопление слишком много, что вызывает ошибку слишком много соединений. Используя очереди сообщений, мы можем обрабатывать запросы асинхронно, что снижает нагрузку на систему.
-
Особенности RabbitMQ
- Надежность
- RabbitMQ использует некоторые механизмы для обеспечения надежности, такие как постоянство, подтверждение передачи и подтверждение публикации.
- Гибкая маршрутизация
- Сообщения направляются через Exchange до того, как они попадут в очередь. Для типичной функциональности маршрутизации RabbitMQ уже предоставляет некоторые встроенные возможности Exchange для реализации. Для более сложных функций маршрутизации несколько обменов могут быть связаны вместе, а их собственные обмены также могут быть реализованы с помощью механизма подключаемых модулей.
- Кластеры сообщений (кластеризация)
- Несколько серверов RabbitMQ могут образовывать кластер для формирования логического брокера.
- Высокодоступные очереди
- Очереди можно зеркально отразить на компьютерах в кластере, чтобы очереди оставались доступными в случае сбоя некоторых узлов.
- Мультипротокол
- RabbitMQ поддерживает различные протоколы очередей сообщений, такие как STOMP, MQTT и другие.
- Много клиентов
- RabbitMQ поддерживает практически все распространенные языки, такие как Java, .NET, Ruby и т. д.
- Интерфейс управления
- RabbitMQ предоставляет простой в использовании пользовательский интерфейс, который позволяет пользователям отслеживать и управлять многими аспектами брокера сообщений.
- Отслеживание
- Если сообщение является ненормальным, RabbitMQ предоставляет механизм отслеживания сообщения, потребитель может узнать, что произошло.
- Система плагинов (Система плагинов)
- RabbitMQ предоставляет множество плагинов, которые можно расширять разными способами, или вы можете написать свои собственные плагины.
- Надежность
-
Концепция вещей звучит загадочно, что ж, давайте начнем наше путешествие по изучению RabbitMQ с самого простого режима.
Простой режим очереди (одноранговый режим)
-
Создайте новый проект maven, примерная структура каталогов выглядит следующим образом.
-
POM импорт JAR.
-
<!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
1. Производители
-
package com.dy.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */ public class DyProducerTest_01 { public static final String Queue_name = "dayu"; /** * 这里为了方便,我们使用main函数来测试 * 纯属看你个人选择 * @param args */ public static void main(String[] args) throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("120.48.29.41"); factory.setUsername("admin"); factory.setPassword("111111"); //创建一个新连接 Connection connection = factory.newConnection(); //创建一个通道 channel 实现了自动 close 接口 自动关闭 不需要显示关闭 Channel channel = connection.createChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(Queue_name,false,false,false,null); String message="我是生产者,我告诉你一个好消息!"; /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("",Queue_name,null,message.getBytes()); System.out.println("消息发送完毕"); } }
-
Выполните его, чтобы увидеть, можно ли успешно отправить сообщение в очередь.
-
Посетите страницу управления RabbitMQ.
-
Докажите, что производитель был успешно установлен и успешно отправил сообщение.
-
2. Потребители
-
package com.dy.consumer; import com.rabbitmq.client.*; /** * 这是一个测试的消费者 *@author DingYongJun *@date 2021/8/1 */ public class DyConsumerTest_01 { public static final String Queue_name = "dayu"; public static void main(String[] args) throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("120.48.29.41"); factory.setUsername("admin"); factory.setPassword("111111"); //建立连接 Connection connection = factory.newConnection(); //建立通道 Channel channel = connection.createChannel(); System.out.println("我是消费者,我在等待接收消息!"); DeliverCallback deliverCallback = (String var1, Delivery var2)->{ String message= new String(var2.getBody()); System.out.println(message); }; CancelCallback cancelCallback = (String var1)->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(Queue_name,true,deliverCallback,cancelCallback); } }
-
Объяснение, DeliverCallback и CancelCallback
-
Посмотрите, какие параметры необходимы
-
Взгляните на DeliverCallback, который является типичным функциональным интерфейсом, поэтому мы можем использовать лямбда-выражения для создания его объектов.
-
import java.io.IOException; @FunctionalInterface public interface DeliverCallback { void handle(String var1, Delivery var2) throws IOException; }
-
-
CancelCallback — то же самое
-
import java.io.IOException; @FunctionalInterface public interface CancelCallback { void handle(String var1) throws IOException; }
-
-
-
Запустите потребителя, чтобы увидеть, может ли он получить сообщение из указанной очереди.
- Сообщение было успешно использовано.
-
Посетите страницу управления RabbitMQ.
- Сообщение равно нулю, что доказывает, что сообщение было успешно использовано!
3. Резюме
-
Простейшая рабочая очередь с одним производителем сообщений, одним потребителем сообщений и одной очередью. также известен как
点对点模式
. -
Грубые шаги продюсера
- присоединиться
- Создать канал
- Создать объявление очереди
- отправлять сообщения
- закрыть очередь
-
Общие шаги для потребителей
- присоединиться
- получить канал
- очередь мониторинга
- Использование сообщений
-
Что ж, после прочтения этого простого режима очереди у меня есть общее представление о публикации и подписке mq!
Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~
Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах