springboot + rabbitmq для умного дома, не ожидал что так просто

Java
springboot + rabbitmq для умного дома, не ожидал что так просто

В предыдущем абзаце мне посчастливилось участвовать в разработке проекта умного дома.Поскольку у меня нет предыдущего опыта разработки в этой области, мне весьма любопытна модель разработки и технологический стек умного оборудования.

智能可燃气体报警器
Интеллектуальная сигнализация горючих газов

Продукт является сигнализатором горючего газа.Если концентрация утечки газа в доме достигает определенного порога, сигнализация обнаружит и загрузит значение концентрации газа в фон, а фон напомнит пользователям, что в доме может быть утечка газа. домой по телефону, SMS, WeChat и т. д.

Пользователь также может отправить будильнику некоторые инструкции, чтобы выключить будильник, отрегулировать громкость и так далее. Общая функция относительно проста, и общая логика показана на следующем рисунке:在这里插入图片描述

Но когда я действительно участвовал в разработке, я был немного разочарован, потому что во всем процессе исследований и разработок не использовалась никакая новая технология, и это было все еще несколько видов обычного промежуточного программного обеспечения, просто другое использование.

Для технического отбораrabbitmqПри выполнении основных компонентов основное внимание уделяется тому, чтобы затраты на эксплуатацию и техническое обслуживание были низкими, а квалификация членов группы была относительно высокой.


Поделитесь с друзьями, как использоватьspringboot + rabbitmqСоздание Интернета вещей (IOT) платформа, на самом деле интеллектуальное оборудование не так недостижимо, как представлялось!

Многие друзья могут быть немного смущены?rabbitmqРазве это не очередь сообщений?Как я могу снова сделать умное оборудование??

фактическиrabbitmqЕсть два протокола, используется очередь сообщений, с которой мы обычно связываемсяAMQPпротокол, в то время как тот, который используется в интеллектуальном оборудовании,MQTTпротокол.

1. Что такое протокол MQTT?

MQTTПолное имя (транспорт телеметрии очереди сообщений): публикация/подписка (publish/subscribe) Режим轻量级Протокол связи, который получает сообщения путем подписки на соответствующий топик, — Интернет вещей (Internet of Thing) в стандартном транспортном протоколе.

Протокол преобразует издателя сообщения (publisher) и подписчиков (subscriber) для разделения, поэтому он может предоставлять надежные службы сообщений для удаленно подключенных устройств в ненадежной сетевой среде, а его использование несколько похоже на традиционный MQ.在这里插入图片描述

TCPПротокол находится на транспортном уровне,MQTTПротокол находится на прикладном уровне,MQTTПротокол построен наTCP/IPПо протоколу, то есть до тех пор, пока он поддерживаетTCP/IPМожно использовать любое место в стеке протоколовMQTTпротокол.

2. Зачем использовать протокол MQTT?

MQTTПочему протоколы так популярны в Интернете вещей (IoT)? а не другие протоколы, такие как те, с которыми мы более знакомыHTTPЧто с соглашением?

  • во-первыхHTTPПротокол Это синхронный протокол, клиент должен дождаться ответа сервера после запроса. В среде Интернета вещей (IOT) устройства будут сильно зависеть от среды, такой как низкая пропускная способность, высокая задержка в сети, нестабильная сетевая связь и т. д. Очевидно, что протоколы асинхронных сообщений более подходят.IOTприменение.

  • HTTPОн односторонний, клиент должен инициировать соединение, чтобы получить сообщение, а в приложениях Интернета вещей (IOT) устройства или датчики часто являются клиентами, что означает, что они не могут пассивно получать команды из сети.

  • Обычно команду или сообщение необходимо отправить на все устройства в сети.HTTPДобиться такой функции не только сложно, но и чрезвычайно дорого.

3. Внедрение протокола MQTT

сказал раньшеMQTTЭто легкий протокол, который фокусируется только на отправке сообщений, поэтому структура этого протокола также очень проста.

MQTT-пакеты

