Netty реализует высокопроизводительный сервер IOT (Groza) в исчерпывающей главе кода

Redis сервер Netty Интернет вещей

Рабочая среда:

  • JDK 8+
  • Maven 3.0+
  • Redis

Стек технологий:

  • SpringBoot 2.0+
  • Redis (Lettuceклиент,RedisTemplateшаблонный метод)
  • Netty 4.1+
  • MQTT 3.1.1

IDE:

  • ИДЕЯ или затмение
  • Плагин Ломбок

Введение

В последние годы Интернет вещей добился больших успехов. В Соединенных Штатах есть «Промышленный Интернет», в Германии — «Промышленность 4.0», а в моей стране также есть «Сделано в Китае 2025», все из которых связаны с облачными вычислениями и большими данными. . По данным Boston Consulting Group, только в обрабатывающей промышленности Китая новые технологии, такие как облачные вычисления, большие данные и искусственный интеллект, могут принести дополнительную добавленную стоимость до 6 трлн.

В промышленном Интернете обосновались отечественные и зарубежные гиганты, такие как Amazon AWS, Microsoft Azure, и три крупных отечественных оператора связи, Baidu Cloud, Huawei, Kingsoft Cloud и т. д. Среди них Tencent Cloud и Alibaba Cloud являются самыми мощными, а они также привлекают традиционные компании-производители.Ребята, отечественные гиганты развернули Интернет вещей. На саммите Yunqi-Shenzhen Summit 2018 Ху Сяомин, старший вице-президент Alibaba и президент Alibaba Cloud, объявил, что Alibaba официально войдет в IoT. Ху Сяомин сказал, что IoT — это новое основное направление деятельности Alibaba Group после электронной коммерции, финансов, логистики и облачных вычислений.

Отслеживание технологий Интернета вещей

Вышеуказанный контент, как разработчик, автор не является инвестором и пионером предпринимательства. Эти конкретные детали не имеют большого значения. Меня волнует только то, как использовать технологии длявыполнитьилимоделированиеСервер IOT, который поддерживает миллионы ссылок, не является строгим, просто для справки.

О том, почему вы выбрали промежуточное ПО, показанное на рисунке ниже, или если вы мало знаете о MQTT, вы можете прочитать две мои предыдущие статьи:

  1. Путь к реализации высокопроизводительного сервера Интернета вещей
  2. Netty реализует высокопроизводительный сервер IOT (Groza) с ручным разрывом протокола MQTT.

технический план

Быстрый старт

запустить тест

  1. git clone GitHub.com/Саншэншу…

  2. cd netty-iot

  3. Запустите NettyIotApplication

  4. Открытымhttp://localhost:8080/groza/v1/123456/auth, получить пароль!

  1. Запустите Eclipse Paho и введите имя пользователя и пароль для подключения.

  2. Запустите еще один Eclipse Paho и подпишитесь на случайные темы, например test. Еще один тест темы выпуска Eclipse Paho. чтобы получить сообщение.

  3. Отмените подписку на тему и снова опубликуйте сообщение. не получит сообщение.

С предвестием двух предыдущих статей и изучением протокола MQTT V3.1.1 я так много наговорил, что руки чешутся.

Вы строите это, Вы управляете этим!

Введение в структуру проекта

netty-iot
      ├── auth -- 认证
        ├── service -- 用户名,密码认证实现类
        ├── util -- 认证工具类
      ├── common -- 公共类
        ├── auth -- 用户名,密码认证接口
        ├── message -- 协议存储实体及接口类
        ├── session -- session存储实体及接口类
        ├── subscribe -- 订阅存储实体及接口类
      ├── config -- Redis配置
      ├── protocol -- MQTT协议实现
      ├── server -- MQTT服务器
      ├── store -- Redis数据存储
      	├── cache 
        ├── message 
        ├── session
        ├── subscribe
      ├── web -- web服务
      ├── NettyIotApplication -- 服务启动类

Redis

Установить

