В предыдущем абзаце мне посчастливилось участвовать в разработке проекта умного дома.Поскольку у меня нет предыдущего опыта разработки в этой области, мне весьма любопытна модель разработки и технологический стек умного оборудования.
Продукт является сигнализатором горючего газа.Если концентрация утечки газа в доме достигает определенного порога, сигнализация обнаружит и загрузит значение концентрации газа в фон, а фон напомнит пользователям, что в доме может быть утечка газа. домой по телефону, SMS, WeChat и т. д.
Пользователь также может отправить будильнику некоторые инструкции, чтобы выключить будильник, отрегулировать громкость и так далее. Общая функция относительно проста, и общая логика показана на следующем рисунке:
Но когда я действительно участвовал в разработке, я был немного разочарован, потому что во всем процессе исследований и разработок не использовалась никакая новая технология, и это было все еще несколько видов обычного промежуточного программного обеспечения, просто другое использование.
Для технического отбораrabbitmq
При выполнении основных компонентов основное внимание уделяется тому, чтобы затраты на эксплуатацию и техническое обслуживание были низкими, а квалификация членов группы была относительно высокой.
Поделитесь с друзьями, как использоватьspringboot
+ rabbitmq
Создание Интернета вещей (IOT
) платформа, на самом деле интеллектуальное оборудование не так недостижимо, как представлялось!
Многие друзья могут быть немного смущены?rabbitmq
Разве это не очередь сообщений?Как я могу снова сделать умное оборудование??
фактическиrabbitmq
Есть два протокола, используется очередь сообщений, с которой мы обычно связываемсяAMQP
протокол, в то время как тот, который используется в интеллектуальном оборудовании,MQTT
протокол.
1. Что такое протокол MQTT?
MQTT
Полное имя (транспорт телеметрии очереди сообщений): публикация/подписка (publish
/subscribe
) Режим轻量级
Протокол связи, который получает сообщения путем подписки на соответствующий топик, — Интернет вещей (Internet of Thing
) в стандартном транспортном протоколе.
Протокол преобразует издателя сообщения (publisher
) и подписчиков (subscriber
) для разделения, поэтому он может предоставлять надежные службы сообщений для удаленно подключенных устройств в ненадежной сетевой среде, а его использование несколько похоже на традиционный MQ.
TCP
Протокол находится на транспортном уровне,MQTT
Протокол находится на прикладном уровне,MQTT
Протокол построен наTCP/IP
По протоколу, то есть до тех пор, пока он поддерживаетTCP/IP
Можно использовать любое место в стеке протоколовMQTT
протокол.
2. Зачем использовать протокол MQTT?
MQTT
Почему протоколы так популярны в Интернете вещей (IoT)? а не другие протоколы, такие как те, с которыми мы более знакомыHTTP
Что с соглашением?
-
во-первых
HTTP
Протокол Это синхронный протокол, клиент должен дождаться ответа сервера после запроса. В среде Интернета вещей (IOT) устройства будут сильно зависеть от среды, такой как низкая пропускная способность, высокая задержка в сети, нестабильная сетевая связь и т. д. Очевидно, что протоколы асинхронных сообщений более подходят.IOT
применение. -
HTTP
Он односторонний, клиент должен инициировать соединение, чтобы получить сообщение, а в приложениях Интернета вещей (IOT) устройства или датчики часто являются клиентами, что означает, что они не могут пассивно получать команды из сети. -
Обычно команду или сообщение необходимо отправить на все устройства в сети.
HTTP
Добиться такой функции не только сложно, но и чрезвычайно дорого.
3. Внедрение протокола MQTT
сказал раньшеMQTT
Это легкий протокол, который фокусируется только на отправке сообщений, поэтому структура этого протокола также очень проста.
MQTT-пакеты
существуетMQTT
соглашение, аMQTT
Пакеты отправляются:固定头
(фиксированный заголовок),可变头
(переменный заголовок),消息体
(полезная нагрузка) состоит из трех частей.
- Фиксированный заголовок (Fixed header), фиксированный заголовок присутствует во всех пакетах данных, включая тип пакета данных и идентификатор группировки пакета данных.
- Переменный заголовок (Variable header), в некоторых типах пакетов есть переменные заголовки.
- Тело сообщения с содержимым (полезная нагрузка), существующее в некоторых классах пакетов данных, представляет собой конкретное содержимое сообщения, полученное клиентом.
1. Фиксированная головка
Фиксированный заголовок, использующий два байта, всего 16 бит:(4-7) Биты представляют тип сообщения, который представлен 4-битным двоичным кодом, который может представлять следующие 16 типов сообщений, но позиции 0 и 15 зарезервированы для использования, поэтому всего существует 14 типов сообщений.
Флаг DUP (флаг повтора)
Флаг DUP: идентификатор для обеспечения надежной передачи сообщения и того, было ли сообщение доставлено. Значение по умолчанию равно 0, которое занимает только один байт, указывая на первую передачу.Когда значение равно 1, это указывает на то, что текущее сообщение было передано ранее.
QoS Level (уровень качества сообщения)
Уровень QoS: уровень качества сообщения, который будет подробно описан позже.
СОХРАНИТЬ (постоянный)
-
значение
1
: Указывает, что отправленное сообщение должно сохраняться все время, и на него не влияют перезапуски сервера.Его необходимо отправить не только текущему подписчику, но и вновь добавленному клиенту, который подписывается на это сообщение.Topic
, подписчики также получат уведомление немедленно.Уведомление: Для вновь добавленных подписчиков будет удален только последнийRETAIN flag = 1
толчок сообщения. -
значение
0
: отправить это сообщение только текущим подписчикам.
Оставшаяся длина
осталось в текущем сообщенииbyte
(байт), включая переменный заголовок и полезную нагрузку тела сообщения.
2. Переменная головка
Фиксированный заголовок определяет только тип сообщения и некоторые флаги, а некоторые метаданные сообщения необходимо поместить в переменный заголовок. Длина переменного содержимого заголовка в байтах + полезная нагрузка тела сообщения = оставшаяся длина.
Переменный заголовок располагается между фиксированным заголовком и полезной нагрузкой и содержит имя протокола, номер версии, флаг подключения, авторизацию пользователя, время пульса и т. д.
Переменные заголовки существуют для следующих типов сообщений: PUBLISH (QoS > 0), PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK.
3. Полезная нагрузка тела сообщения
Полезная нагрузка тела сообщения существует только вCONNECT
,PUBLISH
,SUBSCRIBE
,SUBACK
,UNSUBSCRIBE
Эти типы сообщений:
-
CONNECT
: содержит клиентскийClientId
, подписалсяTopic
,Message
а также用户名
и密码
. -
PUBLISH
: Отправить сообщение в соответствующую тему. -
SUBSCRIBE
: тема для подписки иQoS
. -
SUBACK
: сервер дляSUBSCRIBE
предмет заявки иQoS
Подтвердите и ответьте. -
UNSUBSCRIBE
: Чтобы отменить подписку на тему.
Качество сообщения (QoS)
消息质量
(Quality of Service), то есть качество отправленного сообщения, издатель (publisher
) и подписчиков (subscriber
) можно указатьqos
класс, даQoS 0
,QoS 1
,QoS 2
три уровня.
Различия между этими тремя уровнями объясняются ниже.
1. Качество обслуживания 0
Qos 0:At most once
Сообщение отправляется только один раз (максимум один раз), нет гарантии, что сообщение будет успешно доставлено, нет механизма подтверждения, и сообщение может быть потеряно или продублировано.
2. Качество обслуживания 1
Вопрос 1:At least once
(хотя бы один раз), в отличие отQoS 0
С точки зренияQos 1
выросack
Механизм подтверждения, отправитель (publisher
) отправляет сообщение брокеру MQTT (broker
), оба сами сначала сохранят сообщение, только когдаpublisher
илиBroker
получен отдельноPUBACK
При подтверждении само постоянное сообщение будет удалено, в противном случае оно будет отправлено повторно.
Но есть проблема, хотя мы можем убедиться, что клиент или сервер должны быть получены, подтвердивmessage
, но мы не можем гарантировать, что он будет получен только один разmessage
, то есть когда клиентpublisher
Не полученоBroker
изpuback
илиBroker
Не получилsubscriber
изpuback
, то он будет продолжать воспроизводиться.
издатель -> общий процесс брокера:
- сообщение магазина издателя -> публикация -> брокер (сообщение передачи)
- брокер -> публикация -> издатель удалить сообщение (подтвердить доставку успешно)
3. Качество обслуживания 2
Вопрос 2:Exactly once
(только один раз), в отличие отQoS 1
,QoS 2
Реализация обновления принимается только один разmessage
,publisher
иbroker
Сообщение также сохраняется, гдеpublisher
кэшированныйmessage
и соответствующийmsgID
,иbroker
кэшированныйmsgID
, вы можете убедиться, что сообщение не повторяется, потому что добавляется еще одноconfirm
механизма, весь процесс значительно усложняется.
издатель -> общий процесс брокера:
- publisher store msg -> publish ->broker -> broker store
- msgID (сообщение о доставке) broker -> puberc (подтверждение успешной доставки)
- издатель -> паблик -> брокер удалить msgID (указать брокеру удалить msgID)
- брокер -> pubcomp -> издатель удалить сообщение (указать издателю удалить сообщение)
LWT (Последняя воля и завещание)
LWT
полное имяLast Will and Testament
, по сути, завещание – это предмет и соответствующее ему сообщение, заранее определенное клиентом, приложенное кCONNECT
пакеты, в том числе遗愿主题
,遗愿 QoS
,遗愿消息
Ждать.
Когда MQTT-брокерBroker
клиент обнаруженclient
Когда соединение аварийно разорвано, сервер будет активно публиковать это сообщение, а затем соответствующие подписчики получат это сообщение.
взять каштан: все в чате подписываются наtalk
, но Сяо Фу внезапно отключил связь из-за дрожания сети.В это время все подписанные темы в чатеtalk
Клиенты всех получат "小富离开聊天室
«последнее желание».
Соответствующие параметры завещания:
-
Will Flag
: использовать ли LWT, 1 для включения -
Will Topic
: Имя субъекта Last Wish, подстановочные знаки не допускаются. -
Will Qos
: QoS для использования при публикации умирающего сообщения -
Will Retain
: Сохранить флаг последнего сообщения о желании -
Will Message
: Содержимое последнего сообщения о желании
этот клиентClient
Каковы сценарии аварийного отключения?
-
Broker
Обнаружено лежащее в основе исключение ввода-вывода; - Клиенту не удалось установить пульс
Keep Alive
в интервале иBroker
взаимодействовать с сообщениями; - Клиент закрывает нижний слой
TCP
не отправлено до подключенияDISCONNECT
пакет данных; - Клиент отправляет некорректный пакет
Broker
, что приводит к закрытию соединения и закрытию клиента и т. д.
Уведомление: когда клиент публикует черезDISCONNECT
Когда пакет отключен, это нормальное отключение, которое не сработает.LWT
механизм, при этомBroker
Он также отбросит соответствующую информацию, указанную текущим клиентом при подключении.LWT
параметр.
4. Сценарии применения протокола MQTT
MQTT
Протокол широко используется в Интернете вещей, мобильном Интернете, интеллектуальном оборудовании, Интернете транспортных средств, электроэнергетике и других областях. Также используется множество сценариев, некоторые из которых перечислены ниже:
- Связь IoT M2M, сбор больших данных IoT
- Push-сообщение Android, push-сообщение WEB
- Мобильный обмен мгновенными сообщениями, например Facebook Messenger
- Умное оборудование, умная мебель, умная техника
- Сетевая связь транспортных средств, сбор свай электростанций
- Умный город, телемедицина, дистанционное образование
- Отраслевые рынки, такие как энергетика, нефть и энергетика
Пять, реализация кода
конкретныйrabbitmq
Не буду вдаваться в подробности настройки окружения.В сети много руководств.Если у вас есть условия,вы можете использовать сервер.Если у вас нет условий,как у меня.Windows
Версия тоже очень радует.
1. Включите протокол mqtt для rabbitmq.
Давайте начнемrabbitmq
изmqtt
протокола, поскольку установка по умолчанию отключена, команда выглядит следующим образом:
rabbitmq-plugins enable rabbitmq_mqtt
2. пакет зависимостей клиента mqtt
Установлено на предыдущем шагеrabbitmq
среды и включитеmqtt
Фактически после соглашенияmqtt
Служба брокера сообщений настроена, и теперь нужно реализовать отправку и подписку клиентских сообщений.
использовать здесьspring-integration-mqtt
,org.eclipse.paho.client.mqttv3
Две реализации инструментария.
<!--mqtt依赖包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
3. Отправитель сообщения
Отправка сообщения относительно проста, в основном применяется для@ServiceActivator
примечание, примечаниеmessageHandler.setAsync
свойство, если установленоfalse
, который может блокироваться при отправке сообщений в асинхронном режиме.
@Configuration
public class IotMqttProducerConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttConfig.getServers());
return factory;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
messageHandler.setAsync(false);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
}
MQTT
для отправки сообщений вовнеAPI
, вам нужно использовать@MessagingGateway
Аннотация для предоставления прокси-сервера шлюза сообщений, параметрdefaultRequestChannel
Укажите привязку для отправки сообщенияchannel
.
Можно добиться трехAPI
интерфейс,payload
для отправленного сообщения,topic
тема отправленного сообщения,qos
качество сообщения.
@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {
// 向默认的 topic 发送消息
void sendMessage2Mqtt(String payload);
// 向指定的 topic 发送消息
void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
// 向指定的 topic 发送消息,并指定服务质量参数
void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
4. Подписка на новости
Подписка на сообщения в основном похожа на идею реализации мониторинга сообщений MQ, которую мы обычно используем.@ServiceActivator
В аннотации указано, что текущий метод используется для обработкиMQTT
Информация,inputChannel
Параметр указывает сообщение, используемое для получения сообщения.channel
.
/**
* @Author: xiaofu
* @Description: 消息订阅配置
* @date 2020/6/8 18:24
*/
@Configuration
public class IotMqttSubscriberConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttConfig.getServers());
return factory;
}
@Bean
public MessageChannel iotMqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(iotMqttInputChannel());
return adapter;
}
/**
* @author xiaofu
* @description 消息订阅
* @date 2020/6/8 18:20
*/
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler handlerTest() {
return message -> {
try {
String string = message.getPayload().toString();
System.out.println("接收到消息:" + string);
} catch (MessagingException ex) {
//logger.info(ex.getMessage());
}
};
}
}
6. Тестовое сообщение
Эх~ Поскольку я ничего не знаю об оборудовании, то для имитации отправки сообщений с оборудования я могу использовать только несколько инструментов.MQTT
Протокол в основном такой же, как и наш предыдущий, но он встроен в аппаратное обеспечение на другом языке.
Выбранный здесь тестовый инструментmqttbox
,ссылка для скачивания:http://workswithweb.com/mqttbox.html
1. Отправка тестового сообщения
мы используем сначалаmqttbox
Ложная темаmqtt_test_topic
Отправьте сообщение, чтобы узнать, может ли фон успешно его получить.
Увидев, что фон успешно попал в темуmqtt_test_topic
сообщение отправлено.
2. Тестовая подписка на сообщения
использоватьmqttbox
Имитация подписки на темуmqtt_test_topic
, на фоне темыmqtt_test_topic
Отправьте сообщение, здесь я просто написалcontroller
Вызовите API, чтобы отправить сообщение.
http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=Я отправляю сообщение в тему mqtt_test_topic в фоновом режимемы видимmqttbox
Сообщение о подписке от , успешно получило сообщение в фоновом режиме, до сих пор нашаMQTT
Даже если коммуникационная среда установлена успешно. если поставитьmqttbox
Инструмент заменяется специальным аппаратным оборудованием, и весь процесс представляет собой то, что мы часто называем умным домом, но на самом деле это не так уж сложно.
7. Примечания по применению
Проблемы, с которыми мы столкнулись в нашей реальной производственной среде, представлены здесь, чтобы каждый мог избежать ловушек.
clientId должен быть уникальным
на стороне клиентаconnect
При подключении появитсяclientId
параметры, которые должны быть уникальными для каждого клиента. Но мы находимся в стадии разработки и тестированияclientId
Это прямо прописано в коде, а сервисы развернуты в одном экземпляре, и никаких проблем не возникает.
MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
Однако в производственной среде, поскольку служба развернута в кластере с несколькими экземплярами, возникает следующая странная проблема. Только один клиент может получить сообщение одновременно, другие клиенты не только не могут использовать сообщение, но и продолжают отключаться и снова подключаться:Lost connection: 已断开连接; retrying...
.
Это связано сclientId
То же самое ведет к конкурирующему потреблению среди клиентов и, в конечном счете,clientId
Если изменить способ получения с передатчика, проблема будет решена, поэтому этому месту нужно уделить особое внимание.
Обычно у программы нет проблем в среде разработки, но в производственной среде возникает множество проблем, многие из которых вызваны разными методами развертывания службы. Поэтому необходимо больше узнать о дистрибутиве.
8. Другое промежуточное ПО
MQTT
Это просто протокол, который поддерживаетMQTT
Существует множество продуктов промежуточного программного обеспечения для обмена сообщениями по протоколу, и нижеследующее является лишь частью из них.
- Mosquitto
- Eclipse Paho
- RabbitMQ
- Apache ActiveMQ
- HiveMQ
- JoramMQ
- ThingMQ
- VerneMQ
- Apache Apollo
- emqttd Xively
- IBM Websphere .....
Суммировать
Кроме того, я впервые делаю проект, связанный с аппаратным обеспечением. Прежде чем я услышал об умном доме, я чувствовал, что он очень высокий, но на самом деле, разработав его вручную, я обнаружил, что технология всегда то же самое, и это просто другое использование.
Руки на демо проектаgithub
Адрес: https://github.com/chengxy-nds/springboot-rabbitmq-mqtt.git, заинтересованные друзья могут скачать и запустить, это очень просто реализовать.
Оригинальность не так проста, содержание вывода жгучих волос, я надеюсь, что вы можете получить небольшой выигрыш!
Разобраны и розданы друзьям сотни различных технических электронных книг. Официальный аккаунт ответил [666], чтобы собрать его самостоятельно. Мы создали группу технического обмена с некоторыми друзьями, чтобы обсуждать технологии и делиться технической информацией, стремясь учиться и развиваться вместе.Если вам интересно, отсканируйте код, чтобы присоединиться к нам!