Использование MQTT с Spring Boot

Java

Почему выбирают MQTT

Считается, что определение MQTT понятно многим, в этой статье не обсуждаются какие-то высокоуровневые вещи, она нацелена на использование самого простого и интуитивно понятного способа, чтобы каждый новичок мог применить его как можно быстрее.

Начнем с анализа того, что необходимо для использования MQTT:

  1. сервер сообщений
  2. Частые взаимодействия между различными приложениями/устройствами
  3. Может включать обмен сообщениями «один ко многим»

По трем пунктам, перечисленным выше, мы, вероятно, можем понять, что наиболее подходящий сценарий для MQTT — этоИнформацияЯвляясь важной частью системы и участвуя в ключевой бизнес-логике системы

MQTT, старт!

Теперь, когда мы решили его использовать, первое, что нам нужно изучить, это как заставить MQTT работать правильно, ведь это не просто вопрос добавления зависимости к maven.

Всего нам нужно сделать две вещи:

  1. скачатьEMQXсервер сообщений, как брокер
  2. Ввести зависимости в maven
<dependency>  
    <groupId>org.springframework.integration</groupId>  
    <artifactId>spring-integration-mqtt</artifactId>  
    <version>5.3.2.RELEASE</version>  
</dependency>

Выполнив два вышеуказанных шага, запустите сервер EMQX и официально приступайте к нашему путешествию по MQTT.

Как пользоваться

В коде использования MQTT в Spring Boot автор резюмирует следующие два способа:

  1. Концепция канала сообщений с использованием spring-integration
  2. Используйте традиционную концепцию «Клиент-клиент»

Первый создаст определенную степень умственной нагрузки, но после того, как автор успешно сопоставит (плагиат + изготовление колеса) автоматическую регистрацию, он намного удобнее второго

Прежде чем представить конкретный код, давайте кратко разберем наиболее распространенные используемые понятия:

  • Тема: Основной способ распространения сообщений MQTT, мы публикуем сообщения в темы, подписываемся на темы, читаем сообщения из тем и выполняем обработку бизнес-логики, темы - это каналы сообщений
  • Производители: отправители сообщений MQTT, они отправляют сообщения в темы
  • Потребители: получатели сообщений MQTT, они подписываются на нужные им темы и получают от них сообщения.
  • брокер: пересылка сообщений, через него передаются сообщения, EMQX — наш брокер, нам не нужно заботиться о его конкретной реализации при использовании

Фактически процесс использования MQTT таков: производитель отправляет сообщение в топик -> брокер доставляет сообщение -> потребитель, который подписывается на топик, получает сообщение и выполняет соответствующую бизнес-логику

Режим клиента

Этот режим в основном такой же, как традиционная ссылка на базу данных и ссылка Redis, которой могут легко управлять небольшие партнеры с опытом разработки.Нам нужно рассмотреть, создавать ли соответствующую фабрику, является ли это одноэлементным режимом, прототипом, или новый. А бассейн?

Мы используем шаблон singleton для этого введения

Создайте фабричный класс

Во-первых, мы создаем фабрику (не распознавайте отравление шаблонами проектирования)

public class MqttFactory {  

    private static MqttProperties configuration;  
    
    private static MqttClient client;  
 
    /**
    *   获取客户端实例
    *   单例模式, 存在则返回, 不存在则初始化
    */
    public static MqttClient getInstance() {    
        if (client == null) {      
            init();    
        }    
        return client;  
    }  
    
    /**
    *   初始化客户端
    */
    public static void init() {    
        try {      
            client = new MqttClient(configuration.getAddress(), "client-" + System.currentTimeMillis());      
            // MQTT配置对象
            MqttConnectOptions options = new MqttConnectOptions();      
            // 设置自动重连, 其它具体参数可以查看MqttConnectOptions
            options.setAutomaticReconnect(true);      
            if (!client.isConnected()) {        
            client.connect(options);      
            }    
        } catch (MqttException e) {      
            LOGGER.error(String.format("MQTT: 连接消息服务器[%s]失败", configuration.getAddress()));    
        }  
    }
    
}

Конкретную конфигурацию MQTT можно посмотретьMqttConnectOptions, здесь не поясняется

Еще одно слово, документация всегда лучше некоторых блогов!!!

Создать класс инструмента

