Очередь сообщений (сокращенно MQ) сейчас используется во многих компаниях, и существует много фреймворков MQ. Наиболее популярными из них являются RabbitMq, ActiveMq, ZeroMq, kafka и RocketMQ с открытым исходным кодом от Али. По совпадению, наша компания использует RabbitMq, поэтому эта статья в основном представляет RabbitMq ([закрывает лицо] На самом деле, я не знаю других MQ, и я опубликую их снова в будущем), в основном из следующих моментов:
1. Установка RabbitMq
2. Использование RabbitMq в проекте SpringBoot (пять режимов)
3. Очередь задержки
4. Проблемы и решения, возникающие при использовании RabbitMq (такие как механизм повторных попыток сообщения, повторное использование сообщений, потеря сообщений, последовательное использование сообщений и т. д.)
Нелегко быть оригинальным, и, наконец, я надеюсь, что вы поставите лайк!
текстУстановка RabbitMq под Mac (установка виндовс нажмите здесь)
Установить доморощенный
1. ВведитеОфициальный сайт домашнего пивоваренияКоманда копирования:
2. Откройте средний конец компьютера, вставьте команду в средний конец и нажмите клавишу Enter для выполнения и дождитесь завершения установки:
Меры предосторожности
1. Если появится сообщение типа curl: (7) Не удалось подключиться для подключения к порту 443 raw.githubusercontent.com: Отказ в подключении появляется после того, как вы вставите команду и нажмете Enter, подключитесь к точке доступа мобильного телефона и повторите попытку, если это не так. все еще не работаеткликните сюда.
Установить RabbitMq
1. Откройте терминал, введите следующую команду: Brew Установите rabbitmq, щелкните клавишу Enter, чтобы дождаться установки, и увидите следующий интерфейс как успех:
Запустить RabbitMq
1. После завершения установки введите следующее содержимое в средний конец, чтобы начать, и см. шаги на рис. 4, что означает, что запуск прошел успешно (не закрывайте средний конец после успешного запуска, иначе вы не увидеть его при входе в браузер):
1、cd /usr/local/Cellar/rabbitmq/
2、cd 3.8.3(版本号看你自己的是多少)
3、sbin/rabbitmq-server
2. Войдите в браузерhttp://localhost:15672Для просмотра пароль по умолчанию для инициализации - guest/guest
Добавить пользователя и авторизацию
Добавить пользователя
(1), суперадминистратор (администратор)
Вы можете войти в консоль управления, просмотреть всю информацию и управлять пользователями и политиками.
(2), мониторинг (мониторинг)
Вы можете войти в консоль управления и просмотреть соответствующую информацию об узле rabbitmq (количество процессов, использование памяти, использование диска и т. д.).
(3), политик (политик)
Вы можете войти в консоль управления и одновременно управлять политикой. Тем не менее, релевантная информация об узле не может быть просмотрена (часть, отмеченная красным прямоугольником на рисунке выше).
(4), рядовые менеджеры (управление)
Вы можете войти в консоль управления только, вы не можете видеть информацию узла, и вы не можете управлять политиками.
Разрешить
Виртуальным хостом RabbitMq по умолчанию является /, если вы хотите настроить виртуальный хост,пожалуйста, нажмите здесь.
Использование RabbitMq в проекте SpringBoot
Элемент конфигурации
добавить зависимости
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Изменить файл конфигурации
spring:
rabbitmq:
host: 自己rabbit的路径
port: 自己rabbit的端口
virtual-host: /
username: 自己rabbit的账号
password: 自己rabbit的密码
publisher-confirms: true
Создайте новый пакет Rabbit в рамках проекта, содержимое можно скопировать и вставить напрямую.
1. Добавьте класс сущности для получения информации о файле конфигурации.
@Data
@ToString
@Configuration
public class RabbitProperties {
/**
* rabbitmq 服务器地址
*/
@Value("${spring.rabbitmq.host}")
private String host;
/**
* rabbitmq 服务器端口
*/
@Value("${spring.rabbitmq.port}")
private int port;
/**
* rabbitmq 账号
*/
@Value("${spring.rabbitmq.username}")
private String username;
/**
* rabbitmq 密码
*/
@Value("${spring.rabbitmq.password}")
private String password;
}
2. Добавьте класс конфигурации
@Configuration
public class RabbitConfiguration {
@Autowired
private RabbitProperties rabbitProperties;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitProperties.getHost(), rabbitProperties.getPort());
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
/**
* @return
* @Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE)这个是说在每次注入的时候回自动创建一个新的bean实例
* @Scope(value=ConfigurableBeanFactory.SCOPE_SINGLETON)单例模式,在整个应用中只能创建一个实例
* @Scope(value=WebApplicationContext.SCOPE_GLOBAL_SESSION)全局session中的一般不常用
* @Scope(value=WebApplicationContext.SCOPE_APPLICATION)在一个web应用中只创建一个实例
* @Scope(value=WebApplicationContext.SCOPE_REQUEST)在一个请求中创建一个实例
* @Scope(value=WebApplicationContext.SCOPE_SESSION)每次创建一个会话中创建一个实例
* proxyMode=ScopedProxyMode.INTERFACES创建一个JDK代理模式
* proxyMode=ScopedProxyMode.TARGET_CLASS基于类的代理模式
* proxyMode=ScopedProxyMode.NO(默认)不进行代理
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
// template.setMandatory(true);
return template;
}
}
3. Определите имена очередей, обменов и т. д. (есть новые очереди и обмены, связанные с заказами)
public class RabbitMqKey {
/**
* 订单-队列
*/
public static final String TRADE_ORDER_QUEUE = "trade-order-queue";
/**
* 订单-交换器
*/
public static final String TRADE_ORDER_EXCHANGE = "trade-order-exchange";
}
4. Инициализируйте очереди, обмены и т. Д. И связывать отношения
@Component
public class TradeOrderQueueConfig {
private final static Logger logger = LoggerFactory.getLogger(TradeOrderQueueConfig.class);
/**
* 创建队列
* Queue 可以有4个参数
* String name: 队列名
* boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
* boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
* boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false
* Map<String, Object> arguments:
*
* @return
*/
@Bean(name = "queue")
public Queue queue() {
logger.info("queue : {}", RabbitMqKey.TRADE_ORDER_QUEUE);
// 队列持久化
return new Queue(RabbitMqKey.TRADE_ORDER_QUEUE, true);
}
/**
* 创建一个 Fanout 类型的交换器
* <p>
* rabbitmq中,Exchange 有4个类型:Direct,Topic,Fanout,Headers
* Direct Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中;
* Topic Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中;
* Fanout Exchange:直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key;
* Headers Exchange:将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中;
*
* @return
*/
@Bean(name = "fanoutExchange")
public FanoutExchange fanoutExchange() {
logger.info("exchange : {}", RabbitMqKey.TRADE_ORDER_EXCHANGE);
return new FanoutExchange(RabbitMqKey.TRADE_ORDER_EXCHANGE);
}
/**
* 把队列(Queue)绑定到交换器(Exchange)
* topic 使用路由键(routingKey)
*
* @return
*/
@Bean
Binding fanoutBinding(@Qualifier("queue") Queue queue,
@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
5, отправка класса сообщения
@Component
public class Sender {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 如果rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
* 需手动注入
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 订单信息(发送至交换器)
*
* @param payload
* @return
*/
public String orderSendExchange(Object payload){
return baseSend(RabbitMqKey.TRADE_ORDER_EXCHANGE, "", payload, null, null);
}
/**
* 订单信息(发送至队列)
*
* @param payload
* @return
*/
public String orderSendQueue(Object payload){
return baseSend("", RabbitMqKey.TRADE_ORDER_QUEUE, payload, null, null);
}
/**
* MQ 发送数据基础方法
*
* @param exchange 交换器名
* @param routingKey 队列名
* @param payload 消息信息
* @param uniqueMessageId 标示id,不传可自动生成
* @param messageExpirationTime 持久化时间
* @return 消息编号
*/
public String baseSend(String exchange, String routingKey, Object payload, String uniqueMessageId, Long messageExpirationTime) {
// 生成消息ID
String finalUniqueMessageId = uniqueMessageId;
if (StringUtils.isBlank(uniqueMessageId)) {
uniqueMessageId = UUID.randomUUID().toString();
}
logger.info("SEND --- unique message id:{}", uniqueMessageId);
// 消息属性
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 消息属性中写入消息编号
message.getMessageProperties().setMessageId(finalUniqueMessageId);
// 消息持久化时间
if (!StringUtils.isEmpty(String.valueOf(messageExpirationTime))) {
logger.info("设置消息持久化时间:{}", messageExpirationTime);
message.getMessageProperties().setExpiration(Long.toString(messageExpirationTime));
}
// 设置持久化模式
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
};
logger.info("SEND --- messagePostProcessor:{}", messagePostProcessor);
// 消息
Message message = null;
try {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(payload);
logger.info("发送消息:{}", payload.toString());
// 转换数据格式
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentEncoding(MessageProperties.CONTENT_TYPE_JSON);
message = new Message(json.getBytes(), messageProperties);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// correlationData
CorrelationData correlationData = new CorrelationData(uniqueMessageId);
/**
* convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
* exchange: 路由
* routingKey: 绑定key
* message: 发送消息
* messagePostProcessor: 消息属性处理类
* correlationData: 对象内部只有一个 id 属性,用来表示当前消息唯一性
*/
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
return finalUniqueMessageId;
}
}
6. Подтвердите информацию
@Component
public class RabbitAck implements RabbitTemplate.ConfirmCallback {
private final static Logger logger = LoggerFactory.getLogger(RabbitAck.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//指定 ConfirmCallback
//rabbitTemplate如果为单例的话,那回调就是最后设置的内容
rabbitTemplate.setConfirmCallback(this);
}
/**
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("ACK --- MQ message id: {}" + correlationData);
if (ack) {
logger.info("ACK --- Message sent confirmation success!");
} else {
logger.info("ACK --- MQ message id: {}", correlationData.getId());
logger.info("ACK --- MQ confirmetion: {}", ack);
logger.info("ACK --- Message sending confirmation failed, reason for failure:" + cause);
}
}
}
упражняться
1. Простая очередь
p: производитель сообщения (то есть человек, который отправляет письмо)
c: потребитель сообщения (то есть получатель)
Красная сетка: очередь (то есть почтальон, человек, который доставляет письмо)
Сообщение, отправленное в очередь, будет получено только потребителем, а затем использовано. Если вы хотите, чтобы несколько потребителей получали потребление, см. Содержание!
1. Отправить сообщение
(1) Добавить новый интерфейс для отправки сообщений
@RestController
public class ProducersController {
@Resource
private Sender sender;
@PostMapping("/producers")
public void producers(){
sender.orderSendQueue("Hello World");
}
}
(2) Интерфейс вызова PostMan
(3) Просмотрите консоль проекта
(4) Посмотреть кролика
2, получите сообщение
(1) Слушатель, добавлена новая информация об очереди заказов на прием
@Component
public class OrderQueueListener {
private static final Logger logger = LoggerFactory.getLogger(OrderQueueListener.class);
/**
* 接收消息
*
* @param message
*/
@RabbitListener(queues = RabbitMqKey.TRADE_ORDER_QUEUE)
public void process(Message message) {
try {
String msg = new String(message.getBody());
if (StringUtils.isBlank(msg)) {
logger.warn("接收的数据为空");
return;
}
System.out.println(msg);
} catch (Exception e) {
logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
e.printStackTrace();
}
}
}
(2) Поскольку мы только что отправили информацию, пока проект запущен, информация может быть получена
2. Режим работы
p: производитель сообщения (то есть человек, который отправляет письмо)
c1, c2: потребитель сообщения (то есть получатель)
Красная сетка: очередь (то есть почтальон, человек, который доставляет письмо)
(1) Отправьте сообщение, добавьте интерфейс к контроллеру и отправьте 100 сообщений.
@PostMapping("/batch/producers")
public void batchProducers(){
for (int i = 0; i < 100; i++){
sender.orderSendQueue("Hello World" + i);
}
}
(2) два открытых сервиса, получают одну и ту же очередь сообщений
@RabbitListener(queues = RabbitMqKey.TRADE_ORDER_QUEUE)
public void process(Message message) {
try {
String msg = new String(message.getBody());
if (StringUtils.isBlank(msg)) {
logger.warn("接收的数据为空");
return;
}
System.out.println("服务1接收到的数据:" + msg);
} catch (Exception e) {
logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
e.printStackTrace();
}
}
@RabbitListener(queues = RabbitMqKey.TRADE_ORDER_QUEUE)
public void process(Message message) {
try {
String msg = new String(message.getBody());
if (StringUtils.isBlank(msg)) {
logger.warn("接收的数据为空");
return;
}
System.out.println("服务2接收到的数据:" + msg);
} catch (Exception e) {
logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
e.printStackTrace();
}
}
(3) Просмотр результатов двух сервисов
Результат: несколько служб получают информацию из одной очереди, а MQ по умолчанию отправляет информацию путем опроса! Если вы хотите принять модель справедливого распределения (способные работают больше), см. ниже
(4) Измените файл конфигурации службы 1.
(5) Измените файл конфигурации службы 2.
(6) Повторно протестируйте и проверьте результаты (см. рисунок ниже, чтобы увидеть, что сообщений, полученных службой 2, значительно больше, чем сообщений, полученных службой 1).
3. Модель подписки
p: производитель сообщения (то есть человек, который отправляет письмо)
x: Exchanger (то есть почта)
c1, c2: потребитель сообщения (то есть получатель)
Красная сетка: очередь (то есть почтальон, человек, который доставляет письмо)
Письмо отправляется двум людям через почтовое отделение, и почтовое отделение отправит двух почтальонов, чтобы доставить письмо соответствующему человеку.Здесь две разные очереди привязаны к одному и тому же обмену, чтобы гарантировать, что сообщение может быть получено обоими услуги одновременно.
В rabbitmq Exchange имеет 4 типа: Direct, Topic, Fanout, Headers
Прямой обмен: Сравните ключ маршрутизации в сообщении с ключом маршрутизации во всех привязках, связанных с обменом, и, если они равны, отправьте их в очередь, соответствующую привязке;
Тематический обмен: Сравните ключ маршрутизации в сообщении с ключом маршрутизации во всех привязках, связанных с обменом.Если он совпадает, отправьте его в очередь, соответствующую привязке;
Fanout Exchange: Напрямую пересылать сообщение в соответствующую очередь всех привязок.Этот тип обмена игнорирует ключ маршрутизации при маршрутизации и пересылке;
Обмен заголовками: сопоставьте заголовки в сообщении с параметрами во всех привязках, связанных с обменом, и, если они совпадают, отправьте их в очередь, соответствующую привязке;
Когда Direct Exchange и Topic Exchange выполняют привязку, вам необходимо указать ключ маршрутизации
Когда Fanout Exchange и Headers Exchange выполняют привязку, нет необходимости указывать ключ маршрутизации.
1. Обмен привязками к очереди
Как видно из предыдущей статьи я привязал очередь ордеров к бирже (тип биржи ордеров Fanout Exchange)
Теперь привяжите еще один сервис
2. Просмотрите отношения привязки
3. Запустить два сервиса и отправить информацию на биржу заказов
(1) Добавьте интерфейс для отправки на коммутатор
(2) Вызовите интерфейс и выполните команду отправки
(3) Просмотр двух результатов обслуживания
Если коммутатор не привязывает очередь, сообщение, отправленное на коммутатор, будет потеряно, потому что коммутатор не хранит возможности сообщения, а сообщение может существовать только в очереди.
4, режим маршрутизации
Режим маршрутизации аналогичен режиму публикации-подписки, а затем к режиму подписки добавляется тип.Режим подписки распространяется на все очереди, привязанные к коммутатору, а режим маршрутизации распространяется только на очередь, привязанную к указанному ключ маршрутизации на коммутаторе, т.е. по определенному правилу, какие очереди получают контент, отправленный на биржу, не все очереди, привязанные к бирже, могут его получить.
p: производитель сообщения (то есть человек, который отправляет письмо)
Обмен типа x:direct (то есть почта)
c1, c2: потребитель сообщения (то есть получатель)
Красная сетка: очередь (то есть почтальон, человек, который доставляет письмо)
На приведенном выше рисунке показана комбинация уровней потребления журналов. В режиме маршрутизации сообщения будут направляться в те очереди, чьи ключи привязки и ключи маршрутизации точно совпадают. Этот режим также является прямым режимом в режиме Exchange.
Конфигурация на приведенном выше рисунке взята в качестве примера.Когда мы отправляем сообщение в Exchange с routingKey="error", сообщение будет перенаправлено в Queue1 (amqp.gen-S9b..., что является именем очереди, автоматически сгенерированным от RabbitMQ) и Queue2 (amqp.gen-Agl...). Если мы отправим сообщение с routingKey="info" или routingKey="warning" , сообщение будет перенаправлено только в Queue2. Если мы отправим сообщение с другим ключом маршрутизации, сообщение не будет перенаправлено в эти две очереди.
1. Создайте новые очереди, обмены и маршруты и привяжите их
(1) Услуга 1:
public class RabbitMqKey {
/**
* 订单-队列
*/
public static final String TRADE_ORDER_QUEUE = "trade-order-queue";
/**
* 订单-交换器
*/
public static final String TRADE_ORDER_EXCHANGE = "trade-order-exchange";
/**
* 路由测试队列
*/
public static final String TRADE_DIRECT_TEST_QUEUE = "trade-direct-test-queue";
/**
* 路由测试交换器
*/
public static final String TRADE_DIRECT_TEST_EXCHANGE = "trade-order-exchange";
/**
* 路由
*/
public static final String ROUTING_KEY = "ERROR";
}
Отношения привязки в классе TradeOrderQueConfigue:
(2) Услуга 2:
/**
* 路由测试队列
*/
public static final String TRADE_DIRECT_TEST_QUEUE_V2 = "trade-direct-test-queue-v2";
/**
* 路由测试交换器
*/
public static final String TRADE_DIRECT_TEST_EXCHANGE = "trade-order-exchange";
/**
* 路由
*/
public static final String ROUTING_KEY = "INFO";
2. Просмотрите отношения привязки
3. Две службы соответственно создают новый класс DirectListener для получения информации об очереди.
(1) Услуга 1:
@Component
public class DirectListener {
private static final Logger logger = LoggerFactory.getLogger(DirectListener.class);
/**
* 接收消息
*
* @param message
*/
@RabbitListener(queues = RabbitMqKey.TRADE_DIRECT_TEST_QUEUE)
public void process(Message message) {
try {
String msg = new String(message.getBody());
if (StringUtils.isEmpty(msg)) {
logger.warn("接收的数据为空");
return;
}
System.out.println("服务1接收到的数据:" + msg);
} catch (Exception e) {
logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
e.printStackTrace();
}
}
}
(2) Услуга 2:
@Component
public class DirectListener {
private static final Logger logger = LoggerFactory.getLogger(DirectListener.class);
/**
* 接收消息
*
* @param message
*/
@RabbitListener(queues = RabbitMqKey.TRADE_DIRECT_TEST_QUEUE_V2)
public void process(Message message) {
try {
String msg = new String(message.getBody());
if (StringUtils.isEmpty(msg)) {
logger.warn("接收的数据为空");
return;
}
System.out.println("服务2接收到的数据:" + msg);
} catch (Exception e) {
logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
e.printStackTrace();
}
}
}
4. Класс Sender добавляет метод для отправки сообщения по указанному маршруту и интерфейс для отправки сообщения.
/**
* ERROR路由发送消息至交换器
*
* @param payload
* @return
*/
public String errorSendQueue(Object payload){
return baseSend(RabbitMqKey.TRADE_DIRECT_TEST_EXCHANGE, RabbitMqKey.ROUTING_KEY, payload, null, null);
}
/**
* INFO路由发送消息至交换器
*
* @param payload
* @return
*/
public String infoSendQueue(Object payload){
return baseSend(RabbitMqKey.TRADE_DIRECT_TEST_EXCHANGE, "INFO", payload, null, null);
}
Добавьте два интерфейса в класс ProducersController.
/**
* info
*/
@PostMapping("/send/info")
public void sendInfo(){
sender.infoSendQueue("我是info级别的日志,你可以不用管我");
}
/**
* info
*/
@PostMapping("/send/error")
public void sendError(){
sender.errorSendQueue("我是error级别的日志,你也可以不用管我,只要你不怕死");
}
Затем перезапустите две службы
(5) Вызовите интерфейс /send/info для просмотра результатов.
Служба 1 не получила сообщение:
Сервис 2 получил сообщение:
(6) Вызовите интерфейс /send/error для просмотра результатов.
Услуга 1:
Услуга 2:
5. Тематический режим (режим шаблона)
p: производитель сообщения (то есть человек, который отправляет письмо)
Биржа типа x:topic (то есть почта)
c1, c2: потребитель сообщения (то есть получатель)
Красная сетка: очередь (то есть почтальон, человек, который доставляет письмо)
1. Создайте новые очереди, обмены и отношения привязки
(1) Услуга 1:
/**
* 路由测试队列
*/
public static final String TRADE_TOPIC_TEST_QUEUE = "trade-topic-test-queue";
/**
* 路由测试交换器
*/
public static final String TRADE_TOPIC_TEST_EXCHANGE = "trade-topic-test-exchange";
/**
* 路由
*/
public static final String TOPIC_ROUTING_KEY = "JAVA.#";
(2) Услуга 2:
/**
* 路由测试队列
*/
public static final String TRADE_TOPIC_TEST_QUEUE_V2 = "trade-topic-test-queue-v2";
/**
* 路由测试交换器
*/
public static final String TRADE_TOPIC_TEST_EXCHANGE = "trade-topic-test-exchange";
/**
* 路由
*/
public static final String TOPIC_ROUTING_KEY = "JAVA.*";
2. Просмотрите отношения привязки
3. Две службы соответственно создают новый класс TopicListener для получения информации об очереди.
(1) Услуга 1:
@Component
public class TopicListener {
private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);
/**
* 接收消息
*
* @param message
*/
@RabbitListener(queues = RabbitMqKey.TRADE_TOPIC_TEST_QUEUE)
public void process(Message message) {
try {
String msg = new String(message.getBody());
if (StringUtils.isEmpty(msg)) {
logger.warn("接收的数据为空");
return;
}
System.out.println("服务1接收到的数据:" + msg);
} catch (Exception e) {
logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
e.printStackTrace();
}
}
}
(2) Услуга 2:
@Component
public class TopicListener {
private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);
/**
* 接收消息
*
* @param message
*/
@RabbitListener(queues = RabbitMqKey.TRADE_TOPIC_TEST_QUEUE_V2)
public void process(Message message) {
try {
String msg = new String(message.getBody());
if (StringUtils.isEmpty(msg)) {
logger.warn("接收的数据为空");
return;
}
System.out.println("服务2接收到的数据:" + msg);
} catch (Exception e) {
logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
e.printStackTrace();
}
}
}
4. Класс Sender добавляет метод для отправки сообщения по заданному маршруту и интерфейс для отправки сообщения.
/**
* 发送消息至topic类型的交换器
*
* @param payload
* @return
*/
public String topicErrorSendQueue(Object payload){
return baseSend(RabbitMqKey.TRADE_TOPIC_TEST_EXCHANGE, "JAVA.LOG", payload, null, null);
}
/**
* 发送消息至topic类型的交换器
*
* @param payload
* @return
*/
public String topicInfoSendQueue(Object payload){
return baseSend(RabbitMqKey.TRADE_TOPIC_TEST_EXCHANGE, "JAVA.LOG.ERROR", payload, null, null);
}
Добавьте два интерфейса в класс ProducersController.
/**
* 发送信息
*/
@PostMapping("/send/java")
public void sendJava(){
sender.topicErrorSendQueue("JAVA.*:匹配不多不少一个词 JAVA.#:匹配一个或多个词");
}
/**
* 发送信息
*/
@PostMapping("/send/java/error")
public void sendJavaError(){
sender.topicInfoSendQueue("JAVA.*:匹配不多不少一个词 JAVA.#:匹配一个或多个词");
}
5. Вызовите интерфейс /send/java для просмотра результатов (ключ маршрутизации — JAVA.LOG, поэтому обе службы могут быть сопоставлены).
(1) Услуга 1:
(2) Услуга 2:
6. Вызовите интерфейс /send/java/error для просмотра результатов (ключ маршрутизации — JAVA.LOG.ERROR, поэтому может совпадать только JAVA.#)
(1) Услуга 1:
(2) Услуга 2:
Здесь используется подстановочный ключ маршрутизации *.*, обратите внимание, что точка посередине не может быть изменена!
очередь задержки
Что такое очередь задержки
Очередь с задержкой, прежде всего, это своего рода очередь, очередь означает, что внутренние элементы упорядочены, элементы dequeue и enqueue являются направленными, элементы вводятся с одного конца и вынимаются с другого конца.
Во-вторых, наиболее важной особенностью очереди с задержкой является ее атрибут delay.В отличие от обычной очереди, элементы в обычной очереди всегда ждут, чтобы их вынули и обработали раньше, а элементы в очереди с задержкой всегда ждут обработки. Ожидается, что элементы будут извлечены и обработаны в указанное время, поэтому все элементы в очереди задержки имеют атрибут времени, обычно это сообщения или задачи, которые необходимо обработать.
Проще говоря, очередь задержки - это очередь, используемая для хранения элементов, которые необходимо обрабатывать в указанное время.
Сценарии использования очереди задержки
(1) Если заказ не будет оплачен в течение десяти минут, он будет автоматически отменен.
(2) Для вновь созданного магазина, если ни один продукт не был загружен в течение десяти дней, напоминание о сообщении будет отправлено автоматически.
(3) Если счет не будет оплачен в течение одной недели, он будет оплачен автоматически.
(4) Если после успешной регистрации пользователь не войдет в систему в течение трех дней, ему будет отправлено текстовое сообщение.
(5) Пользователь инициирует возврат, и если он не будет обработан в течение трех дней, соответствующие операторы будут уведомлены.
(6) После того, как собрание назначено, каждый участник должен быть уведомлен за десять минут до назначенного времени для участия в собрании.
выполнить
В RabbitMQ нет очереди с задержкой, и нет атрибута для установки.Очередь с задержкой может быть реализована только с помощью комбинации обмена недоставленными письмами (DLX) и установки времени истечения срока действия (TTL).
1.TTL
TTL — это сокращение от Time To Live, что означает «время жить».
RabbitMq поддерживает настройку TTL для сообщений и очередей.Для сообщений этот параметр задается во время отправки.Для параметров очереди он рассчитывается с момента помещения сообщения в очередь.Пока настроен тайм-аут очереди превышено, сообщение будет автоматически удалено.
Если эти два метода используются вместе, превалирует меньшее значение между TTL пары сообщений и TTL очереди, т. е. сообщение истекает через 5 с, а очередь составляет 10 с, тогда 5 с вступают в силу.
По умолчанию срок действия не указан, что означает, что сообщение не имеет срока действия; если установлено значение 0, это означает, что сообщение потребляется непосредственно при доставке потребителю, в противном случае оно отбрасывается.
Время истечения срока действия сообщения, заданное параметром x-message-ttl в миллисекундах.
Установите время истечения срока действия очереди с параметрами X-EXPIRE, блок миллисекунды, примечание, нельзя установить на 0.
2. DLX и очереди недоставленных писем
DLX расшифровывается как Dead-Letter-Exchange, что на самом деле является обычным обменом, который можно привязать к любой очереди.
Очередь недоставленных сообщений означает, что после того, как сообщение (истекшее) в очереди (обычное) становится недоставленным письмом, его можно отправить на другой обмен (DLX), а затем перенаправить в очередь,
Эта очередь является очередью недоставленных сообщений
Как правило, следующие ситуации становятся мертвой буквой:
Сообщение было отклонено (basic.reject или basic.nack) с requeue=false
Срок жизни сообщения TTL истек.
Превышен предел длины очереди (очередь заполнена)
Примечание 1: Если в очереди есть недоставленное письмо, RabbitMq доставит сообщение о недоставленном письме в установленный DLX,
Примечание 2: объявите DLX, установив параметр x-dead-letter-exchange в очереди. Если текущий DLX является прямым типом, также объявите параметр x-dead-letter-routing-key, чтобы указать ключ маршрутизации. Если нет указанный, используйте Ключ маршрутизации исходной очереди
Функция очереди задержки моделируется через DLX и TTL, то есть: коммутатор связывает очередь a1, отправляет сообщение коммутатору a, сообщение будет сохранено в очереди a1, а сообщение в очереди a1 будет установлено. время истечения срока действия, подождите, пока Если время истечения срока действия не истекло, сообщение будет отправлено в обмен недоставленными сообщениями b, а b привязано к очереди недоставленных сообщений b1, нам нужно только использовать сообщения в b1.
1. Определите имена обмена и очереди
/**
* 接收延迟消息的队列
*/
public static final String TRADE_ORDER_DELAY_QUEUE = "trade-order-delay-queue";
/**
* DLX,dead letter发送到的 exchange
* 接收延迟消息的队列交换器
*/
public static final String TRADE_ORDER_DELAY_EXCHANGE = "trade-order-delay-exchange";
/**
* routing key 名称
* 具体消息发送在该 routingKey 的
*/
public static final String ORDER_DELAY_ROUTING_KEY = "order-delay";
/**
* 接收死信消息的queue - queue
*/
public static final String DEAD_LETTER_QUEUE = "dead-letter-queue";
/**
* 接收死信消息的exchange - exchange
*/
public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";
/**
* routing key 名称
*/
public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter";
2. Класс TradeOrderQueueConfig инициализирует биржи и очереди и связывает их
/**
* 接收延迟信息的队列,并指定过期时间,以及过期之后要发送到哪个死信交换器,以及死信交换器的路由
*
* @return
*/
@Bean(name = "delayOrderQueue")
public Queue delayOrderQueue() {
Map<String, Object> params = new HashMap<>(2);
// x-dead-letter-exchange 声明了当前队列绑定的死信交换机
params.put("x-dead-letter-exchange", RabbitMqKey.DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", RabbitMqKey.DEAD_LETTER_ROUTING_KEY);
// x-message-ttl 队列过期时间
params.put("x-message-ttl", 100000);
return QueueBuilder.durable(RabbitMqKey.TRADE_ORDER_DELAY_QUEUE).withArguments(params).build();
}
/**
* 接收延迟信息的交换器
*
* @return
*/
@Bean(name = "orderDelayExchange")
public DirectExchange orderDelayExchange() {
return new DirectExchange(RabbitMqKey.TRADE_ORDER_DELAY_EXCHANGE);
}
@Bean
Binding orderDelayBinding(@Qualifier("delayOrderQueue") Queue delayOrderQueue,
@Qualifier("orderDelayExchange") DirectExchange orderDelayExchange) {
return BindingBuilder.bind(delayOrderQueue).to(orderDelayExchange).with(RabbitMqKey.ORDER_DELAY_ROUTING_KEY);
}
/**
* 接收死信队列内的信息 - queue
* @return
*/
@Bean(name = "orderQueue")
public Queue orderQueue() {
return new Queue(RabbitMqKey.DEAD_LETTER_QUEUE, true);
}
/**
* 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
* 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
**/
@Bean(name = "orderTopicExchange")
public TopicExchange orderTopicExchange() {
return new TopicExchange(RabbitMqKey.DEAD_LETTER_EXCHANGE);
}
@Bean
Binding orderTopicBinding(@Qualifier("orderQueue") Queue orderQueue,
@Qualifier("orderTopicExchange") TopicExchange orderTopicExchange) {
return BindingBuilder.bind(orderQueue).to(orderTopicExchange).with(RabbitMqKey.DEAD_LETTER_ROUTING_KEY);
}
3. Просмотр информации о привязке
4. Добавьте класс DelayListener для получения задержанных сообщений.
@Component
public class DelayListener {
private static final Logger logger = LoggerFactory.getLogger(DelayListener.class);
/**
* 接收延迟消息
*
* @param message
*/
@RabbitListener(queues = RabbitMqKey.DEAD_LETTER_QUEUE)
public void process(Message message) {
try {
String msg = new String(message.getBody());
if (StringUtils.isEmpty(msg)) {
logger.warn("接收的数据为空");
return;
}
logger.info("接收到的延迟消息:{}", msg);
} catch (Exception e) {
logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
e.printStackTrace();
}
}
}
5. Добавьте методы и интерфейсы для отправки отложенных сообщений
Класс Sender добавляет метод отправки:
/**
* 发送延时队列信息
*
* @param payload
* @return
*/
public String delaySend(Object payload){
return baseSend(RabbitMqKey.TRADE_ORDER_DELAY_EXCHANGE, RabbitMqKey.ORDER_DELAY_ROUTING_KEY, payload, null, null);
}
Класс ProducersController добавляет интерфейс
/**
* 发送信息
*/
@PostMapping("/send/delay/message")
public void sendDelayMessage(){
sender.delaySend("某某某订单已经失效,请归还库存");
}
6. Вызовите интерфейс и просмотрите результаты
Примечание. Время истечения срока действия здесь установлено на 100 секунд, что означает, что срок действия информации, которую вы отправляете в эту очередь, истекает через 100 секунд. метод, вы должны использовать плагин rabbitmq,Для получения дополнительной информации, пожалуйста, нажмите здесь.
Проблемы и решения, возникающие при использовании RabbitMq
Нажмите, чтобы просмотретьОбъяснение параметров конфигурации Spring Boot + RabbitMQ
механизм повтора сообщения
Если есть бизнес-логика потребителя сообщения, которая не может быть выполнена и нуждается в повторной отправке mq для повторного использования, вы можете использовать механизм повторных попыток rabbitmq, и если вы повторяете попытку, вам обычно нужно указать количество повторных попыток и интервал повторных попыток и т. д. ., в противном случае следующие сообщения. Если установлено количество повторений, обычно существуют следующие методы:
1. Используйте третье лицо, такое как Redis или Mongo, для хранения текущего количества повторных попыток.
2. Добавьте количество повторных попыток в заголовок и используйте метод channel.basicPublish() для повторной отправки сообщения и прибавьте 1 к количеству повторных попыток.
3. Используйте функцию повтора, которая поставляется с spring-rabbit.
Если вы установили сообщение для подтверждения вручную, то третий способ недействителен, channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); Этот метод является подтверждением отказа, третий параметр — Вы хотите поставить сообщение обратно в очередь после сбоя? Если вы настроите значение true, чтобы поместить сообщение обратно в очередь, сообщение всегда будет повторяться вместо количества повторных попыток в файле конфигурации, что будет блокировать следующее сообщение, поэтому если вы подтвердите это вручную, вам нужно использовать первый или второй метод, установить количество повторений самостоятельно и направить информацию в другую очередь для записи журнала после достижения количества повторений, а затем вручную подтвердить успех.
Первое:
1. Сначала проверьте, что сказано выше:
Настроенные параметры, такие как максимальное количество повторных попыток, максимальное время повторной попытки и интервал повторной попытки, бесполезны, и сообщение будет повторяться бесконечно.
2. При необходимости подтверждения вручную можно увидеть следующую картину Коммутатор привязан к очереди недоставленных сообщений После неудачного подтверждения данные сбрасываются в очередь недоставленных сообщений, а затем записывается лог и ручное вмешательство:
3. Если вы хотите подтвердить вручную и указать количество повторов, количество повторений будет помещено в кеш (последующие сообщения будут заблокированы, а это значит, что следующее сообщение будет после выполнения этих 5 повторов):
(1) Файл конфигурации изменен на ручное подтверждение
(2) Методы в принимающем классе изменены на ручное подтверждение, здесь закэширован ключ: корреляция — это уникальный идентификатор, который мы добавили при отправке сообщения
correlation:
(3) Проверьте результаты и войдите в очередь недоставленных сообщений более 5 раз (вы можете использовать данные в очереди недоставленных сообщений для записи журналов или запроса ручного вмешательства и т. д.)
4. Если вы не хотите блокировать сообщение, вы хотите поместить сообщение с ошибкой обратно в очередь и следовать принципу «первым пришел – первым обслужен», обратитесь ко второму методу.
Секунда:
пожалуйста, нажмите здесь, общая идея такова (то есть занести количество повторов в заголовочный файл rabbitmq, при выполнении в следующий раз получить информацию из заголовочного файла, а потом судить, превышено ли максимальное количество повторений, и тогда разберитесь с этим соответственно), может ли код я не пробовал его запускать.
Третий:
1. Добавьте имена обменов и очередей
/**
* 测试-队列
*/
public static final String TEST_QUEUE = "test-queue";
/**
* 测试-交换器
*/
public static final String TEST_EXCHANGE = "test-exchange";
/**
* 最大重试次数之后接收消息死信队列
*/
public static final String DELAY_QUEUE = "delay-queue";
/**
* 最大重试次数之后接收消息的交换器
*/
public static final String DELAY_EXCHANGE = "delay-exchange";
/**
* routing key 名称
*/
public static final String DELAY_ROUTING_KEY = "delay-routing-key";
2. Обязательные отношения
@Bean(name = "testExchange")
public FanoutExchange testExchange() {
logger.info("exchange : {}", RabbitMqKey.TEST_EXCHANGE);
return new FanoutExchange(RabbitMqKey.TEST_EXCHANGE);
}
@Bean(name = "delayTestQueue")
public Queue delayTestQueue() {
logger.info("queue : {}", RabbitMqKey.TEST_QUEUE);
// 队列持久化
return new Queue(RabbitMqKey.TEST_QUEUE, true);
}
@Bean
Binding delayTestBinding(@Qualifier("delayTestQueue") Queue delayTestQueue,
@Qualifier("testExchange") FanoutExchange testExchange) {
return BindingBuilder.bind(delayTestQueue).to(testExchange);
}
@Bean(name = "delayQueue")
public Queue delayQueue() {
logger.info("queue : {}", RabbitMqKey.DELAY_QUEUE);
// 队列持久化
return new Queue(RabbitMqKey.DELAY_QUEUE, true);
}
@Bean(name = "delayExchange")
public DirectExchange delayExchange() {
logger.info("exchange : {}", RabbitMqKey.DELAY_EXCHANGE);
return new DirectExchange(RabbitMqKey.DELAY_EXCHANGE);
}
@Bean
Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,
@Qualifier("delayExchange") DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitMqKey.DELAY_ROUTING_KEY);
}
3. Измените файл конфигурации
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admin
password: admin
# 是否启用【发布确认】
publisher-confirms: true
# 指定一个请求能处理多少个消息
listener:
simple:
# 限流(海量数据,同时只能过来一条)
prefetch: 1
# 自动签收auto 手动 manual 默认自动签收
acknowledge-mode: auto
retry:
# 开启重试机制
enabled: true
# 重试次数
max-attempts: 5
# 最大间隔时间
max-interval: 20000
# 重试间隔时间(单位毫秒)
initial-interval: 3000
#乘子 重试间隔*乘子得出下次重试间隔 3s 6s 12s 24s 此处24s>20s 走20s
multiplier: 2
# 重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
default-requeue-rejected: false
4. В класс TestListener добавлен метод обработки информации об очереди.
@Component
public class TestListener {
private static final Logger logger = LoggerFactory.getLogger(TestListener.class);
/**
* 接收消息
*
* @param message
*/
@RabbitListener(queues = RabbitMqKey.TEST_QUEUE)
public void process(Message message) throws UnsupportedEncodingException {
String msg = new String(message.getBody());
if (StringUtils.isBlank(msg)) {
logger.warn("接收的数据为空");
return;
}
System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8"));
//出现异常
int a = 0;
int b = 1 / a;
}
/**
* 接收超过最大重试次数的消息
*
* @param message
*/
@RabbitListener(queues = RabbitMqKey.DELAY_QUEUE)
public void process1(Message message){
try {
String msg = new String(message.getBody());
if (StringUtils.isBlank(msg)) {
logger.warn("接收的数据为空");
return;
}
System.out.println("接收到的死信消息:" + msg);
} catch (Exception e) {
logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
}
}
}
5. Класс Sender добавляет метод для отправки сообщения
/**
* 测试
*
* @param payload
* @return
*/
public String testSendExchange(Object payload){
return baseSend(RabbitMqKey.TEST_EXCHANGE, "", payload, null, null);
}
6. ProducersController добавляет тестовый интерфейс
/**
* 发送信息
*/
@PostMapping("/send/test")
public void sendTest(){
sender.testSendExchange("测试消息重试机制");
}
7. Самый важный момент, класс RabbitAck увеличивает обмен информацией при пересылке, превышающей максимальное количество повторных попыток.
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, RabbitMqKey.DELAY_EXCHANGE, RabbitMqKey.DELAY_ROUTING_KEY);
}
Если вы не настроите MessageRecoverer вручную, RejectAndDontRequeueRecoverer будет использоваться по умолчанию, а реализация предназначена только для печати исключений.Исходный код выглядит следующим образом:
public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class);
@Override
public void recover(Message message, Throwable cause) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Retries exhausted for message " + message, cause);
}
throw new ListenerExecutionFailedException("Retry Policy Exhausted", new AmqpRejectAndDontRequeueException(cause), message);
}
}
8. Протестируйте и посмотрите результаты
(1) MessageRecoverer настроен:
(2) Messagerecoverer не настроен:
Если нет необходимости обеспечивать особую стабильность доверия к сообщению, данные вообще не могут быть потеряны. Тогда нет необходимости настраивать механизм автоматического повтора, ведь сетевые колебания случаются редко, и возникают проблемы с выполнением бизнес-логики (такие как нулевые указатели, выход за пределы массива и т. д.). 100 раз будет сообщено об исключении.Хорошо просто вытащить программатор в жертву небу.
Повторное потребление
украсть один здесьАо Бин (очень хороший автор)Картинка, чтобы объяснить вам
Пользователи в рамках единого интегрированного потребителя услуг не могут подать заявку на повторную пробную версию, а системы действий, потребительские купоны и другие услуги были успешными, повторите попытку, запрос отправляется еще раз, чтобы сообщить, что потребитель был успешным много раз, мы собираемся на обслуживание Расход? Это неприемлемо, начиная от старого, чтобы быть проклятым, в то время как пакет постельных принадлежностей, чтобы вернуться домой, были более важным боссом, убитым на месте, то как мы можем сделать, чтобы избежать дублирования тратить сообщение это?
Вообще говоря, способ решения такого рода проблем называется идемпотентностью интерфейса.
幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。
在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。 例如,“setTrue()”函数就是一个幂等函数,无论多次执行,其结果都是一样的.更复杂的操作幂等保证是利用唯一交易号(流水号)实现.
Популярное значение: когда вы получаете MQ, проверьте, потребляли ли вы этот mq. Если вы его потребляете, вы его не потребляете. Если вы его не потребляете, вы его потребляете. можно использовать номер заказа.+ Бизнес-сценарий судить, если есть этот номер заказа и бизнес-сценарий, он не будет потребляться, а если он не существует, он будет потребляться (мой текущий проект заключается в получении mq, проверьте, есть ли номер заказа, переданный mq, существует, если он существует, он будет печатать ненормальный журнал, потреблять, если он не существует)
сообщение потеряно
Есть четыре способа решить проблему потери сообщений:
1. Сохранение сообщений (посмотрите на приведенный выше код, и вы узнаете, как сохраняются очереди и обмены)
2. Механизм подтверждения ACK (также упомянутый выше)
3. Установите режим зеркалирования кластера
4. Механизм компенсации сообщений
конкретные потребности в контентенажмите здесь, чтобы посмотреть
Конкретное решение зависит от конкретной ситуации компании.Наша компания такая.Например, после размещения заказа нам нужно вычесть плату за обработку.Мы сначала записываем плату за обработку после успешного получения заказа, а затем переводим ее равномерно в ночное время. , Перевод не в реальном времени в основном учитывает десятичную точку суммы, потому что ее нужно разделить на несколько счетов в соответствии с определенной пропорцией, один перевод лучше, чем много раз, поэтому мы проверим порядок табличные данные при передаче суммы в задачах на время Для всех заказов в день, посмотрите, если заказ успешен, но информация о mq потеряна.Если есть, то плата за обработку будет автоматически добавлена.Конечно, это только обработка нашей компании метод, который может быть незрелым.Конкретная обработка потери информации зависит от вашей компании.Ситуация должна быть определена!
последовательное потребление
Последовательное потребление здесь относится к последовательному потреблению потребителей.Очередь rabbitmq хранит только последовательные сообщения издателя, но потребляют ли потребители последовательно, это другой вопрос.
Как показано выше, результаты двух совершенно разные?
решение:
Очередь соответствует потребителю (потребителю), отправляет все сообщения, которые должны быть гарантированы для очереди, включает ручное подтверждение, устанавливает prefetchCount = 1, потребляет только одно сообщение за раз, выполняет ручное подтверждение после обработки, а затем получает следующее сообщение, только что обработанное Потребителем
Позвольте мне рассказать об этом здесь, если есть еще несколько потребителей, использующих синхронную обработку, ручное подтверждение неприемлемо, каждый потребитель получит сообщение в первый раз (если количество сообщений > количество потребителей), а остальные сообщения будут отправлены после подтверждения, так что гарантии последовательного потребления по-прежнему нет, как показано ниже.
В соответствии с моей идеей, очередь изначально является «первым пришел — первым вышел». Поставщик А отправляет в очередь три сообщения о добавлении, изменении и удалении, поэтому то, что получает потребитель Б, также должно быть добавлением, изменением и удалением. является ручным механизмом подтверждения.После получения добавления B вручную подтвердит успешное выполнение, получит модификацию, а затем получит сообщение об удалении, что также гарантирует заказ (я не знаю, есть ли какие-либо проблемы с моей теорией, если вы Если вы знаете какие-либо проблемы, пожалуйста, прокомментируйте и скажите мне, я изменю его, у меня также есть Baidu, но в основном та же идея)
Вот статья о последовательной потреблении, вы можете прочитать его, если вам это нужно!
Исходный код этой статьи (Сервис 1, он может быть немного сумбурным, если у вас есть какие-либо вопросы, вы можете прокомментировать, я отвечу вовремя, когда увижу это)это здесь, этот проект также был создан несколько дней назад. Вам нужно только установить RabbitMQ локально, чтобы запустить его. В будущем компоненты, связанные с springcloud, и некоторые методы использования промежуточного программного обеспечения будут постоянно добавляться!
Если вам это нужно, вы можете обратить внимание на мой официальный аккаунт, и я буду обновлять технические статьи, связанные с Java в режиме реального времени.Также в официальном аккаунте есть некоторые практические материалы, такие как видеоурок системы Java seckill, обучение материалы темной лошадки 2019 (версия IDEA) и резюме вопросов интервью BAT (полная классификация), компьютеры MAC, обычно используемые установочные пакеты (некоторые из них покупаются на Taobao, уже PJ).
Беги и беги, увидимся в следующей статье, если вам не понравилась детская обувь, не забудьте поставить лайк!