существуетMQTTсоглашение, аMQTTПакеты отправляются:固定头(фиксированный заголовок),可变头(переменный заголовок),消息体(полезная нагрузка) состоит из трех частей.

  • Фиксированный заголовок (Fixed header), фиксированный заголовок присутствует во всех пакетах данных, включая тип пакета данных и идентификатор группировки пакета данных.
  • Переменный заголовок (Variable header), в некоторых типах пакетов есть переменные заголовки.
  • Тело сообщения с содержимым (полезная нагрузка), существующее в некоторых классах пакетов данных, представляет собой конкретное содержимое сообщения, полученное клиентом.
在这里插入图片描述
вставьте сюда описание изображения

1. Фиксированная головка

Фиксированный заголовок, использующий два байта, всего 16 бит:在这里插入图片描述(4-7) Биты представляют тип сообщения, который представлен 4-битным двоичным кодом, который может представлять следующие 16 типов сообщений, но позиции 0 и 15 зарезервированы для использования, поэтому всего существует 14 типов сообщений.图片源于网络,如有侵权联系删除

Флаг DUP (флаг повтора)

Флаг DUP: идентификатор для обеспечения надежной передачи сообщения и того, было ли сообщение доставлено. Значение по умолчанию равно 0, которое занимает только один байт, указывая на первую передачу.Когда значение равно 1, это указывает на то, что текущее сообщение было передано ранее.

QoS Level (уровень качества сообщения)

Уровень QoS: уровень качества сообщения, который будет подробно описан позже.

СОХРАНИТЬ (постоянный)

  • значение1: Указывает, что отправленное сообщение должно сохраняться все время, и на него не влияют перезапуски сервера.Его необходимо отправить не только текущему подписчику, но и вновь добавленному клиенту, который подписывается на это сообщение.Topic, подписчики также получат уведомление немедленно.Уведомление: Для вновь добавленных подписчиков будет удален только последнийRETAIN flag = 1толчок сообщения.

  • значение0: отправить это сообщение только текущим подписчикам.

Оставшаяся длина

осталось в текущем сообщенииbyte(байт), включая переменный заголовок и полезную нагрузку тела сообщения.

2. Переменная головка

Фиксированный заголовок определяет только тип сообщения и некоторые флаги, а некоторые метаданные сообщения необходимо поместить в переменный заголовок. Длина переменного содержимого заголовка в байтах + полезная нагрузка тела сообщения = оставшаяся длина.

Переменный заголовок располагается между фиксированным заголовком и полезной нагрузкой и содержит имя протокола, номер версии, флаг подключения, авторизацию пользователя, время пульса и т. д.

Переменные заголовки существуют для следующих типов сообщений: PUBLISH (QoS > 0), PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK.

3. Полезная нагрузка тела сообщения

Полезная нагрузка тела сообщения существует только вCONNECT,PUBLISH,SUBSCRIBE,SUBACK,UNSUBSCRIBEЭти типы сообщений:

  • CONNECT: содержит клиентскийClientId, подписалсяTopic,Messageа также用户名и密码.
  • PUBLISH: Отправить сообщение в соответствующую тему.
  • SUBSCRIBE: тема для подписки иQoS.
  • SUBACK: сервер дляSUBSCRIBEпредмет заявки иQoSПодтвердите и ответьте.
  • UNSUBSCRIBE: Чтобы отменить подписку на тему.

Качество сообщения (QoS)

消息质量(Quality of Service), то есть качество отправленного сообщения, издатель (publisher) и подписчиков (subscriber) можно указатьqosкласс, даQoS 0,QoS 1,QoS 2три уровня.

Различия между этими тремя уровнями объясняются ниже.

1. Качество обслуживания 0

Qos 0:At most onceСообщение отправляется только один раз (максимум один раз), нет гарантии, что сообщение будет успешно доставлено, нет механизма подтверждения, и сообщение может быть потеряно или продублировано.

图片源于网络,如有侵权联系删除
Изображение взято из Интернета, если есть какие-либо нарушения, пожалуйста, свяжитесь и удалите его.

