[Серия MQ] Сообщение SprigBoot + RabbitMq, отправляющее базовую позу
Первые два сообщения в блоге представили основные точки знаний RabbitMq и демонстрационное приложение, которое интегрирует SpringBoot, Теперь давайте перейдем к сути и посмотрим, как играть в rabbitmq в среде SpringBoot.
Содержание этой статьи в основном предназначено для отправки сообщений, включая следующие пункты.
-
RabbitTemplate
Основные жесты для отправки сообщений - Основные свойства пользовательского сообщения
- пользовательский конвертер сообщений
AbstractMessageConverter
- Случай невозможности отправки сообщения типа Object
I. Базовая поза использования
1. Конфигурация
мы используемSpringBoot 2.2.1.RELEASE
+ rabbitmq 3.7.5
Приходите для завершения строительства и тестирования проекта
Проект pom.xml выглядит следующим образом
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
конфигурационный файлapplication.yml
Содержание выглядит следующим образом
spring:
rabbitmq:
virtual-host: /
username: admin
password: admin
port: 5672
host: 127.0.0.1
2. Класс конфигурации
Благодаря предыдущим знаниям rabbitmq мы можем узнать основную логику отправителя «отправить сообщение на биржу, а затем распределить его в соответствующую очередь в соответствии с различными стратегиями».
В этом сообщении в блоге в основном обсуждается отправка сообщений.Для последующего примера демонстрации мы определяем обмен в режиме темы и привязываем очередь (поскольку для отправителя разные типы обмена влияют на состояние использования отправителя. Это не так уж важно, это потребители, которые воздействие)
public class MqConstants {
public static final String exchange = "topic.e";
public static final String routing = "r";
public final static String queue = "topic.a";
}
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.exchange);
}
@Bean
public Queue queue() {
// 创建一个持久化的队列
return new Queue(MqConstants.queue, true);
}
@Bean
public Binding binding(TopicExchange topicExchange, Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
3. Отправка сообщения
Отправка сообщений, в основном с помощьюRabbitTemplate#convertAndSend
метод достижения, как правило, мы можем использовать его напрямую
@Service
public class BasicPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 一般的用法,推送消息
*
* @param ans
* @return
*/
private String publish2mq1(String ans) {
String msg = "Durable msg = " + ans;
System.out.println("publish: " + msg);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg;
}
}
Основная точка выше - всего одна строкаrabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
- Указывает, что сообщение отправлено на указанный обмен и установлен ключ маршрутизации сообщения.
осторожность
С помощью вышеуказанного метода отправленное сообщение по умолчанию является постоянным.Когда постоянное сообщение распространяется в постоянную очередь, будет выполняться операция удаления сообщения;
В некоторых сценариях мы не так строго относимся к целостности сообщения, но больше беспокоимся о производительности mq, и допустима потеря некоторых данных; в это время нам может понадобиться настроить свойства отправленного сообщения (например, установите сообщение как постоянное)
Ниже приведены две позы, вторая рекомендуется
/**
* 推送一个非持久化的消息,这个消息推送到持久化的队列时,mq重启,这个消息会丢失;上面的持久化消息不会丢失
*
* @param ans
* @return
*/
private String publish2mq2(String ans) {
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);
System.out.println("publish: " + message);
return message.toString();
}
private String publish2mq3(String ans) {
String msg = "Define msg = " + ans;
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("ta", "测试");
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return message;
}
});
return msg;
}
Уведомление
- В реальной разработке проекта рекомендуется использовать
MessagePostProcessor
настроить свойства сообщения - Во-вторых, не рекомендуется создавать сообщение каждый раз, когда вы отправляете сообщение
MessagePostProcessor
Объект, пожалуйста, определите общий объект, повторно используйте его, если вы можете использовать его повторно
4. Несериализованный объект отправляет исключение
просмотревrabbitTemplate#convertAndSend
Из определения интерфейса мы знаем, что отправляемое сообщение может иметь тип Object, значит ли это, что любой объект может быть передан в mq?
Ниже тестовый пример
private String publish2mq4(String ans) {
NonSerDO nonSerDO = new NonSerDO(18, ans);
System.out.println("publish: " + nonSerDO);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
return nonSerDO.toString();
}
@Data
public static class NonSerDO {
private Integer age;
private String name;
public NonSerDO(int age, String name) {
this.age = age;
this.name = name;
}
}
когда мы вызываем вышеpublish2mq4
Когда метод используется, он не будет напрямую успешным, как предполагалось, а вместо этого выдает исключение типа параметра.
Почему возникает эта проблема? Из анализа стека мы знаем, что RabbitTemplate использует шаблон по умолчанию.SimpleMessageConverter
Для реализации логики сообщения инкапсуляции основной код
// 下面代码来自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[]) object;
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
}
else if (object instanceof String) {
try {
bytes = ((String) object).getBytes(this.defaultCharset);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert to Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setContentEncoding(this.defaultCharset);
}
else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert to serialized Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
}
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
return new Message(bytes, messageProperties);
}
throw new IllegalArgumentException(getClass().getSimpleName()
+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
Вышеприведенная логика ясно утверждает, чтоПринимает только байтовые массивы, строковые строки и сериализуемые объекты (здесь используется метод сериализации jdk для реализации взаимного преобразования между объектами и байтовыми массивами).
- Итак, мы передаем несериализованный объект, и параметр является недопустимым исключением
Естественно, нам интересно, есть ли другиеMessageConverter
для дружеской поддержки любого типа объекта
5. Пользовательский конвертер сообщений
Затем мы надеемся решить вышеуказанную проблему, настроив MessageConverter с сериализацией json.
Более простая реализация (с использованием FastJson для сериализации/десериализации)
public static class SelfConverter extends AbstractMessageConverter {
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
messageProperties.setContentType("application/json");
return new Message(JSON.toJSONBytes(object), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return JSON.parse(message.getBody());
}
}
переопределить одинrabbitTemplate
, и установите его преобразователь сообщений на пользовательскийSelfConverter
@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new SelfConverter());
return rabbitTemplate;
}
затем проверьте это снова
@Service
public class JsonPublisher {
@Autowired
private RabbitTemplate jsonRabbitTemplate;
private String publish1(String ans) {
Map<String, Object> msg = new HashMap<>(8);
msg.put("msg", ans);
msg.put("type", "json");
msg.put("version", 123);
System.out.println("publish: " + msg);
jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg.toString();
}
private String publish2(String ans) {
BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
System.out.println("publish: " + nonSerDO);
jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
return nonSerDO.toString();
}
}
Push-сообщения, полученные в mq, следующие:
6. Jackson2JsonMessageConverter
Хотя преобразование сообщений в формате Json реализовано выше, оно относительно простое, и такая базовая и общая функция, согласно выдержанному стилю корзины семейства Spring, должна быть легко доступна, да, этоJackson2JsonMessageConverter
Таким образом, наша поза также может быть следующей
//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
// 测试代码
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
Map<String, Object> msg = new HashMap<>(8);
msg.put("msg", ans);
msg.put("type", "jackson");
msg.put("version", 456);
System.out.println("publish: " + msg);
jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg.toString();
}
Ниже приведено содержание сериализованного сообщения через Джексона, которое немного отличается от того, что мы настроили.headers
а такжеcontent_encoding
7. Резюме
Основные точки знаний этого сообщения в блоге следующие:
- пройти через
RabbitTemplate#convertAndSend
для рассылки сообщений - пройти через
MessagePostProcessor
Чтобы настроить свойства сообщения (обратите внимание, что сообщение, доставляемое по умолчанию, является постоянным) - Класс оболочки сообщения по умолчанию:
SimpleMessageConverter
, поддерживает только распределение байтовых массивов, строк и сериализуемых объектов; вызовы методов, которые не соответствуют трем указанным выше условиям, вызовут исключение - Мы можем достичь путем
MessageConverter
интерфейс для определения собственного класса инкапсуляции сообщений для решения вышеуказанных проблем
В сообщении блога точки знаний RabbitMq четко упоминается, что для обеспечения того, чтобы сообщение было правильно получено брокером, предусмотрены два случая механизма подтверждения сообщения и механизма транзакции.Если эти два метода необходимо использовать, что следует производитель сообщений делать?
Из-за нехватки места в следующем сообщении в блоге будет представлена ситуация использования отправки сообщений в рамках механизма подтверждения сообщения/механизма транзакции.
II. Другое
0. Серия постов в блоге и исходный код проекта
Серия блогов
- [Серия MQ] первый опыт использования springboot + rabbitmq
- [Серия MQ] Сводка основных знаний RabbitMq
Исходный код проекта
- проект:GitHub.com/JuneB/tickets…
- Исходный код:GitHub.com/JuneB/tickets…
1. Блог одного пепла
Это не так хорошо, как письмо.Вышеупомянутое содержание чисто из семьи.Из-за ограниченных личных способностей неизбежно есть упущения и ошибки.Если вы найдете ошибки или у вас есть лучшие предложения, вы можете критиковать и исправлять их.
Ниже представлен серый личный блог, в котором записываются все посты в блоге по учебе и работе, приглашаю всех посетить
- Блог One Ash Личный блогblog.hhui.top
- Блог One Ash - специальный весенний блогspring.hhui.top