Далее создаем MqttUtil для отправки сообщений и подписки на топик

public class MqttUtil {  

    /**
    *   发送消息
    *   @param topic 主题
    *   @param data 消息内容
    */
    public static void send(String topic, Object data) {    
        // 获取客户端实例
        MqttClient client = MqttFactory.getInstance();    
        ObjectMapper mapper = new ObjectMapper();    
        try {
            // 转换消息为json字符串
            String json = mapper.writeValueAsString(data);      
            client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));    
        } catch (JsonProcessingException e) {      
            LOGGER.error(String.format("MQTT: 主题[%s]发送消息转换json失败", topic));    
        } catch (MqttException e) {      
            LOGGER.error(String.format("MQTT: 主题[%s]发送消息失败", topic));    
        }  
    }
    
    /** 
    * 订阅主题 
    * @param topic 主题 
    * @param listener 消息监听处理器 
    */
    public static void subscribe(String topic, IMqttMessageListener listener) {  
        MqttClient client = MqttFactory.getInstance();  
        try {    
            client.subscribe(topic, listener);  
        } catch (MqttException e) {    
            LOGGER.error(String.format("MQTT: 订阅主题[%s]失败", topic));  
        }
    }
    
}

Я полагаю, что вы заметили вещь IMqttMessageListener. Нам нужно только создать класс прослушивателя и реализовать интерфейс IMqttMessageListener для обработки сообщений. Код выглядит следующим образом:

public class MessageListener implements IMqttMessageListener {  

    /** 
    * 处理消息
    * @param topic 主题 
    * @param mqttMessage 消息 
    */
    @Override  
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {   
        LOGGER.info(String.format("MQTT: 订阅主题[%s]发来消息[%s]", topic, new String(mqttMessage.getPayload())));  
    }
    
    public static void main(String[] args) {  
        //订阅主题test01, 使用MessageListener来处理它的消息
        MqttUtil.subscribe("test01", new MessageListener());
    }

}

Легко ли понять, отправка или подписка?

После того, как комфортные дела закончились, это приносит бесконечную пытку и пустоту, давай, бросим вызов второму режиму тяжелой умственной нагрузки!

Spring Integration

Что такое Spring Integration?Извините, я не знаю и не хочу знать

Зачем использовать Spring Integration, потому что он действительно хорошо поддерживается

Большинство руководств в Интернете посвящены интеграции Spring. Это может быть мой первый контакт. Я не могу понять то же самое, поэтому я решил отказаться от них и выбралВеликий БогМетод автоматической настройки

Помните концепции, которые мы обсуждали ранее: Субъект/Производитель/Потребитель?

В Spring Integration мы добавляем несколько новых концепций и дорабатываем предыдущие:

  • Канал: Канал, через который сообщения передаются и принимаются, через который каждое сообщение детализируется и выходит.
  • Фабрика клиентов: используется для создания клиентов MQTT, аналогичных тому, что используется в режиме 1.
  • Адаптер сообщений: используется для получения сообщений MQTT, их преобразования, но не участия в бизнес-логике.
  • Входящий канал: с адаптером сообщений, сообщениевойти на платформуканал
  • Исходящий канал: с клиентской фабрикой, сообщениеотправить платформуканал
  • тема: все еще тема, она не меняется
  • Продюсер: Парень с исходящим каналом
  • Потребитель: парень с входящим каналом

Если вы сможете постепенно понять приведенное выше определение, процесс этого режима на самом деле может стать таким:

  • Производитель: создать исходящий канал, указав клиентскую фабрику -> отправить сообщение
  • Потребитель: создать входящий канал указанного адаптера сообщений -> получать сообщения -> вводить перехватчики сообщений -> бизнес-логика

На самом деле, на мой взгляд, это соответствует концепции Spring Boot, и соглашение лучше, чем конфигурация.

Код содержимого этого блока более сложен и будет подробно объяснен позже.

Конкретный код может относиться к лицу, используемому для практики.Spring Boot Koala

Суммировать

Как служба сообщений, MQTT может удовлетворить большинство наших потребностей в разработке, но все еще остаются некоторые остающиеся проблемы, которые автор не продумывал и не практиковал глубоко:

  • Как использовать механизм qos, чтобы гарантировать, что данные не будут потеряны
  • Очередь и порядок сообщений
  • Приложение в кластерном режиме