2. Качество обслуживания 1

Вопрос 1:At least once(хотя бы один раз), в отличие отQoS 0С точки зренияQos 1выросackМеханизм подтверждения, отправитель (publisher) отправляет сообщение брокеру MQTT (broker), оба сами сначала сохранят сообщение, только когдаpublisherилиBrokerполучен отдельноPUBACKПри подтверждении само постоянное сообщение будет удалено, в противном случае оно будет отправлено повторно.

Но есть проблема, хотя мы можем убедиться, что клиент или сервер должны быть получены, подтвердивmessage, но мы не можем гарантировать, что он будет получен только один разmessage, то есть когда клиентpublisherНе полученоBrokerизpubackилиBrokerНе получилsubscriberизpuback, то он будет продолжать воспроизводиться.

издатель -> общий процесс брокера:

  1. сообщение магазина издателя -> публикация -> брокер (сообщение передачи)
  2. брокер -> публикация -> издатель удалить сообщение (подтвердить доставку успешно)
图片源于网络,如有侵权联系删除
Изображение взято из Интернета, если есть какие-либо нарушения, пожалуйста, свяжитесь и удалите его.

3. Качество обслуживания 2

Вопрос 2:Exactly once(только один раз), в отличие отQoS 1,QoS 2Реализация обновления принимается только один разmessage,publisherиbrokerСообщение также сохраняется, гдеpublisherкэшированныйmessageи соответствующийmsgIDbrokerкэшированныйmsgID, вы можете убедиться, что сообщение не повторяется, потому что добавляется еще одноconfirmмеханизма, весь процесс значительно усложняется.

издатель -> общий процесс брокера:

  1. publisher store msg -> publish ->broker -> broker store
  2. msgID (сообщение о доставке) broker -> puberc (подтверждение успешной доставки)
  3. издатель -> паблик -> брокер удалить msgID (указать брокеру удалить msgID)
  4. брокер -> pubcomp -> издатель удалить сообщение (указать издателю удалить сообщение)
图片源于网络,如有侵权联系删除
Изображение взято из Интернета, если есть какие-либо нарушения, пожалуйста, свяжитесь и удалите его.

LWT (Последняя воля и завещание)

LWTполное имяLast Will and Testament, по сути, завещание – это предмет и соответствующее ему сообщение, заранее определенное клиентом, приложенное кCONNECTпакеты, в том числе遗愿主题,遗愿 QoS,遗愿消息Ждать.

Когда MQTT-брокерBrokerклиент обнаруженclientКогда соединение аварийно разорвано, сервер будет активно публиковать это сообщение, а затем соответствующие подписчики получат это сообщение.

взять каштан: все в чате подписываются наtalk, но Сяо Фу внезапно отключил связь из-за дрожания сети.В это время все подписанные темы в чатеtalkКлиенты всех получат "小富离开聊天室«последнее желание».

Соответствующие параметры завещания:

  • Will Flag: использовать ли LWT, 1 для включения
  • Will Topic: Имя субъекта Last Wish, подстановочные знаки не допускаются.
  • Will Qos: QoS для использования при публикации умирающего сообщения
  • Will Retain: Сохранить флаг последнего сообщения о желании
  • Will Message: Содержимое последнего сообщения о желании

этот клиентClientКаковы сценарии аварийного отключения?

  • BrokerОбнаружено лежащее в основе исключение ввода-вывода;
  • Клиенту не удалось установить пульсKeep Aliveв интервале иBrokerвзаимодействовать с сообщениями;
  • Клиент закрывает нижний слойTCPне отправлено до подключенияDISCONNECTпакет данных;
  • Клиент отправляет некорректный пакетBroker, что приводит к закрытию соединения и закрытию клиента и т. д.

Уведомление: когда клиент публикует черезDISCONNECTКогда пакет отключен, это нормальное отключение, которое не сработает.LWTмеханизм, при этомBrokerОн также отбросит соответствующую информацию, указанную текущим клиентом при подключении.LWTпараметр.