Чтобы испытать Redis, вам нужно использовать среду Linux или Mac.Если это Windows, вы можете рассмотреть возможность использования виртуальной машины. Есть четыре основных способа:

  • Установить с помощью Докера.
  • Скомпилируйте из исходного кода Github.
  • Установите напрямую с помощью apt-get install (Ubuntu), yum install (RedHat) или brew install (Mac).
  • Если читателям лень устанавливать и работать, они также могут использовать веб-версию.Web Redisнепосредственный опыт.

Конкретные операции заключаются в следующем:

Докер путь

# 拉取 redis 镜像
> docker pull redis
# 运行 redis 容器
> docker run --name myredis -d -p6379:6379 redis
# 执行容器中的 redis-cli,可以直接使用命令行操作 redis
> docker exec -it myredis redis-cli...

Метод компиляции исходного кода Github

# 下载源码
> git clone --branch 2.8 --depth 1 git@github.com:antirez/redis.git
> cd redis
# 编译
> make
> cd src
# 运行服务器,daemonize表示在后台运行
> ./redis-server --daemonize yes
# 运行命令行
> ./redis-cli...

прямая установка

# mac
> brew install redis
# ubuntu
> apt-get install redis
# redhat
> yum install redis
# 运行客户端
> redis-cli

использовать

Spring Boot除了支持常见的ORM框架外,更是对常用的中间件提供了非常好封装,随着Spring Boot2.x的到来,支持的组件越来越丰富,也越来越成熟,其中对Redis的支持不仅仅是丰富了它的API,更是替换掉底层Jedis的依赖,取而代之换成了Салат (салат), вы можете обратиться к этомустатьяНастройте проект. Поэтому я использую Lettuce в качестве клиента для кэширования сообщений, передаваемых по моему протоколу MQTT.

Ниже приведен режим работы, соответствующий Redis.

  • opsForValue: соответствует String (строка)
  • opsForZSet: соответствует ZSet (упорядоченный набор)
  • opsForHash: соответствует Hash (хеш)
  • opsForList: соответствует List (список)
  • opsForSet: соответствует Set (коллекция)
  • opsForGeo: соответствует GEO (географическому местоположению)

я в основном пользуюсьopsForValue,opsForHashиopsForZSet, для строк. я рекомендую использоватьStringRedisTemplate.

Ниже приводится краткое объяснение основных операций opsForValue и opsForHash.

Агентство хеш-данных Redis

Хэширование Redis позволяет пользователям хранить несколько пар ключ-значение в одном ключе Redis. общедоступный интерфейс HashOperations HashOperations предоставляет ряд методов для работы с хэшем:

java > template.opsForHash().put("books","java","think in java");
redis-cli > hset books java "think in java"  # 命令行的字符串如果包含空格,要用引号括起来
(integer) 1
------
java > template.opsForHash().put("books","golang","concurrency in go");
redis-cli > hset books golang "concurrency in go"
(integer) 1
------
java > template.opsForHash().put("books","python","python cookbook");
redis-cli > hset books python "python cookbook"
(integer) 1
------
java > template.opsForHash().entries("books")
redis-cli > hgetall books  # entries(),key 和 value 间隔出现
1) "java"
2) "think in java"
3) "golang"
4) "concurrency in go"
5) "python"
6) "python cookbook"
------
java > template.opsForHash().size("books")
redis-cli > hlen books
(integer) 3
------
java > template.opsForHash().get("redisHash","age")
redi-cli > hget books java
"think in java"
------
java > 
Map<String,Object> testMap = new HashMap();
      testMap.put("java","effective java");
      testMap.put("python","learning python");
      testMap.put("golang","modern golang programming");
template.opsForHash().putAll("books",testMap);
redis-cli > hmset books java "effective java" python "learning python" golang "modern golang programming"  # 批量 set
OK...

Структура данных Redis Set

Redis Set — это неупорядоченная коллекция строкового типа. Члены коллекции уникальны, что означает, что повторяющиеся данные не могут появляться в коллекции. Коллекции в Redis реализованы через хеш-таблицы, поэтому сложность добавления, удаления и поиска составляет O(1).

