Несколько поз SpringBoot+RabbitMq

RabbitMQ
Несколько поз SpringBoot+RabbitMq

предисловие

В настоящее время основное промежуточное ПО для сообщений включает в себя activemq, rabbitmq, RocketMQ и kafka. Нам необходимо выбрать подходящее промежуточное ПО для сообщений в соответствии с реальным бизнес-сценарием. Основными показателями, вызывающими обеспокоенность, являются надежность доставки сообщений, ремонтопригодность и пропускная способность. характеристики промежуточного программного обеспечения и другие важные показатели для выбора, поле больших данных должно быть kafka, тогда традиционный бизнес-сценарий — это развязка, асинхронность и пиковое бритье. Затем выберите один из оставшихся продуктов 3. С точки зрения пропускной способности, активности сообщества и надежности сообщений малым и средним компаниям может быть более подходящим выбор rabbitmq. Итак, давайте посмотрим, как его использовать.

Подготовка окружающей среды

Этот кейс основан на встроенном в springboot rabbitmq. Этот кейс в основном посвящен важному практическому коду. Для базовых теоретических знаний, пожалуйста, Baidu.
jdk-версия: 1.8
RabbitMQ-версия: 3.7
springboot-версия: 2.1.4.RELEASE

  • файл pom
 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • yml-файл конфигурации
spring:
  rabbitmq:
    password: guest
    username: guest
    port: 5672
    addresses: 127.0.0.1
    #开启发送失败返回
    publisher-returns: true
    #开启发送确认
    publisher-confirms: true
    listener:
      simple:
        #指定最小的消费者数量.
        concurrency: 2
        #指定最大的消费者数量.
        max-concurrency: 2
        #开启ack
        acknowledge-mode: auto
      #开启ack
      direct:
        acknowledge-mode: auto
    #支持消息的确认与返回
    template:
      mandatory: true

Настройте позу rabbitMq

  • поза один

На основе javaconfig

package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitMqConfig
 * @Description rabbitMq配置类
 * @Author lly
 * @Date 2019-05-13 15:05
 * @Version 1.0
 **/
@Configuration
public class RabbitMqConfig {

    public final static String DIRECT_QUEUE = "directQueue";
    public final static String TOPIC_QUEUE_ONE = "topic_queue_one";
    public final static String TOPIC_QUEUE_TWO = "topic_queue_two";
    public final static String FANOUT_QUEUE_ONE = "fanout_queue_one";
    public final static String FANOUT_QUEUE_TWO = "fanout_queue_two";

    public final static String TOPIC_EXCHANGE = "topic_exchange";
    public final static String FANOUT_EXCHANGE = "fanout_exchange";

    public final static String TOPIC_ROUTINGKEY_ONE = "common_key";
    public final static String TOPIC_ROUTINGKEY_TWO = "*.key";

//  direct模式队列
    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE, true);
    }
//  topic 订阅者模式队列
    @Bean
    public Queue topicQueueOne() {
        return new Queue(TOPIC_QUEUE_ONE, true);
    }
    @Bean
    public Queue topicQueueTwo() {
        return new Queue(TOPIC_QUEUE_TWO, true);
    }
//  fanout 广播者模式队列
    @Bean
    public Queue fanoutQueueOne() {
        return new Queue(FANOUT_QUEUE_ONE, true);
    }
    @Bean
    public Queue fanoutQueueTwo() {
        return new Queue(FANOUT_QUEUE_TWO, true);
    }
//  topic 交换器
    @Bean
    public TopicExchange topExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
//  fanout 交换器
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

//   订阅者模式绑定
    @Bean
    public Binding topExchangeBingingOne() {
        return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE);
    }

    @Bean
    public Binding topicExchangeBingingTwo() {
        return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO);
    }
//   广播模式绑定
    @Bean
    public Binding fanoutExchangeBingingOne() {
        return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutExchangeBingingTwo() {
        return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
    }
}

  • поза два

на основе аннотаций

package com.lly.order.message;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalTime;
import java.util.UUID;


/**
 * @ClassName MQTest
 * @Description 消息队列测试
 * @Author lly
 * @Date 2019-05-13 10:50
 * @Version 1.0
 **/