4. Сценарии применения протокола MQTT

MQTTПротокол широко используется в Интернете вещей, мобильном Интернете, интеллектуальном оборудовании, Интернете транспортных средств, электроэнергетике и других областях. Также используется множество сценариев, некоторые из которых перечислены ниже:

  • Связь IoT M2M, сбор больших данных IoT
  • Push-сообщение Android, push-сообщение WEB
  • Мобильный обмен мгновенными сообщениями, например Facebook Messenger
  • Умное оборудование, умная мебель, умная техника
  • Сетевая связь транспортных средств, сбор свай электростанций
  • Умный город, телемедицина, дистанционное образование
  • Отраслевые рынки, такие как энергетика, нефть и энергетика

Пять, реализация кода

конкретныйrabbitmqНе буду вдаваться в подробности настройки окружения.В сети много руководств.Если у вас есть условия,вы можете использовать сервер.Если у вас нет условий,как у меня.WindowsВерсия тоже очень радует.

在这里插入图片描述
вставьте сюда описание изображения

1. Включите протокол mqtt для rabbitmq.

Давайте начнемrabbitmqизmqttпротокола, поскольку установка по умолчанию отключена, команда выглядит следующим образом:

rabbitmq-plugins enable rabbitmq_mqtt

2. пакет зависимостей клиента mqtt

Установлено на предыдущем шагеrabbitmqсреды и включитеmqttФактически после соглашенияmqttСлужба брокера сообщений настроена, и теперь нужно реализовать отправку и подписку клиентских сообщений.

использовать здесьspring-integration-mqtt,org.eclipse.paho.client.mqttv3Две реализации инструментария.

<!--mqtt依赖包-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

3. Отправитель сообщения

Отправка сообщения относительно проста, в основном применяется для@ServiceActivatorпримечание, примечаниеmessageHandler.setAsyncсвойство, если установленоfalse, который может блокироваться при отправке сообщений в асинхронном режиме.

@Configuration
public class IotMqttProducerConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttConfig.getServers());
        return factory;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
        messageHandler.setAsync(false);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }
}

MQTTдля отправки сообщений вовнеAPI, вам нужно использовать@MessagingGatewayАннотация для предоставления прокси-сервера шлюза сообщений, параметрdefaultRequestChannelУкажите привязку для отправки сообщенияchannel.

Можно добиться трехAPIинтерфейс,payloadдля отправленного сообщения,topicтема отправленного сообщения,qosкачество сообщения.

@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {

    // 向默认的 topic 发送消息
    void sendMessage2Mqtt(String payload);
    // 向指定的 topic 发送消息
    void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
    // 向指定的 topic 发送消息,并指定服务质量参数
    void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

4. Подписка на новости

Подписка на сообщения в основном похожа на идею реализации мониторинга сообщений MQ, которую мы обычно используем.@ServiceActivatorВ аннотации указано, что текущий метод используется для обработкиMQTTИнформация,inputChannelПараметр указывает сообщение, используемое для получения сообщения.channel.

/**
 * @Author: xiaofu
 * @Description: 消息订阅配置
 * @date 2020/6/8 18:24
 */
@Configuration
public class IotMqttSubscriberConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttConfig.getServers());
        return factory;
    }

    @Bean
    public MessageChannel iotMqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(iotMqttInputChannel());
        return adapter;
    }

    /**
     * @author xiaofu
     * @description 消息订阅
     * @date 2020/6/8 18:20
     */
    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler handlerTest() {

        return message -> {
            try {
                String string = message.getPayload().toString();
                System.out.println("接收到消息:" + string);
            } catch (MessagingException ex) {
                //logger.info(ex.getMessage());
            }
        };
    }
}

6. Тестовое сообщение

Эх~ Поскольку я ничего не знаю об оборудовании, то для имитации отправки сообщений с оборудования я могу использовать только несколько инструментов.MQTTПротокол в основном такой же, как и наш предыдущий, но он встроен в аппаратное обеспечение на другом языке.

