[Серия SpringBoot MQ] Основное использование отправки сообщений RabbitMq

Spring Boot

[Серия MQ] Сообщение SprigBoot + RabbitMq, отправляющее базовую позу

Первые два сообщения в блоге представили основные точки знаний RabbitMq и демонстрационное приложение, которое интегрирует SpringBoot, Теперь давайте перейдем к сути и посмотрим, как играть в rabbitmq в среде SpringBoot.

Содержание этой статьи в основном предназначено для отправки сообщений, включая следующие пункты.

  • RabbitTemplateОсновные жесты для отправки сообщений
  • Основные свойства пользовательского сообщения
  • пользовательский конвертер сообщенийAbstractMessageConverter
  • Случай невозможности отправки сообщения типа Object

I. Базовая поза использования

1. Конфигурация

мы используемSpringBoot 2.2.1.RELEASE + rabbitmq 3.7.5Приходите для завершения строительства и тестирования проекта

Проект pom.xml выглядит следующим образом

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

конфигурационный файлapplication.ymlСодержание выглядит следующим образом

spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672
    host: 127.0.0.1

2. Класс конфигурации

Благодаря предыдущим знаниям rabbitmq мы можем узнать основную логику отправителя «отправить сообщение на биржу, а затем распределить его в соответствующую очередь в соответствии с различными стратегиями».

В этом сообщении в блоге в основном обсуждается отправка сообщений.Для последующего примера демонстрации мы определяем обмен в режиме темы и привязываем очередь (поскольку для отправителя разные типы обмена влияют на состояние использования отправителя. Это не так уж важно, это потребители, которые воздействие)

public class MqConstants {

    public static final String exchange = "topic.e";

    public static final String routing = "r";

    public final static String queue = "topic.a";

}

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.exchange);
    }

    @Bean
    public Queue queue() {
        // 创建一个持久化的队列
        return new Queue(MqConstants.queue, true);
    }

    @Bean
    public Binding binding(TopicExchange topicExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

3. Отправка сообщения

Отправка сообщений, в основном с помощьюRabbitTemplate#convertAndSendметод достижения, как правило, мы можем использовать его напрямую

@Service
public class BasicPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    private String publish2mq1(String ans) {
        String msg = "Durable msg = " + ans;
        System.out.println("publish: " + msg);
        rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg;
    }
}

Основная точка выше - всего одна строкаrabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);

  • Указывает, что сообщение отправлено на указанный обмен и установлен ключ маршрутизации сообщения.

осторожность

С помощью вышеуказанного метода отправленное сообщение по умолчанию является постоянным.Когда постоянное сообщение распространяется в постоянную очередь, будет выполняться операция удаления сообщения;

В некоторых сценариях мы не так строго относимся к целостности сообщения, но больше беспокоимся о производительности mq, и допустима потеря некоторых данных; в это время нам может понадобиться настроить свойства отправленного сообщения (например, установите сообщение как постоянное)

Ниже приведены две позы, вторая рекомендуется

/**
 * 推送一个非持久化的消息,这个消息推送到持久化的队列时,mq重启,这个消息会丢失;上面的持久化消息不会丢失
 *
 * @param ans
 * @return
 */
private String publish2mq2(String ans) {
    MessageProperties properties = new MessageProperties();
    properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
    Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);

    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);

    System.out.println("publish: " + message);
    return message.toString();
}


private String publish2mq3(String ans) {
    String msg = "Define msg = " + ans;
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setHeader("ta", "测试");
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }
    });

    return msg;
}

Уведомление

  • В реальной разработке проекта рекомендуется использоватьMessagePostProcessorнастроить свойства сообщения
  • Во-вторых, не рекомендуется создавать сообщение каждый раз, когда вы отправляете сообщениеMessagePostProcessorОбъект, пожалуйста, определите общий объект, повторно используйте его, если вы можете использовать его повторно

4. Несериализованный объект отправляет исключение

просмотревrabbitTemplate#convertAndSendИз определения интерфейса мы знаем, что отправляемое сообщение может иметь тип Object, значит ли это, что любой объект может быть передан в mq?

Ниже тестовый пример

private String publish2mq4(String ans) {
    NonSerDO nonSerDO = new NonSerDO(18, ans);
    System.out.println("publish: " + nonSerDO);
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
    return nonSerDO.toString();
}


@Data
public static class NonSerDO {
    private Integer age;
    private String name;

    public NonSerDO(int age, String name) {
        this.age = age;
        this.name = name;
    }
}

когда мы вызываем вышеpublish2mq4Когда метод используется, он не будет напрямую успешным, как предполагалось, а вместо этого выдает исключение типа параметра.

Почему возникает эта проблема? Из анализа стека мы знаем, что RabbitTemplate использует шаблон по умолчанию.SimpleMessageConverterДля реализации логики сообщения инкапсуляции основной код