java > template.opsForSet().add("python","java","golang")
redis-cli > sadd books python java golang
(integer) 3
------
java > template.opsForSet().members("books")
redis-cli > smembers books  # 注意顺序,和插入的并不一致,因为 set 是无序的
1) "java"
2) "python"
3) "golang"
------
java > template.opsForSet().isMember("books","java")
redis-cli > sismember books java  # 查询某个 value 是否存在,相当于 contains(o)
(integer) 1
------
java > template.opsForSet().size("books")
redis-cli > scard books  # 获取长度相当于 count()
(integer) 3
------
java > template.opsForSet().pop("books")
redis-cli > spop books  # 弹出一个
"java"...

MQTT

MQTT — это облегченный протокол обмена сообщениями публикации/подписки, первоначально созданный примерно в 1998 году компаниями IBM и Arcom (которые позже стали частью Eurotech). в настоящее время,MQTT 3.1.1 СпецификациябылАльянс ОАЗИСстандартизация.

Загрузка клиента

Для клиента MQTT я используюEclipse PahoПроект Eclipse Paho предоставляет клиентскую реализацию протоколов обмена сообщениями MQTT и MQTT-SN с открытым исходным кодом для новых, существующих и появляющихся приложений для Интернета вещей (IoT). конкретныйссылка для скачивания, Каждый скачивает в соответствии со своей операционной системой.

Управляющее сообщение MQTT

├── Connect -- 连接服务端
├── DisConnect -- 断开连接
├── PingReq -- 心跳请求
├── PubAck -- 发布确认
├── PubComp -- 发布完成(QoS2,第散步)
├── Publish -- 发布消息
├── PubRec -- 发布收到(QoS2,第一步)
├── PubRel -- 发布释放(QoS2,第二步)
├── Subscribe -- 订阅主题
├── UnSubscribe -- 取消订阅

Connect