Выбранный здесь тестовый инструментmqttbox,ссылка для скачивания:http://workswithweb.com/mqttbox.html

1. Отправка тестового сообщения

мы используем сначалаmqttboxЛожная темаmqtt_test_topicОтправьте сообщение, чтобы узнать, может ли фон успешно его получить.

在这里插入图片描述Увидев, что фон успешно попал в темуmqtt_test_topicсообщение отправлено.在这里插入图片描述

2. Тестовая подписка на сообщения

использоватьmqttboxИмитация подписки на темуmqtt_test_topic, на фоне темыmqtt_test_topicОтправьте сообщение, здесь я просто написалcontrollerВызовите API, чтобы отправить сообщение.

http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=Я отправляю сообщение в тему mqtt_test_topic в фоновом режиме在这里插入图片描述мы видимmqttboxСообщение о подписке от , успешно получило сообщение в фоновом режиме, до сих пор нашаMQTTДаже если коммуникационная среда установлена ​​успешно. если поставитьmqttboxИнструмент заменяется специальным аппаратным оборудованием, и весь процесс представляет собой то, что мы часто называем умным домом, но на самом деле это не так уж сложно.在这里插入图片描述

7. Примечания по применению

Проблемы, с которыми мы столкнулись в нашей реальной производственной среде, представлены здесь, чтобы каждый мог избежать ловушек.

clientId должен быть уникальным

на стороне клиентаconnectПри подключении появитсяclientIdпараметры, которые должны быть уникальными для каждого клиента. Но мы находимся в стадии разработки и тестированияclientIdЭто прямо прописано в коде, а сервисы развернуты в одном экземпляре, и никаких проблем не возникает.

MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());

Однако в производственной среде, поскольку служба развернута в кластере с несколькими экземплярами, возникает следующая странная проблема. Только один клиент может получить сообщение одновременно, другие клиенты не только не могут использовать сообщение, но и продолжают отключаться и снова подключаться:Lost connection: 已断开连接; retrying....

在这里插入图片描述Это связано сclientIdТо же самое ведет к конкурирующему потреблению среди клиентов и, в конечном счете,clientIdЕсли изменить способ получения с передатчика, проблема будет решена, поэтому этому месту нужно уделить особое внимание.

Обычно у программы нет проблем в среде разработки, но в производственной среде возникает множество проблем, многие из которых вызваны разными методами развертывания службы. Поэтому необходимо больше узнать о дистрибутиве.

8. Другое промежуточное ПО

MQTTЭто просто протокол, который поддерживаетMQTTСуществует множество продуктов промежуточного программного обеспечения для обмена сообщениями по протоколу, и нижеследующее является лишь частью из них.

  • Mosquitto
  • Eclipse Paho
  • RabbitMQ
  • Apache ActiveMQ
  • HiveMQ
  • JoramMQ
  • ThingMQ
  • VerneMQ
  • Apache Apollo
  • emqttd Xively
  • IBM Websphere .....

Суммировать

Кроме того, я впервые делаю проект, связанный с аппаратным обеспечением. Прежде чем я услышал об умном доме, я чувствовал, что он очень высокий, но на самом деле, разработав его вручную, я обнаружил, что технология всегда то же самое, и это просто другое использование.

Руки на демо проектаgithubАдрес: https://github.com/chengxy-nds/springboot-rabbitmq-mqtt.git, заинтересованные друзья могут скачать и запустить, это очень просто реализовать.

Оригинальность не так проста, содержание вывода жгучих волос, я надеюсь, что вы можете получить небольшой выигрыш!

Разобраны и розданы друзьям сотни различных технических электронных книг. Официальный аккаунт ответил [666], чтобы собрать его самостоятельно. Мы создали группу технического обмена с некоторыми друзьями, чтобы обсуждать технологии и делиться технической информацией, стремясь учиться и развиваться вместе.Если вам интересно, отсканируйте код, чтобы присоединиться к нам!