Почему выбирают MQTT
Считается, что определение MQTT понятно многим, в этой статье не обсуждаются какие-то высокоуровневые вещи, она нацелена на использование самого простого и интуитивно понятного способа, чтобы каждый новичок мог применить его как можно быстрее.
Начнем с анализа того, что необходимо для использования MQTT:
- сервер сообщений
- Частые взаимодействия между различными приложениями/устройствами
- Может включать обмен сообщениями «один ко многим»
По трем пунктам, перечисленным выше, мы, вероятно, можем понять, что наиболее подходящий сценарий для MQTT — этоИнформацияЯвляясь важной частью системы и участвуя в ключевой бизнес-логике системы
MQTT, старт!
Теперь, когда мы решили его использовать, первое, что нам нужно изучить, это как заставить MQTT работать правильно, ведь это не просто вопрос добавления зависимости к maven.
Всего нам нужно сделать две вещи:
- скачатьEMQXсервер сообщений, как брокер
- Ввести зависимости в maven
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
Выполнив два вышеуказанных шага, запустите сервер EMQX и официально приступайте к нашему путешествию по MQTT.
Как пользоваться
В коде использования MQTT в Spring Boot автор резюмирует следующие два способа:
- Концепция канала сообщений с использованием spring-integration
- Используйте традиционную концепцию «Клиент-клиент»
Первый создаст определенную степень умственной нагрузки, но после того, как автор успешно сопоставит (плагиат + изготовление колеса) автоматическую регистрацию, он намного удобнее второго
Прежде чем представить конкретный код, давайте кратко разберем наиболее распространенные используемые понятия:
- Тема: Основной способ распространения сообщений MQTT, мы публикуем сообщения в темы, подписываемся на темы, читаем сообщения из тем и выполняем обработку бизнес-логики, темы - это каналы сообщений
- Производители: отправители сообщений MQTT, они отправляют сообщения в темы
- Потребители: получатели сообщений MQTT, они подписываются на нужные им темы и получают от них сообщения.
- брокер: пересылка сообщений, через него передаются сообщения, EMQX — наш брокер, нам не нужно заботиться о его конкретной реализации при использовании
Фактически процесс использования MQTT таков: производитель отправляет сообщение в топик -> брокер доставляет сообщение -> потребитель, который подписывается на топик, получает сообщение и выполняет соответствующую бизнес-логику
Режим клиента
Этот режим в основном такой же, как традиционная ссылка на базу данных и ссылка Redis, которой могут легко управлять небольшие партнеры с опытом разработки.Нам нужно рассмотреть, создавать ли соответствующую фабрику, является ли это одноэлементным режимом, прототипом, или новый. А бассейн?
Мы используем шаблон singleton для этого введения
Создайте фабричный класс
Во-первых, мы создаем фабрику (не распознавайте отравление шаблонами проектирования)
public class MqttFactory {
private static MqttProperties configuration;
private static MqttClient client;
/**
* 获取客户端实例
* 单例模式, 存在则返回, 不存在则初始化
*/
public static MqttClient getInstance() {
if (client == null) {
init();
}
return client;
}
/**
* 初始化客户端
*/
public static void init() {
try {
client = new MqttClient(configuration.getAddress(), "client-" + System.currentTimeMillis());
// MQTT配置对象
MqttConnectOptions options = new MqttConnectOptions();
// 设置自动重连, 其它具体参数可以查看MqttConnectOptions
options.setAutomaticReconnect(true);
if (!client.isConnected()) {
client.connect(options);
}
} catch (MqttException e) {
LOGGER.error(String.format("MQTT: 连接消息服务器[%s]失败", configuration.getAddress()));
}
}
}
Конкретную конфигурацию MQTT можно посмотретьMqttConnectOptions, здесь не поясняется
Еще одно слово, документация всегда лучше некоторых блогов!!!
Создать класс инструмента
Далее создаем MqttUtil для отправки сообщений и подписки на топик
public class MqttUtil {
/**
* 发送消息
* @param topic 主题
* @param data 消息内容
*/
public static void send(String topic, Object data) {
// 获取客户端实例
MqttClient client = MqttFactory.getInstance();
ObjectMapper mapper = new ObjectMapper();
try {
// 转换消息为json字符串
String json = mapper.writeValueAsString(data);
client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));
} catch (JsonProcessingException e) {
LOGGER.error(String.format("MQTT: 主题[%s]发送消息转换json失败", topic));
} catch (MqttException e) {
LOGGER.error(String.format("MQTT: 主题[%s]发送消息失败", topic));
}
}
/**
* 订阅主题
* @param topic 主题
* @param listener 消息监听处理器
*/
public static void subscribe(String topic, IMqttMessageListener listener) {
MqttClient client = MqttFactory.getInstance();
try {
client.subscribe(topic, listener);
} catch (MqttException e) {
LOGGER.error(String.format("MQTT: 订阅主题[%s]失败", topic));
}
}
}
Я полагаю, что вы заметили вещь IMqttMessageListener. Нам нужно только создать класс прослушивателя и реализовать интерфейс IMqttMessageListener для обработки сообщений. Код выглядит следующим образом:
public class MessageListener implements IMqttMessageListener {
/**
* 处理消息
* @param topic 主题
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
LOGGER.info(String.format("MQTT: 订阅主题[%s]发来消息[%s]", topic, new String(mqttMessage.getPayload())));
}
public static void main(String[] args) {
//订阅主题test01, 使用MessageListener来处理它的消息
MqttUtil.subscribe("test01", new MessageListener());
}
}
Легко ли понять, отправка или подписка?
После того, как комфортные дела закончились, это приносит бесконечную пытку и пустоту, давай, бросим вызов второму режиму тяжелой умственной нагрузки!
Spring Integration
Что такое Spring Integration?Извините, я не знаю и не хочу знать
Зачем использовать Spring Integration, потому что он действительно хорошо поддерживается
Большинство руководств в Интернете посвящены интеграции Spring. Это может быть мой первый контакт. Я не могу понять то же самое, поэтому я решил отказаться от них и выбралВеликий БогМетод автоматической настройки
Помните концепции, которые мы обсуждали ранее: Субъект/Производитель/Потребитель?
В Spring Integration мы добавляем несколько новых концепций и дорабатываем предыдущие:
- Канал: Канал, через который сообщения передаются и принимаются, через который каждое сообщение детализируется и выходит.
- Фабрика клиентов: используется для создания клиентов MQTT, аналогичных тому, что используется в режиме 1.
- Адаптер сообщений: используется для получения сообщений MQTT, их преобразования, но не участия в бизнес-логике.
- Входящий канал: с адаптером сообщений, сообщениевойти на платформуканал
- Исходящий канал: с клиентской фабрикой, сообщениеотправить платформуканал
- тема: все еще тема, она не меняется
- Продюсер: Парень с исходящим каналом
- Потребитель: парень с входящим каналом
Если вы сможете постепенно понять приведенное выше определение, процесс этого режима на самом деле может стать таким:
- Производитель: создать исходящий канал, указав клиентскую фабрику -> отправить сообщение
- Потребитель: создать входящий канал указанного адаптера сообщений -> получать сообщения -> вводить перехватчики сообщений -> бизнес-логика
На самом деле, на мой взгляд, это соответствует концепции Spring Boot, и соглашение лучше, чем конфигурация.
Код содержимого этого блока более сложен и будет подробно объяснен позже.
Конкретный код может относиться к лицу, используемому для практики.Spring Boot Koala
Суммировать
Как служба сообщений, MQTT может удовлетворить большинство наших потребностей в разработке, но все еще остаются некоторые остающиеся проблемы, которые автор не продумывал и не практиковал глубоко:
- Как использовать механизм qos, чтобы гарантировать, что данные не будут потеряны
- Очередь и порядок сообщений
- Приложение в кластерном режиме