Давайте реализуем протокол подключения клиента к протоколу MQTT 3.1.1.

  1. Когда мы декодируем сообщение, если имя протокола неверно, серверМожетОтключить клиента от соединения, согласно этой спецификации серверне можетПродолжить обработку отчета CONNECT.

  2. Сервер использует идентификатор клиента (ClientId) для идентификации клиента. Каждый клиент, подключающийся к серверу, имеет уникальный идентификатор клиента (ClientId).

    // 消息解码器出现异常
            if (msg.decoderResult().isFailure()) {
                Throwable cause = msg.decoderResult().cause();
                if (cause instanceof MqttUnacceptableProtocolVersionException) {
                    // 不支持的协议版本
                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
                    channel.writeAndFlush(connAckMessage);
                    channel.close();
                    return;
                } else if (cause instanceof MqttIdentifierRejectedException) {
                    // 不合格的clientId
                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
                    channel.writeAndFlush(connAckMessage);
                    channel.close();
                    return;
                }
                channel.close();
                return;
            }
    
  3. Когда clientId пуст или имеет значение null, клиент должен предоставить clientId, независимо от того, равен ли cleanSession 1 или нет, здесь нет ссылки на реализацию стандартного протокола.

           if (StrUtil.isBlank(msg.payload().clientIdentifier())) {
                MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                        new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                        new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
                channel.writeAndFlush(connAckMessage);
                channel.close();
                return;
            }
    
  4. Аутентификация по имени пользователя и паролю, которая требует, чтобы клиент предоставил имя пользователя и пароль при подключении, независимо от того, установлены ли флаги имени пользователя и пароля в 1, здесь нет ссылки на реализацию стандартного протокола.

               String username = msg.payload().userName();
               String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
               if (!grozaAuthService.checkValid(username,password)) {
                   MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                           new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                           new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
                   channel.writeAndFlush(connAckMessage);
                   channel.close();
                   return;
               }
    
  5. Если clientId этого нового соединения был сохранен в сеансе, соединение предыдущего clientId закрывается.

     if (grozaSessionStoreService.containsKey(msg.payload().clientIdentifier())){
                SessionStore sessionStore = grozaSessionStoreService.get(msg.payload().clientIdentifier());
                Channel previous = sessionStore.getChannel();
                Boolean cleanSession = sessionStore.isCleanSession();
                if (cleanSession){
                    grozaSessionStoreService.remove(msg.payload().clientIdentifier());
                    grozaSubscribeStoreService.removeForClient(msg.payload().clientIdentifier());
                    grozaDupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
                    grozaDupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
                }
                previous.close();
            }
    
  6. Обработка будет информации

    SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel, msg.variableHeader().isCleanSession(), null);
            if (msg.variableHeader().isWillFlag()){
                MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                        new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.valueOf(msg.variableHeader().willQos()),msg.variableHeader().isWillRetain(),0),
                        new MqttPublishVariableHeader(msg.payload().willTopic(),0),
                        Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes())
                );
                sessionStore.setWillMessage(willMessage);
            }
    
  7. Обработка пакетов подтверждения подключения

    if (msg.variableHeader().keepAliveTimeSeconds() > 0){
                if (channel.pipeline().names().contains("idle")){
                    channel.pipeline().remove("idle");
                }
                channel.pipeline().addFirst("idle",new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f)));
            }
    
  8. До сих пор сообщение сеанса сохраняется и возвращается для принятия клиентского соединения, а clientId сохраняется в карте канала.

    grozaSessionStoreService.put(msg.payload().clientIdentifier(),sessionStore);
            channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier());
            Boolean sessionPresent = grozaSessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession();
            MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK,false,MqttQoS.AT_MOST_ONCE,false,0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,sessionPresent),
                    null
            );
            channel.writeAndFlush(okResp);
    
  9. Если cleanSession равен 0, сообщения DUP QoS1 и QoS2, хранящиеся в одном и том же clientId, должны быть отправлены повторно.

     if (!msg.variableHeader().isCleanSession()){
                List<DupPublishMessageStore> dupPublishMessageStoreList = grozaDupPublishMessageStoreService.get(msg.payload().clientIdentifier());
                List<DupPubRelMessageStore> dupPubRelMessageStoreList = grozaDupPubRelMessageStoreService.get(msg.payload().clientIdentifier());
                dupPublishMessageStoreList.forEach(dupPublishMessageStore -> {
                    MqttPublishMessage publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.PUBLISH,true,MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()),false,0),
                            new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(),dupPublishMessageStore.getMessageId()),
                            Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes())
                    );
                    channel.writeAndFlush(publishMessage);
                });
                dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
                    MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.PUBREL,true,MqttQoS.AT_MOST_ONCE,false,0),
                            MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()),
                            null
                    );
                    channel.writeAndFlush(pubRelMessage);
                });
            }
    

    Другие сообщения MQTT сравниваются с проектом и сравниваются сMQTT v3.1.1Убедитесь сами!

Аутентификация по имени пользователя и паролю

/**
 * 用户名和密码认证服务
 * @author 穆书伟
 */
@Service
public class AuthServiceImpl implements GrozaAuthService {
    private RSAPrivateKey privateKey;

    @Override
    public boolean checkValid(String username, String password) {
        if (StringUtils.isEmpty(username)){
            return false;
        }
        if (StringUtils.isEmpty(password)){
            return false;
        }
        RSA rsa = new RSA(privateKey,null);
        String value = rsa.encryptBcd(username, KeyType.PrivateKey);
        return value.equals(password) ? true : false;
    }

    @PostConstruct
    public void init() {
        privateKey = IoUtil.readObj(AuthServiceImpl.class.getClassLoader().getResourceAsStream("keystore/auth-private.key"));
    }
}

разное

На этом подробное объяснение реализации Netty высокопроизводительного сервера Интернета вещей (Groza) заканчивается.

Оригинал не просто, если вы чувствуете себя хорошо, я надеюсь дать рекомендацию! Ваша поддержка - самая большая мотивация для моего письма!

Следующее поможет вам продвигать сервер IOT, на котором Netty реализует протокол MQTT.

Уведомление об авторских правах:

Автор: Му Шувэй

Источник блога сада:блог woo woo woo.cn на.com/Sanshenghu…

источник на гитхабе:GitHub.com/Саншэншу…    

Источник личного блога:sanshengshui.github.io/