// 下面代码来自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
	byte[] bytes = null;
	if (object instanceof byte[]) {
		bytes = (byte[]) object;
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
	}
	else if (object instanceof String) {
		try {
			bytes = ((String) object).getBytes(this.defaultCharset);
		}
		catch (UnsupportedEncodingException e) {
			throw new MessageConversionException(
					"failed to convert to Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		messageProperties.setContentEncoding(this.defaultCharset);
	}
	else if (object instanceof Serializable) {
		try {
			bytes = SerializationUtils.serialize(object);
		}
		catch (IllegalArgumentException e) {
			throw new MessageConversionException(
					"failed to convert to serialized Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
	}
	if (bytes != null) {
		messageProperties.setContentLength(bytes.length);
		return new Message(bytes, messageProperties);
	}
	throw new IllegalArgumentException(getClass().getSimpleName()
			+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}

Вышеприведенная логика ясно утверждает, чтоПринимает только байтовые массивы, строковые строки и сериализуемые объекты (здесь используется метод сериализации jdk для реализации взаимного преобразования между объектами и байтовыми массивами).

  • Итак, мы передаем несериализованный объект, и параметр является недопустимым исключением

Естественно, нам интересно, есть ли другиеMessageConverterдля дружеской поддержки любого типа объекта

5. Пользовательский конвертер сообщений

Затем мы надеемся решить вышеуказанную проблему, настроив MessageConverter с сериализацией json.

Более простая реализация (с использованием FastJson для сериализации/десериализации)

public static class SelfConverter extends AbstractMessageConverter {
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        messageProperties.setContentType("application/json");
        return new Message(JSON.toJSONBytes(object), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return JSON.parse(message.getBody());
    }
}

переопределить одинrabbitTemplate, и установите его преобразователь сообщений на пользовательскийSelfConverter

@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new SelfConverter());
    return rabbitTemplate;
}

затем проверьте это снова

@Service
public class JsonPublisher {
    @Autowired
    private RabbitTemplate jsonRabbitTemplate;
      
    private String publish1(String ans) {
        Map<String, Object> msg = new HashMap<>(8);
        msg.put("msg", ans);
        msg.put("type", "json");
        msg.put("version", 123);
        System.out.println("publish: " + msg);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg.toString();
    }

    private String publish2(String ans) {
        BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
        System.out.println("publish: " + nonSerDO);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
        return nonSerDO.toString();
    }
}

Push-сообщения, полученные в mq, следующие:

6. Jackson2JsonMessageConverter

Хотя преобразование сообщений в формате Json реализовано выше, оно относительно простое, и такая базовая и общая функция, согласно выдержанному стилю корзины семейства Spring, должна быть легко доступна, да, этоJackson2JsonMessageConverter

Таким образом, наша поза также может быть следующей

//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}


// 测试代码
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
    Map<String, Object> msg = new HashMap<>(8);
    msg.put("msg", ans);
    msg.put("type", "jackson");
    msg.put("version", 456);
    System.out.println("publish: " + msg);
    jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
    return msg.toString();
}

Ниже приведено содержание сериализованного сообщения через Джексона, которое немного отличается от того, что мы настроили.headersа такжеcontent_encoding

7. Резюме

Основные точки знаний этого сообщения в блоге следующие:

  • пройти черезRabbitTemplate#convertAndSendдля рассылки сообщений
  • пройти черезMessagePostProcessorЧтобы настроить свойства сообщения (обратите внимание, что сообщение, доставляемое по умолчанию, является постоянным)
  • Класс оболочки сообщения по умолчанию:SimpleMessageConverter, поддерживает только распределение байтовых массивов, строк и сериализуемых объектов; вызовы методов, которые не соответствуют трем указанным выше условиям, вызовут исключение
  • Мы можем достичь путемMessageConverterинтерфейс для определения собственного класса инкапсуляции сообщений для решения вышеуказанных проблем

В сообщении блога точки знаний RabbitMq четко упоминается, что для обеспечения того, чтобы сообщение было правильно получено брокером, предусмотрены два случая механизма подтверждения сообщения и механизма транзакции.Если эти два метода необходимо использовать, что следует производитель сообщений делать?

Из-за нехватки места в следующем сообщении в блоге будет представлена ​​ситуация использования отправки сообщений в рамках механизма подтверждения сообщения/механизма транзакции.

II. Другое

0. Серия постов в блоге и исходный код проекта

Серия блогов

Исходный код проекта

1. Блог одного пепла

Это не так хорошо, как письмо.Вышеупомянутое содержание чисто из семьи.Из-за ограниченных личных способностей неизбежно есть упущения и ошибки.Если вы найдете ошибки или у вас есть лучшие предложения, вы можете критиковать и исправлять их.

Ниже представлен серый личный блог, в котором записываются все посты в блоге по учебе и работе, приглашаю всех посетить