@Component
@Slf4j
public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private final static String QUEUE = "test_queue";

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public MQTest(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    public void sendMq() {
        rabbitTemplate.convertAndSend("test_queue", "test_queue" + LocalTime.now());
        log.info("发送消息:{}", "test_queue" + LocalTime.now());
    }


    public void sendMqRabbit() {
        //回调id
        CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
//        rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", "广播者模式测试",cId);
        Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, "", "广播者模式测试", cId);
        log.info("发送消息:{},object:{}", "广播者模式测试" + LocalTime.now(), object);
    }

    //发送订阅者模式
    public void sendMqExchange() {
        CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
        CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString());
        log.info("订阅者模式->发送消息:routing_key_one");
        rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_one", "routing_key_one" + LocalTime.now(), cId);
        log.info("订阅者模式->发送消息routing_key_two");
        rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_two", "routing_key_two" + LocalTime.now(), cId01);
    }
    //如果不存在,自动创建队列
    @RabbitListener(queuesToDeclare = @Queue("test_queue"))
    public void receiverMq(String msg) {
        log.info("接收到队列消息:{}", msg);
    }
      //如果不存在,自动创建队列和交换器并且绑定
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "topic_queue01", durable = "true"),
                    exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
                    key = "routing_key_one")})
    public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            log.info("接收到topic_routing_key_one消息:{}", msg);
            //发生异常
            log.error("发生异常");
            int i = 1 / 0;
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("接收消息失败,重新放回队列");
            //requeu,为true,代表重新放入队列多次失败重新放回会导致队列堵塞或死循环问题,
            // 解决方案,剔除此消息,然后记录到db中去补偿
            //channel.basicNack(deliveryTag, false, true);
            //拒绝消息
            //channel.basicReject(deliveryTag, true);
        }
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"),
                    exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
                    key = "routing_key_two")})
    public void receiverMqExchageTwo(String msg) {
        log.info("接收到topic_routing_key_two消息:{}", msg);
    }


    @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE)
    public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("接收到队列fanout_queue_one消息:{}", msg);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            e.printStackTrace();
            //多次失败重新放回会导致队列堵塞或死循环问题 丢弃这条消息
//            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.error("接收消息失败");
        }
    }

    @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO)
    public void receiverMqFanoutTwo(String msg) {
        log.info("接收到队列fanout_queue_two消息:{}", msg);
    }

    /**
     * @return
     * @Author lly
     * @Description 确认消息是否发送到exchange
     * @Date 2019-05-14 15:36
     * @Param [correlationData, ack, cause]
     **/
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息唯一标识id:{}", correlationData);
        log.info("消息确认结果!");
        log.error("消息失败原因,cause:{}", cause);
    }
    /**
     * @return
     * @Author lly
     * @Description 消息消费发生异常时返回
     * @Date 2019-05-14 16:22
     * @Param [message, replyCode, replyText, exchange, routingKey]
     **/
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息发送失败id:{}", message.getMessageProperties().getCorrelationId());
        log.info("消息主体 message : ", message);
        log.info("消息主体 message : ", replyCode);
        log.info("描述:" + replyText);
        log.info("消息使用的交换器 exchange : ", exchange);
        log.info("消息使用的路由键 routing : ", routingKey);
    }
}

Три способа подтверждения сообщения rabbitMq

# 发送消息后直接确认消息
acknowledge-mode:none
# 根据消息消费的情况,智能判定消息的确认情况
acknowledge-mode:auto
# 手动确认消息的情况
acknowledge-mode:manual

Мы используем тематический режим для проверки подтверждения сообщения.

режим автоматического подтверждения сообщения
Ручной режим сообщения подтверждения

Затем мы снова используем сообщение и обнаруживаем, что сообщение не подтверждено, поэтому его можно использовать снова.

Обнаружено, что не удаляется та же очередь сообщений, которая все еще существует, вы должны вручную пройти ack, ack очередь вручную изменить наш взгляд на эффект 1

channel.basicAck(deliveryTag, false);

Перезапустите проект еще раз, чтобы использовать сообщение

Снова проверьте сообщения в очереди и обнаружите, что сообщения в очереди 01 были удалены, но сообщения в очереди 02 все еще существуют.

Если в сообщении потребления возникает исключение, измените код, чтобы имитировать то, что происходит в случае исключения. Произошло исключение, и сообщение было повторно поставлено в очередь.

Однако это приведет к тому, что сообщение будет использовано в непрерывном цикле, а затем произойдет сбой, что приведет к бесконечному циклу, вызывающему большое количество ресурсов сервера.
Следовательно, правильный способ справиться с этим - записать сообщение в БД при возникновении исключения, а затем использовать механизм компенсации для компенсации сообщения, или записать количество повторений сообщения, повторить попытку и поместить его в БД после более чем несколько раз.

Суммировать

Благодаря фактическому коду мы знаем rabbitmq, легко выбрать правильную программу для использования в конкретных ситуациях, интеграцию проекта, подтверждение сообщения в реальной сцене. Если недостаточно, но и надеяться на крыло.