Рабочая среда:
- JDK 8+
- Maven 3.0+
- Redis
Стек технологий:
- SpringBoot 2.0+
- Redis (Lettuceклиент,RedisTemplateшаблонный метод)
- Netty 4.1+
- MQTT 3.1.1
IDE:
- ИДЕЯ или затмение
- Плагин Ломбок
Введение
В последние годы Интернет вещей добился больших успехов. В Соединенных Штатах есть «Промышленный Интернет», в Германии — «Промышленность 4.0», а в моей стране также есть «Сделано в Китае 2025», все из которых связаны с облачными вычислениями и большими данными. . По данным Boston Consulting Group, только в обрабатывающей промышленности Китая новые технологии, такие как облачные вычисления, большие данные и искусственный интеллект, могут принести дополнительную добавленную стоимость до 6 трлн.
В промышленном Интернете обосновались отечественные и зарубежные гиганты, такие как Amazon AWS, Microsoft Azure, и три крупных отечественных оператора связи, Baidu Cloud, Huawei, Kingsoft Cloud и т. д. Среди них Tencent Cloud и Alibaba Cloud являются самыми мощными, а они также привлекают традиционные компании-производители.Ребята, отечественные гиганты развернули Интернет вещей. На саммите Yunqi-Shenzhen Summit 2018 Ху Сяомин, старший вице-президент Alibaba и президент Alibaba Cloud, объявил, что Alibaba официально войдет в IoT. Ху Сяомин сказал, что IoT — это новое основное направление деятельности Alibaba Group после электронной коммерции, финансов, логистики и облачных вычислений.
Отслеживание технологий Интернета вещей
Вышеуказанный контент, как разработчик, автор не является инвестором и пионером предпринимательства. Эти конкретные детали не имеют большого значения. Меня волнует только то, как использовать технологии длявыполнитьилимоделированиеСервер IOT, который поддерживает миллионы ссылок, не является строгим, просто для справки.
О том, почему вы выбрали промежуточное ПО, показанное на рисунке ниже, или если вы мало знаете о MQTT, вы можете прочитать две мои предыдущие статьи:
- Путь к реализации высокопроизводительного сервера Интернета вещей
- Netty реализует высокопроизводительный сервер IOT (Groza) с ручным разрывом протокола MQTT.
технический план
Быстрый старт
запустить тест
-
git clone GitHub.com/Саншэншу…
-
cd netty-iot
-
Запустите NettyIotApplication
-
Открытымhttp://localhost:8080/groza/v1/123456/auth, получить пароль!
-
Запустите Eclipse Paho и введите имя пользователя и пароль для подключения.
-
Запустите еще один Eclipse Paho и подпишитесь на случайные темы, например test. Еще один тест темы выпуска Eclipse Paho. чтобы получить сообщение.
-
Отмените подписку на тему и снова опубликуйте сообщение. не получит сообщение.
С предвестием двух предыдущих статей и изучением протокола MQTT V3.1.1 я так много наговорил, что руки чешутся.
Вы строите это, Вы управляете этим!
Введение в структуру проекта
netty-iot
├── auth -- 认证
├── service -- 用户名,密码认证实现类
├── util -- 认证工具类
├── common -- 公共类
├── auth -- 用户名,密码认证接口
├── message -- 协议存储实体及接口类
├── session -- session存储实体及接口类
├── subscribe -- 订阅存储实体及接口类
├── config -- Redis配置
├── protocol -- MQTT协议实现
├── server -- MQTT服务器
├── store -- Redis数据存储
├── cache
├── message
├── session
├── subscribe
├── web -- web服务
├── NettyIotApplication -- 服务启动类
Redis
Установить
Чтобы испытать Redis, вам нужно использовать среду Linux или Mac.Если это Windows, вы можете рассмотреть возможность использования виртуальной машины. Есть четыре основных способа:
- Установить с помощью Докера.
- Скомпилируйте из исходного кода Github.
- Установите напрямую с помощью apt-get install (Ubuntu), yum install (RedHat) или brew install (Mac).
- Если читателям лень устанавливать и работать, они также могут использовать веб-версию.Web Redisнепосредственный опыт.
Конкретные операции заключаются в следующем:
Докер путь
# 拉取 redis 镜像
> docker pull redis
# 运行 redis 容器
> docker run --name myredis -d -p6379:6379 redis
# 执行容器中的 redis-cli,可以直接使用命令行操作 redis
> docker exec -it myredis redis-cli...
Метод компиляции исходного кода Github
# 下载源码
> git clone --branch 2.8 --depth 1 git@github.com:antirez/redis.git
> cd redis
# 编译
> make
> cd src
# 运行服务器,daemonize表示在后台运行
> ./redis-server --daemonize yes
# 运行命令行
> ./redis-cli...
прямая установка
# mac
> brew install redis
# ubuntu
> apt-get install redis
# redhat
> yum install redis
# 运行客户端
> redis-cli
использовать
Spring Boot除了支持常见的ORM框架外,更是对常用的中间件提供了非常好封装,随着
Spring Boot2.x的到来,支持的组件越来越丰富,也越来越成熟,其中对
Redis的支持不仅仅是丰富了它的API,更是替换掉底层
Jedis的依赖,取而代之换成了
Салат (салат), вы можете обратиться к этомустатьяНастройте проект. Поэтому я использую Lettuce в качестве клиента для кэширования сообщений, передаваемых по моему протоколу MQTT.
Ниже приведен режим работы, соответствующий Redis.
- opsForValue: соответствует String (строка)
- opsForZSet: соответствует ZSet (упорядоченный набор)
- opsForHash: соответствует Hash (хеш)
- opsForList: соответствует List (список)
- opsForSet: соответствует Set (коллекция)
- opsForGeo: соответствует GEO (географическому местоположению)
я в основном пользуюсьopsForValue,opsForHashиopsForZSet, для строк. я рекомендую использоватьStringRedisTemplate.
Ниже приводится краткое объяснение основных операций opsForValue и opsForHash.
Агентство хеш-данных Redis
Хэширование Redis позволяет пользователям хранить несколько пар ключ-значение в одном ключе Redis.
общедоступный интерфейс HashOperations
java > template.opsForHash().put("books","java","think in java");
redis-cli > hset books java "think in java" # 命令行的字符串如果包含空格,要用引号括起来
(integer) 1
------
java > template.opsForHash().put("books","golang","concurrency in go");
redis-cli > hset books golang "concurrency in go"
(integer) 1
------
java > template.opsForHash().put("books","python","python cookbook");
redis-cli > hset books python "python cookbook"
(integer) 1
------
java > template.opsForHash().entries("books")
redis-cli > hgetall books # entries(),key 和 value 间隔出现
1) "java"
2) "think in java"
3) "golang"
4) "concurrency in go"
5) "python"
6) "python cookbook"
------
java > template.opsForHash().size("books")
redis-cli > hlen books
(integer) 3
------
java > template.opsForHash().get("redisHash","age")
redi-cli > hget books java
"think in java"
------
java >
Map<String,Object> testMap = new HashMap();
testMap.put("java","effective java");
testMap.put("python","learning python");
testMap.put("golang","modern golang programming");
template.opsForHash().putAll("books",testMap);
redis-cli > hmset books java "effective java" python "learning python" golang "modern golang programming" # 批量 set
OK...
Структура данных Redis Set
Redis Set — это неупорядоченная коллекция строкового типа. Члены коллекции уникальны, что означает, что повторяющиеся данные не могут появляться в коллекции. Коллекции в Redis реализованы через хеш-таблицы, поэтому сложность добавления, удаления и поиска составляет O(1).
java > template.opsForSet().add("python","java","golang")
redis-cli > sadd books python java golang
(integer) 3
------
java > template.opsForSet().members("books")
redis-cli > smembers books # 注意顺序,和插入的并不一致,因为 set 是无序的
1) "java"
2) "python"
3) "golang"
------
java > template.opsForSet().isMember("books","java")
redis-cli > sismember books java # 查询某个 value 是否存在,相当于 contains(o)
(integer) 1
------
java > template.opsForSet().size("books")
redis-cli > scard books # 获取长度相当于 count()
(integer) 3
------
java > template.opsForSet().pop("books")
redis-cli > spop books # 弹出一个
"java"...
MQTT
MQTT — это облегченный протокол обмена сообщениями публикации/подписки, первоначально созданный примерно в 1998 году компаниями IBM и Arcom (которые позже стали частью Eurotech). в настоящее время,MQTT 3.1.1 СпецификациябылАльянс ОАЗИСстандартизация.
Загрузка клиента
Для клиента MQTT я используюEclipse PahoПроект Eclipse Paho предоставляет клиентскую реализацию протоколов обмена сообщениями MQTT и MQTT-SN с открытым исходным кодом для новых, существующих и появляющихся приложений для Интернета вещей (IoT). конкретныйссылка для скачивания, Каждый скачивает в соответствии со своей операционной системой.
Управляющее сообщение MQTT
├── Connect -- 连接服务端
├── DisConnect -- 断开连接
├── PingReq -- 心跳请求
├── PubAck -- 发布确认
├── PubComp -- 发布完成(QoS2,第散步)
├── Publish -- 发布消息
├── PubRec -- 发布收到(QoS2,第一步)
├── PubRel -- 发布释放(QoS2,第二步)
├── Subscribe -- 订阅主题
├── UnSubscribe -- 取消订阅
Connect
Давайте реализуем протокол подключения клиента к протоколу MQTT 3.1.1.
-
Когда мы декодируем сообщение, если имя протокола неверно, серверМожетОтключить клиента от соединения, согласно этой спецификации серверне можетПродолжить обработку отчета CONNECT.
-
Сервер использует идентификатор клиента (ClientId) для идентификации клиента. Каждый клиент, подключающийся к серверу, имеет уникальный идентификатор клиента (ClientId).
// 消息解码器出现异常 if (msg.decoderResult().isFailure()) { Throwable cause = msg.decoderResult().cause(); if (cause instanceof MqttUnacceptableProtocolVersionException) { // 不支持的协议版本 MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null); channel.writeAndFlush(connAckMessage); channel.close(); return; } else if (cause instanceof MqttIdentifierRejectedException) { // 不合格的clientId MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null); channel.writeAndFlush(connAckMessage); channel.close(); return; } channel.close(); return; }
-
Когда clientId пуст или имеет значение null, клиент должен предоставить clientId, независимо от того, равен ли cleanSession 1 или нет, здесь нет ссылки на реализацию стандартного протокола.
if (StrUtil.isBlank(msg.payload().clientIdentifier())) { MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null); channel.writeAndFlush(connAckMessage); channel.close(); return; }
-
Аутентификация по имени пользователя и паролю, которая требует, чтобы клиент предоставил имя пользователя и пароль при подключении, независимо от того, установлены ли флаги имени пользователя и пароля в 1, здесь нет ссылки на реализацию стандартного протокола.
String username = msg.payload().userName(); String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8); if (!grozaAuthService.checkValid(username,password)) { MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null); channel.writeAndFlush(connAckMessage); channel.close(); return; }
-
Если clientId этого нового соединения был сохранен в сеансе, соединение предыдущего clientId закрывается.
if (grozaSessionStoreService.containsKey(msg.payload().clientIdentifier())){ SessionStore sessionStore = grozaSessionStoreService.get(msg.payload().clientIdentifier()); Channel previous = sessionStore.getChannel(); Boolean cleanSession = sessionStore.isCleanSession(); if (cleanSession){ grozaSessionStoreService.remove(msg.payload().clientIdentifier()); grozaSubscribeStoreService.removeForClient(msg.payload().clientIdentifier()); grozaDupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier()); grozaDupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier()); } previous.close(); }
-
Обработка будет информации
SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel, msg.variableHeader().isCleanSession(), null); if (msg.variableHeader().isWillFlag()){ MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.valueOf(msg.variableHeader().willQos()),msg.variableHeader().isWillRetain(),0), new MqttPublishVariableHeader(msg.payload().willTopic(),0), Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes()) ); sessionStore.setWillMessage(willMessage); }
-
Обработка пакетов подтверждения подключения
if (msg.variableHeader().keepAliveTimeSeconds() > 0){ if (channel.pipeline().names().contains("idle")){ channel.pipeline().remove("idle"); } channel.pipeline().addFirst("idle",new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f))); }
-
До сих пор сообщение сеанса сохраняется и возвращается для принятия клиентского соединения, а clientId сохраняется в карте канала.
grozaSessionStoreService.put(msg.payload().clientIdentifier(),sessionStore); channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier()); Boolean sessionPresent = grozaSessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession(); MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK,false,MqttQoS.AT_MOST_ONCE,false,0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,sessionPresent), null ); channel.writeAndFlush(okResp);
-
Если cleanSession равен 0, сообщения DUP QoS1 и QoS2, хранящиеся в одном и том же clientId, должны быть отправлены повторно.
if (!msg.variableHeader().isCleanSession()){ List<DupPublishMessageStore> dupPublishMessageStoreList = grozaDupPublishMessageStoreService.get(msg.payload().clientIdentifier()); List<DupPubRelMessageStore> dupPubRelMessageStoreList = grozaDupPubRelMessageStoreService.get(msg.payload().clientIdentifier()); dupPublishMessageStoreList.forEach(dupPublishMessageStore -> { MqttPublishMessage publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH,true,MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()),false,0), new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(),dupPublishMessageStore.getMessageId()), Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes()) ); channel.writeAndFlush(publishMessage); }); dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> { MqttMessage pubRelMessage = MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBREL,true,MqttQoS.AT_MOST_ONCE,false,0), MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()), null ); channel.writeAndFlush(pubRelMessage); }); }
Другие сообщения MQTT сравниваются с проектом и сравниваются сMQTT v3.1.1Убедитесь сами!
Аутентификация по имени пользователя и паролю
/**
* 用户名和密码认证服务
* @author 穆书伟
*/
@Service
public class AuthServiceImpl implements GrozaAuthService {
private RSAPrivateKey privateKey;
@Override
public boolean checkValid(String username, String password) {
if (StringUtils.isEmpty(username)){
return false;
}
if (StringUtils.isEmpty(password)){
return false;
}
RSA rsa = new RSA(privateKey,null);
String value = rsa.encryptBcd(username, KeyType.PrivateKey);
return value.equals(password) ? true : false;
}
@PostConstruct
public void init() {
privateKey = IoUtil.readObj(AuthServiceImpl.class.getClassLoader().getResourceAsStream("keystore/auth-private.key"));
}
}
разное
На этом подробное объяснение реализации Netty высокопроизводительного сервера Интернета вещей (Groza) заканчивается.
Оригинал не просто, если вы чувствуете себя хорошо, я надеюсь дать рекомендацию! Ваша поддержка - самая большая мотивация для моего письма!
Следующее поможет вам продвигать сервер IOT, на котором Netty реализует протокол MQTT.
Уведомление об авторских правах:
Автор: Му Шувэй
Источник блога сада:блог woo woo woo.cn на.com/Sanshenghu…
источник на гитхабе:GitHub.com/Саншэншу…
Источник личного блога:sanshengshui.github.io/