Принцип реализации инструмента RabbitMQ и руководство по использованию
Цель:
Сократите работу по разработке повторяющегося кода, упростите доступ к RabbitMQ и позвольте программистам больше сосредоточиться на обработке бизнес-логики.
Для получения исходного кода перейдите к 2-②
1. Принцип реализации
Основные принципы динамического создания
Когда Spring запустится, используйте Spring Bean для управления заводским интерфейсом BeanFactory для достижения动态
Создайте交换机
,队列
,交换机和队列的绑定关系
, чтобы нам не приходилось выполнять повторяющуюся работу по кодированию.
Принцип переключения вещания
При отправке сообщения на "fanout
”, RabbitMQ будет忽略路由键
,直接
отправить сообщение该交换机下所有队列
. Таким образом, он потребляется потребителями каждой очереди.
Принципы прямого подключения коммутаторов
При отправке сообщения на "direct
” наберите переключатель, RabbitMQ будет искать и передавать входящий ключ маршрутизации完全匹配
Очередь, а затем поместить сообщение в очередь и потреблять от потребителей в очереди.
Принцип переключения подстановочных знаков
При отправке сообщения на "topic
” наберите переключатель, RabbitMQ будет искать и передавать входящий ключ маршрутизации规则匹配
Очередь, а затем поместить сообщение в очередь и потреблять от потребителей в очереди.
Принцип переключения головок
При отправке сообщения на "headers
”, RabbitMQ будет忽略路由键
, согласно настройкам, заданным при привязке маршрутизаторов и очередей完全匹配
или部分匹配
в качестве ссылки,以消息内容中的headers属性对队列进行匹配
, только успешно сопоставленная очередь может получить сообщение для дальнейшего использования потребителями.
Специальное примечание
manual模式
(Ручной ACK), примечание一定要
сообщение确认
. Если вы забудете ACK, то последствия будут серьезными. Когда потребитель выходит, сообщение всегда будет перераспределено. Тогда RabbitMQ будет брать все больше и больше контента, так как RabbitMQ будет работать долго, это "内存泄漏
"Да致命
из.
2. Анализ исходного кода
①、Описание документа
ExchangeTypeEnum
Описание: перечисление типов обмена RabbitMQ.
Роль: объявляет распространенные типы обменов
Режим работы: операция не требуется
Примечания: нет
ExchangeEnum
Описание: Перечисление обмена RabbitMQ
Роль: объявляет переключатели, которые должен использовать проект.
Режим работы: добавить или удалить по мере необходимости
Примечание. Поскольку коммутатор типа разветвления будет игнорировать ключ маршрутизации и напрямую доставлять на все маршрутизаторы, для каждой отдельной широковещательной рассылки следует создавать новый коммутатор.
QueueArguments
Описание: параметры очереди
Роль: объявляет параметр Map, используемый при создании очереди.
Режим работы: изменить по мере необходимости
Примечания: нет
QueueHeaders
Описание: Информация заголовка очереди
Роль: объявляет соответствующую информацию, используемую обменами типа ExchangeTypeEnum.headers.
Режим работы: изменить по мере необходимости
Примечания: нет
QueueEnum
Описание: Перечисление конфигурации очереди
Роль: объявляет очередь, которую должен использовать проект.
Режим работы: операция не требуется
Примечание. Для всех очередей, указанных как ExchangeEnum.fanout, к имени очереди будет добавлено «.10_87_10_*» (цифровая часть — это IP-адрес службы).
RabbitMQConfig
Описание: Основной класс конфигурации RabbitMQ.
Функция: реализовать динамическое создание переключателей, динамическое создание очередей и динамическое связывание.
Режим работы: операция не требуется
Примечания: Этот класс зарегистрировал все имена очередей в форме карты дляspring bean工厂
, имя боба"queuesNames
",структура:Map<String, String>
(Map) Потребители могут использовать его напрямую@RabbitListener(queues = {"#{queuesNames.队列配置枚举名称}"})
контролировать
DefaultMQCallback
Описание: Класс обратного вызова по умолчанию, реализованный из MQCallback.
Роль: класс обратного вызова для успешной или неудачной отправки сообщения.
Режим работы: операция не требуется
Примечание. Его можно реализовать вручную, а изменение можно выполнить, вызвав метод queueMessageServiceImpl.setMQCallback(mqCallback).
QueueMessageServiceImpl
Описание: класс отправки сообщений по умолчанию, реализованный из QueueMessageService.
Роль: реализует базовый интерфейс для отправки сообщений
Режим работы: операция не требуется
Примечания: нет
②, анализ основного кода
весенняя пробная регистрация
public static <T> boolean registerBean(String beanName, T bean) {
// 将applicationContext转换为ConfigurableApplicationContext
ConfigurableApplicationContext context = (ConfigurableApplicationContext) SpringContextUtil.getApplicationContext();
// 将bean对象注册到bean工厂
context.getBeanFactory().registerSingleton(beanName, bean);
log.debug("【SpringContextUtil】注册实例“" + beanName + "”到spring容器:" + bean);
return true;
}
Динамически создавать bean-компонент с именем очереди
@Bean("queuesNames")
public Map<String, String> queuesNames() {
return QueueEnum.getQueuesNames();
}
// 这个方法在QueueEnum
public static Map<String, String> getQueuesNames() {
return Arrays.asList(QueueEnum.values()).stream().collect(Collectors.toMap(queueEnum -> queueEnum.toString(), queueEnum -> queueEnum.getName()));
}
Динамически создавать переключатели
/**
* @return java.lang.Object
* @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createExchange();
* @description 动态创建交换机
* @param: this is no-parameter method.
* @author zhiqiang94@vip.qq.com
* @create 2020/4/16 0016 9:28
*/
@Bean("createExchange")
public Object createExchange() {
// 遍历交换机枚举
ExchangeEnum.toList().forEach(exchangeEnum -> {
// 声明交换机
Exchange exchange;
// 根据交换机模式 生成不同的交换机
switch (exchangeEnum.getType()) {
case fanout:
exchange = ExchangeBuilder.fanoutExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
case topic:
exchange = ExchangeBuilder.directExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
case headers:
exchange = ExchangeBuilder.headersExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
case direct:
default:
exchange = ExchangeBuilder.topicExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
}
// 将交换机注册到spring bean工厂 让spring实现交换机的管理
if (exchange != null) {
SpringContextUtil.registerBean(exchangeEnum.toString() + "_exchange", exchange);
}
}
);
// 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
return null;
}
Динамически создавать очереди
/**
* @return java.lang.Object
* @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createQueue();
* @description 动态创建队列
* @param: this is no-parameter method.
* @author zhiqiang94@vip.qq.com
* @create 2020/4/16 0016 9:29
*/
@Bean("createQueue")
public Object createQueue() {
// 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
QueueEnum.toList().forEach(queueEnum -> SpringContextUtil.registerBean(queueEnum.toString() + "_queue", new Queue(queueEnum.getName(), queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments())));
// 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
return null;
}
Динамически создавать отношения привязки
/**
* @return java.lang.Object
* @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createBinding();
* @description 动态将交换机及队列绑定
* @param: this is no-parameter method.
* @author zhiqiang94@vip.qq.com
* @create 2020/4/16 0016 9:29
*/
@Bean("createBinding")
public Object createBinding() {
// 遍历队列枚举 将队列绑定到指定交换机
QueueEnum.toList().forEach(queueEnum -> {
// 从spring bean工厂中获取队列对象(刚才注册的)
Queue queue = SpringContextUtil.getBean(queueEnum.toString() + "_queue", Queue.class);
// 声明绑定关系
Binding binding;
// 根据不同的交换机模式 获取不同的交换机对象(注意:刚才注册时使用的是父类Exchange,这里获取的时候将类型获取成相应的子类)生成不同的绑定规则
switch (queueEnum.getExchangeEnum().getType()) {
case fanout:
FanoutExchange fanoutExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", FanoutExchange.class);
binding = BindingBuilder.bind(queue).to(fanoutExchange);
break;
case topic:
TopicExchange topicExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", TopicExchange.class);
binding = BindingBuilder.bind(queue).to(topicExchange).with(queueEnum.getRoutingKey());
break;
case headers:
HeadersExchange headersExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", HeadersExchange.class);
if (queueEnum.isWhereAll()) {
// whereAll表示全部匹配
binding = BindingBuilder.bind(queue).to(headersExchange).whereAll(queueEnum.getHeaders()).match();
} else {
// whereAny表示部分匹配
binding = BindingBuilder.bind(queue).to(headersExchange).whereAny(queueEnum.getHeaders()).match();
}
break;
case direct:
default:
DirectExchange directExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", DirectExchange.class);
binding = BindingBuilder.bind(queue).to(directExchange).with(queueEnum.getRoutingKey());
break;
}
// 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理
if (binding != null) {
SpringContextUtil.registerBean(queueEnum.toString() + "_binding", binding);
}
}
);
// 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
return null;
}
3. Используйте учебник
①, добавьте и удалите перечисление переключателей по мере необходимости
②, добавляйте и удаляйте перечисление очереди по мере необходимости
③, для достижения мониторинга потребителей
④, позвоните продюсеру, чтобы отправить сообщение
⑤, просмотрите результаты теста
4. Часто задаваемые вопросы
①, вызовите метод QueueMessageService.convertSendAndReceive, и возвращаемое значение равно null
Когда потребительский методvoid
метод, возвращаемое значение метода QueueMessageService.convertSendAndReceive равно null, обратите внимание на проверку метода прослушивания потребителя.
② Что делать, если метод QueueMessageService.convertSendAndReceive используется несколькими потребителями?
Когда сообщение, отправленное QueueMessageService.convertSendAndReceive, потребляется несколькими потребителями, возвращаемый результат является возвращаемым значением первого потребителя. Рекомендуется, чтобы очереди с возвращаемыми значениями использовали режим прямого подключения (direct
), что гарантирует, что сообщение будет использовано только одним потребителем.
3. Какие сценарии использования широковещательного коммутатора (ExchangeTypeEnum.fanout)
广播
Роль переключателя类似
В村里面的小喇叭
, когда в деревню сообщается новая политика, она будет воспроизводиться на маленьком динамике, чтобы все (в очереди) останавливались на содержании.
Может использоваться для отправки подобных объявлений, глобальных системных уведомлений и т. д.广播
необходимость.
④ Какие сценарии использования коммутатора с прямым подключением (ExchangeTypeEnum.direct)
直连
Роль переключателя类似
В邮递员
, адрес получателя написан на конверте, и полученное сообщение будет помещено в разные очереди в соответствии с пунктом назначения.Если почтальон не найдет адрес, письмо будет отброшено.
можно использовать для点对点
общение между.
⑤. Каковы сценарии использования переключателя подстановочных знаков (ExchangeTypeEnum.topic)
通配符
выключатель类似
В脑子不聪明的邮件分发员
, получил письмо, отправленное на адрес "张家镇李家村12号
», распространитель пришел в пункт выдачи и обнаружил два запачканных почтовых ящика (очереди), один хорошо виден\*李家村\*
, ясно видно张家镇\*
, курьер просто отправил письмо复制
Закажи один, пойди в два места都送
доставил копию.
Может использоваться для одной и той же задачи, требующей разных модулей или модулей распределенной системы同时执行
при использовании.
⑥ Каковы сценарии использования переключателя заголовка (ExchangeTypeEnum.headers)
头
выключатель类似
В程序中的鉴权
, у администратора есть все права, у пользователя есть права на просмотр, пользователь не может вызвать интерфейс, требующий разрешения администратора (сообщение не может быть распределено в очередь, которая не слушает заголовок), а интерфейс с соответствующим разрешение может быть вызвано только с соответствующим разрешением (сообщение распространяется на заголовок, который слушает заголовок). очереди). Здесь следует отметить, что головной коммутатор поддерживает完全匹配
и部分匹配
,完全匹配
интерфейс для администраторов必须拥有所有
Только права администратора могут звонить,即拥有所有管理员权限才显示进入管理员后台的按钮
.部分匹配
за拥有其中一个或部分权限
можно назвать,即拥有任意管理员权限就显示进入管理员后台的按钮
.
Эта статья предназначена только для моих собственных учебных заметок.Поскольку принцип реализации очень прост, и основной исходный код был показан в этой статье, ссылка на исходный код не будет опубликована. Поскольку у меня нет глубоких исследований RabbitMQ, если в этой статье есть какие-либо ошибки, пожалуйста, укажите, что я буду изучать их непредвзято и активно исправлять их.
Я впервые пишу статью, надеюсь, вы поддержите и поддержите ее.