Предисловие к реальному бою
Как широко используемое промежуточное ПО для сообщений, RabbitMQ играет важную роль в приложениях корпоративного уровня и микросервисных приложениях. В частности, он играет важную роль в некоторых типичных сценариях приложений и бизнес-модулях, таких как развязка модуля бизнес-службы, асинхронная связь, ограничение тока с высокой степенью параллелизма, тайм-ауты, обработка задержки данных и т. д.
Читать официальный сайт RabbitMQ
Прежде всего, давайте прочитаем руководство по технической разработке и связанные с ним функции официального сайта RabbitMQ. Заинтересованные друзья могут терпеливо прочитать соответствующее введение. Я считаю, что будут некоторые достижения. Адрес можно увидеть:ву ву ву.кролик в настоящее время.com/начать.…
В процессе чтения этого руководства мы можем узнать, что ядро RabbitMQ на самом деле основано на «модели сообщений», которая включает в себя соответствующие компоненты, составляющие модель сообщений: производители, потребители, очереди, обмены, маршрутизация, сообщения, и т.д.! В реальных боевых приложениях мы фактически фокусируемся на «модели сообщения», чтобы начать кодирование!
Ниже я представлю эволюцию этой модели сообщения Конечно, этот процесс можно увидеть и на официальном сайте RabbitMQ!
На приведенных выше рисунках выделено несколько ключевых моментов, и можно сказать, что значение этих пунктов соответствует их названиям!
- Производитель: программа, которая отправляет сообщение
- Потребитель: программа, которая прослушивает сообщения потребителей.
- сообщение: строка двоичных потоков данных
- Очередь: промежуточная область/область хранения сообщений
- Exchange: ретрансляционная станция сообщения, которая используется для получения и распространения сообщения. Существует четыре типа разветвлений: прямые, темы и заголовки.
- Маршрутизация: Эквивалент ключа/третьей стороны, привязка к коммутатору может направлять сообщения в указанную очередь!
По мере развития модели сообщений, показанной на рисунке выше, мы будем бороться с различными типичными бизнес-сценариями в виде кода!
SpringBoot интегрирует боевую систему RabbitMQ
Если рабочий хочет хорошо работать, он должен сначала заточить свои инструменты. Сначала нам нужно использовать Spring Initializr IDEA для создания проекта SpringBoot с Maven и ввести зависимости от сторонних фреймворков, таких как RabbitMQ, Mybatis и Log4j. После завершения сборки можно просто написать RabbitMQController для проверки успешности сборки проекта (можно временно собрать его в одном модуле)
Затем мы вступили в основную стадию реального боя, используя RabbitMQ в проектах или службах, по сути, это не что иное, как несколько основных моментов, которые нужно твердо усвоить, эти основные моменты должны «всегда блуждать в вашем собственном коде» ум, который включает:
- Какое сообщение я хочу отправить
- Какую модель сообщений мне нужно создать: DirectExchange+RoutingKey? TopicExchange+RoutingKey? Ждать
- Является ли сообщение, которое я хочу обработать, в режиме реального времени или его нужно отложить/отложить?
- Где должны писать производители сообщений, где должны писать потребители прослушивателей сообщений и какова их логика обработки?
Основываясь на этих моментах, давайте сначала попробуем небольшой трюк, используя RabbitMQ для асинхронной записи журналов и асинхронной отправки электронных писем. Конечно, перед реальным боем нам нужно установить RabbitMQ и его серверное консольное приложение, а также настроить соответствующие параметры RabbitMQ и связанных с ним компонентов Bean в проекте.
1. После завершения установки RabbitMQ откройте внутреннее консольное приложение: http://localhost:15672/ guest guest login, см. следующий рисунок, это означает, что установка прошла успешно.
2. Затем приложение конфигурации файла конфигурации уровня проекта.
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5
Среди них три параметра, которые позже в основном используются для «конфигурируемой конфигурации», указывающей: значение инициализации одновременного потребителя, максимум одновременного потребителя, каждого потребителя, количество сообщений, которые могут быть получены во время каждого потребителя.
Далее нам нужно настроить RabbitMQ способом Configuration и внедрить RabbitMQ способом Bean.При отправке, получении и обработке сообщений типичной конфигурацией RabbitMQ является RabbitTemplate и SimpleRabbitListenerContainerFactory.RabbitMQ监听器
Фабрика контейнеров , код которой выглядит следующим образом:
@Configuration
public class RabbitmqConfig {
private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class);
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 单一消费者
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
/**
* 多个消费者
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}}
RabbitMQ Combat: разделение бизнес-модулей и асинхронная связь
В некоторых системах уровня предприятия мы часто видим, что исполнительная функция обычно состоит из множества подмодулей.В процессе выполнения этой функции необходимоСинхронизироватьЕго код будет выполнен с начала до конца, то есть процесс выполняетсяmodule_A -> module_B -> module_C -> module_D
Язык разработки приложений, ориентированный на процесс, типичные случаи можно найти на ассемблере или языке C, теперь есть ряд приложений JavaWeb с такой формулировкой.
И мы знаем, что этот процесс выполнения на самом деле имеет определенные недостатки для всей функции, в основном в двух моментах:
- Время отклика выполнения всей функции будет долгим;
- Если исключение возникает в модуле и не обрабатывается должным образом, оно может повлиять на процесс выполнения и результаты других модулей или даже на всю функцию;
- Код во всей функции может быть очень подробным, и между модулями может потребоваться сильная связь и взаимодействие данных. Когда возникают проблемы, их трудно обнаружить и поддерживать, и даже попасть в неловкую ситуацию «изменить один код и переместить другой». все тело"!
Поэтому нам нужно найти способ оптимизации, нам нужно отделить сильно связанные бизнес-модули и реализовать асинхронную связь между некоторыми модулями! Вот два сценария, которые фактически противоречат нашим мерам по оптимизации!
Сценарий 1. Асинхронная запись журналов операций пользователей
Для приложений микропредприятий или сервисных приложений нам часто нужно отслеживать ретроспективный журнал операций пользователя, который является частью бизнеса таким образом, что его не следует связывать вместе с основным сервисным модулем, и поэтому нам нужно быть извлекаются отдельно и асинхронно с данными взаимодействия основного асинхронного модуля связи.
Далее мы будем использовать модель сообщений RabbitMQ DirectExchange+RoutingKey, чтобы также реализовать сценарий «Журнал успешного входа пользователя». Как упоминалось ранее, мы должны помнить о нескольких ключевых моментах:
- Модель сообщений: модель сообщений DirectExchange+RoutingKey.
- Сообщение: информация об объекте входа пользователя, включая имя пользователя, событие входа в систему, исходный IP-адрес, модуль журнала, к которому они принадлежат, и т. д.
- Отправка и получение: отправка в контроллере, вошедшем в систему, получение в прослушивателе и ввод сообщений, потребляемых слушателем, в таблицу данных; отправка и получение в режиме реального времени.
Сначала нам нужно создать вышеуказанные модели обмена сообщениями класса RabbitmqConfig: включая создание Queue, Exchange, RoutingKey и т. д., следующим образом:
Env получить приведенную выше информацию, нам нужно настроить в application.properties, чтоmq.env=local
:
На этом этапе мы запускаем весь проект/службу и открываем серверное консольное приложение RabbitMQ, чтобы увидеть, что очередь, коммутатор и его привязки установлены, как показано ниже:
Затем нам нужно выполнить логику входа пользователя в контроллер, записать журналы входа пользователя, запросить и получить информацию о ресурсах просмотра роли пользователя и т. д. Из-за нехватки места здесь мы сосредоточимся на использовании MQ для достижения «асинхронной записи пользователя». Логи входа" Логика, то есть Контроллер будет выступать здесь как "производитель", код ядра следующий:
@RestController
public class UserController {
private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
private static final String Prefix="user";
@Autowired
private ObjectMapper objectMapper;
@Autowired
private UserMapper userMapper;
@Autowired
private UserLogMapper userLogMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
@RequestMapping(value = Prefix+"/login",method = RequestMethod.POST,consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public BaseResponse login(@RequestParam("userName") String userName,@RequestParam("password") String password){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
//TODO:执行登录逻辑
User user=userMapper.selectByUserNamePassword(userName,password);
if (user!=null){
//TODO:异步写用户日志
try {
UserLog userLog=new UserLog(userName,"Login","login",objectMapper.writeValueAsString(user));
userLog.setCreateTime(new Date());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("log.user.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("log.user.routing.key.name"));
Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(userLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON);
rabbitTemplate.convertAndSend(message);
}catch (Exception e){
e.printStackTrace();
}
//TODO:塞权限数据-资源数据-视野数据
}else{
response=new BaseResponse(StatusCode.Fail);
}
}catch (Exception e){
e.printStackTrace();
}
return response;
}}
В приведенном выше коде «логики отправки» на самом деле также отражены несколько моделей сообщений в эволюции, которую мы ввели в начале, например, мы отправляем сообщения в Exchange вместо Queue, а сообщения передаются в виде бинарных потоков и т. д. Подождите. При использовании postman для запроса метода этого контроллера мы можем увидеть неподтвержденное сообщение во внутреннем консольном приложении RabbitMQ, и мы можем увидеть подробности через GetMessage следующим образом:
Наконец, мы разработаем бизнес-код на стороне потребителя следующим образом:
@Component
public class CommonMqListener {
private static final Logger log= LoggerFactory.getLogger(CommonMqListener.class);
@Autowired
private ObjectMapper objectMapper;
@Autowired
private UserLogMapper userLogMapper;
@Autowired
private MailService mailService;
/**
* 监听消费用户日志
* @param message
*/
@RabbitListener(queues = "${log.user.queue.name}",containerFactory = "singleListenerContainer")
public void consumeUserLogQueue(@Payload byte[] message){
try {
UserLog userLog=objectMapper.readValue(message, UserLog.class);
log.info("监听消费用户日志 监听到消息: {} ",userLog);
//TODO:记录日志入数据表
userLogMapper.insertSelective(userLog);
}catch (Exception e){
e.printStackTrace();
}
}
После запуска службы мы можем отслеживать сообщения, потребляемые в указанной выше очереди, то есть текущую информацию для входа пользователя, а также мы можем видеть, что логика «записи журналов входа пользователя» представляет собой асинхронный процесс, который отличается от основного. бизнес-поток.Поток для выполнения:
Кейс "асинхронная запись лога действий пользователя" я считаю теоретического познания достаточным для интерпретации изложенного выше, и в последующих главах, из-за нехватки места, я сосредоточусь на его основной бизнес-логике!
Сценарий 2. Отправка электронной почты асинхронно
Сценарий отправки электронных писем на самом деле довольно распространен, например, при регистрации пользователя требуется подтверждение электронной почты, пользователи входят в систему, чтобы отправлять уведомления по электронной почте и т. д. Здесь я использую RabbitMQ для асинхронной отправки электронных писем. Шаги для достижения почти такие же, как сцена 1!
1. Создание модели сообщения
2. Создание информации о конфигурации
3. Производственная сторона
4. Потребитель
пасхальные яйца: В этом сообщении блога сначала будет представлена асинхронная развязка и связь модуля бизнес-службы в типичном бизнес-сценарии RabbitMQ. В следующем сообщении блога будет продолжено объяснение механизма подтверждения сообщений памяти приложения RabbitMQ в сценарии системы с высоким параллелизмом и конфигурация параллелизма. Соответствующую базу исходных кодов можно скачать здесь
disk.baidu.com/is/1K UU in_ee f…