Глядя на реализацию протокола доступа IOT MQTT из исходного кода moquette

задняя часть сервер исходный код Интернет вещей

задний план

Чтение хорошего кода — это своего рода удовольствие, описывать хороший код со своим мировоззрением очень больно, это убьет 100 миллионов клеток мозга.

Эта заметка о прочтении исходного кода была сделана еще год назад.В то время я просто записывал собственное резюме.Недавно я реорганизовал ее для помощи нуждающимся.

С быстрым проникновением мобильного Интернета во вторую половину все больше и больше предприятий обращают свое внимание на Интернет вещей. Например, общие велосипеды и продукты Xiaomi для умного дома являются типичными приложениями IoT.

Предприятия считают, что с помощью больших данных и технологии искусственного интеллекта можно создать много дополнительной ценности для создания новых бизнес-моделей. Массовые данные должны пройти через службу доступа, чтобы попасть на серверную часть для создания последующей ценности.В службе доступа MQTT стал непонятным стандартным протоколом в Интернете вещей.Как отечественные, так и зарубежные облачные фабрики имеют своего брокера реализации.

характеристика

Протокол MQTT — это протокол, предназначенный для связи удаленных датчиков и устройств управления с ограниченной вычислительной мощностью и работающих в ненадежных сетях с низкой пропускной способностью и обладающий следующими основными характеристиками:

  1. Используйте шаблон сообщений публикации/подписки, чтобы обеспечить публикацию сообщений «один ко многим» и развязку приложений.
  2. Передача сообщения, замаскированная под содержимое полезной нагрузки
  3. Обеспечивает сетевое подключение с использованием TCP/IP
  4. Существует три качества службы публикации сообщений.
    • Публикация сообщений «не более одного раза» полностью зависит от базовой сети TCP/IP. Возможна потеря или дублирование сообщений. Этот уровень можно использовать в случае данных датчика окружающей среды, не имеет значения, если одна прочитанная запись потеряна, потому что скоро будет вторая передача.
    • «По крайней мере один раз» гарантирует получение сообщений, но может произойти дублирование сообщений.
    • «Только один раз» гарантирует, что сообщение придет один раз. Этот уровень можно использовать в биллинговой системе, где дублирование или потеря сообщений могут привести к неправильным результатам.
  5. Небольшие передачи с небольшими накладными расходами (2 байта для заголовка фиксированной длины) и минимальным переключением протоколов для уменьшения сетевого трафика.
  6. Механизмы уведомления заинтересованных сторон об отключении клиента с помощью функций Last Will и Testament

== Реализация вышеуказанных функций будет объяснена ниже ==

период, термин

image

Клиент Клиент

Программы или устройства, использующие MQTT, такие как датчики мониторинга окружающей среды, общие велосипеды, общие блоки питания и т. д.

сервер сервер

Программа или устройство, выступающее в качестве посредника между клиентом, отправляющим сообщение, и клиентом, запрашивающим подписку.

Процесс публикации и подписки

Процесс отправки клиентом-A сообщения «привет» клиенту-B выглядит следующим образом:

  1. Клиент-B подписывается на тему с именем msg
  2. Клиент-A отправляет «привет» серверу-серверу и указывает, что он отправлен в тему с именем msg
  3. Сервер-Сервер пересылает сообщение «привет» Клиенту-B

В отличие от режима запроса-ответа протокола HTTP, клиент-A и клиент-B не имеют прямого отношения соединения, и передача сообщений между ними перенаправляется через сервер-сервер. Сервер сервер также называютMQTT Brokerто есть посредник между подпиской и отправкой

Анализ реализации функции на основе исходного кода moquette

В описанном выше процессе, когда клиент-A отправляет сообщение «привет» клиенту-B, требуются следующие действия.

  1. Клиент-А, Клиент-Бсоединятьна сервер сервер
  2. клиент-Bподпискатема
  3. клиент-АвыпускИнформация
  4. Сервер сервер пересылает сообщения
  5. Клиент-B получает сообщение

Далее будет следовать отслеживание и интерпретация исходного кода на основе действий подключения, подписки и выпуска.

соединять

image

основная концепция:

Session: Сессия — это логическая связь между клиентом (обозначается ClientId) и сервером; жизненный цикл (время существования): сессия >= сетевое соединение.

ClientID: уникальный идентификатор клиента, сервер используется для связывания сеанса Может содержать только эти прописные буквы, строчные буквы и цифры (0-9a-zA-Z) в пределах 23 символов. Если ClientID остается одинаковым для нескольких TCP-соединений, клиент и сервер сохранят информацию о сеансе (Session). В то же время Сервер и один и тот же ClientID могут поддерживать только одно TCP-соединение, и повторное подключение приведет к сбросу предыдущего.

CleanSession: в Connect, устанавливается клиентом

  • 0 Включает механизм повторного использования сеанса. После отключения и повторного подключения к сети информация о предыдущем сеансе восстанавливается. Клиент и сервер должны иметь соответствующий механизм сохранения сеанса;
  • 1 Отключите механизм повторного использования сеанса. Каждое соединение — это новая сессия, и сессия длится столько же, сколько и сетевое соединение.

Keep Alive: Цель состоит в том, чтобы поддерживать надежность долгосрочных соединений, а также подтверждение того, что обе стороны находятся в сети. Клиент устанавливает продолжительность Keep Alive при подключении. Если сервер не получает сообщение клиента в течение 1,5 * времени KeepAlive, он должен отключить сетевое соединение клиента. Значение Keep Alive определяется конкретным приложением, обычно это несколько минут. Максимально допустимое время составляет 18 часов 12 минут и 15 секунд.

Will: Will Message хранится на стороне сервера.Когда сетевое соединение закрыто, серверная сторона должна опубликовать Will Message, поэтому оно ярко называется Will, которое можно использовать для уведомления об аварийном отключении. Клиент отправляет DISCONNECT, чтобы закрыть ссылку, завещание недействительно и удалено. Условия для выпуска завещательных сообщений, в том числе: Сервер обнаружил ошибку ввода-вывода или сбой сети Клиенту не удалось связаться в течение времени Keep Alive. Клиент напрямую закрыл сетевое соединение без предварительной отправки сообщения DISCONNECT. Сервер закрыл сетевое соединение из-за ошибки протокола Соответствующие элементы настройки должны быть указаны клиентом при подключении.

Will Flag: Главный переключатель воли

  • 0 Disable будет работать, Will QoS и Will Retain должны быть равны 0
  • 1 Чтобы включить функцию will, вам необходимо установить Will Retain и Will QoS.

Will QoS: QoS сообщения может принимать значения 0, 1, 2, что означает то же самое, что и QoS сообщения.

Will Retain: останется

  • 0 Будут ли сообщения не сохраняться, и последующие подписки не будут получать сообщения
  • 1 Сохранение сообщений, постоянное хранилище

Will Topic: тема завещания

Will Payload: Содержание завещания

процесс подключения

  1. Судя по номеру версии протокола MQTT, отправляемого клиентом при подключении, не-3.1 и 3.1.1 версии протокола отправки не поддерживают ответные пакеты и закрывают соединение после отправки
  2. В случае, когда клиент настроен с cleanSession=false или сервер не допускает существования clientId, если клиент не загружает clientId, протокол отправки не поддерживает ответное сообщение и закрывает соединение после завершения отправки.
  3. Проверьте правильность имени пользователя и пароля
  4. Инициализируйте объект соединения и поместите ссылку на объект соединения в управление соединениями, если он обнаружит, что в управлении соединениями есть объект с тем же идентификатором клиента, закройте предыдущее соединение и поместите новый объект соединения в управление соединениями.
  5. Отрегулируйте время оценки пульса текущего соединения сервера в соответствии со временем пульса, загруженным клиентом (keepAlive * 1.5f)
  6. Хранилище сообщений Wills (публикация сообщений в сохраненной теме при неожиданном разрыве соединения)
  7. Отправить ответ об успешном подключении
  8. Создать текущий сеанс подключения
  9. Когда cleanSession=false, отправить сообщение о том, что текущая сессия сохранена
public void processConnect(Channel channel, MqttConnectMessage msg) {
        MqttConnectPayload payload = msg.payload();
        String clientId = payload.clientIdentifier();
        LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, payload.userName());

        // 1. 判断客户端连接时发送的MQTT协议版本号,非3.1和3.1.1版本发送协议不支持响应报文并在发送完成后关闭连接
        if (msg.variableHeader().version() != MqttVersion.MQTT_3_1.protocolLevel()
                && msg.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
            MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);

            LOG.error("MQTT protocol version is not valid. CId={}", clientId);
            channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        final boolean cleanSession = msg.variableHeader().isCleanSession();
        if (clientId == null || clientId.length() == 0) {
            // 2. 在客户端配置了cleanSession=false 或者服务端不允许clientId不存在的情况下客户端如果未上传clientId发送协议不支持响应报文并在发送完成后关闭连接
            if (!cleanSession || !this.allowZeroByteClientId) {
                MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);

                channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE);
                channel.close().addListener(CLOSE_ON_FAILURE);
                LOG.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
                return;
            }

            // Generating client id.
            clientId = UUID.randomUUID().toString().replace("-", "");
            LOG.info("Client has connected with a server generated identifier. CId={}, username={}", clientId,
                payload.userName());
        }
        // 3. 判断用户名和密码是否合法
        if (!login(channel, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        // 4. 初始化连接对象并将连接对象引用放入连接管理中,如果发现连接管理中存在相同客户端ID的对象则关闭前一个连接并将新的连接对象放入连接管理中
        ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
        final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
        if (existing != null) {
            LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
            existing.abort();
            //return;
            this.connectionDescriptors.removeConnection(existing);
            this.connectionDescriptors.addConnection(descriptor);
        }

        // 5. 根据客户端上传的心跳时间调整服务端当前连接的心跳判断时间(keepAlive * 1.5f)
        initializeKeepAliveTimeout(channel, msg, clientId);
        // 6. 遗嘱消息存储(当连接意外断开时向存储的主题发布消息)
        storeWillMessage(msg, clientId);
        // 7. 发送连接成功响应
        if (!sendAck(descriptor, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        m_interceptor.notifyClientConnected(msg);

        if (!descriptor.assignState(SENDACK, SESSION_CREATED)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        // 8. 创建当前连接session
        final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);
        // 9. 当cleanSession=false 发送当前session已经存储的消息
        if (!republish(descriptor, msg, clientSession)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        
        int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */;
        setupAutoFlusher(channel, flushIntervalMs);

        final boolean success = descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED);
        if (!success) {
            channel.close().addListener(CLOSE_ON_FAILURE);
        }

        LOG.info("Connected client <{}> with login <{}>", clientId, payload.userName());
    }

подписка

image

Основные понятия

Процесс подписки

  1. Проверка темы подписки (авторитет, действительность пути к теме)
  2. Сохранить подписанные темы в текущем сеансе
  3. Глобальная древовидная структура используется для хранения информации о подписке (информация о теме и подписчике), которая используется для поиска соответствующего подписчика в соответствии с темой во время пересылки сообщения (Древовидная структура и алгоритм поиска представлены в следующей главе.)
  4. отправить ответ о подписке
  5. Сканировать постоянные сообщения, соответствующие текущей теме, на которую подписана, и немедленно отправлять сообщения на это соединение.
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);

        RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
        SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
        if (currentStatus != null) {
            LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
                clientID, messageID);
            return;
        }
        String username = NettyUtils.userName(channel);
        // 1、订阅的主题校验(权限、主题path合法性)
        List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
        MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
        if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
            LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
                "messageId={}", clientID, messageID);
            return;
        }

        LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
        // 2、在当前session中存储订阅的主题
        List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);

        // save session, persist subscriptions from session
        // 3、采用全局tree结构存储订阅信息(主题和订阅者信息),用于消息转发时根据主题查找到对应的订阅者
        for (Subscription subscription : newSubscriptions) {
            subscriptions.add(subscription);
        }

        LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
        // 4、发送订阅回应
        channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);

        // fire the persisted messages in session
        // 5、扫描持久化的消息匹配到当前订阅主题的立即向此连接发送消息
        for (Subscription subscription : newSubscriptions) {
            publishRetainedMessagesInSession(subscription, username);
        }

        boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
        if (!success) {
            LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
        } else {
            LOG.info("Client <{}> subscribed to topics", clientID);
        }
    }

выпуск

Основные понятия

Packet Identifier: идентификатор сообщения существует в переменной части заголовка сообщения, ненулевое двухбайтовое целое число (0-65535].

Повторить в одном процессе: эти пакеты содержат PacketID и согласованы в процессе связи: PUBLISH (когда QoS>0), PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCIBE, UNSUBACK.

Новый, не повторяющийся: клиент ДОЛЖЕН назначать неиспользуемый в настоящее время PacketID каждый раз, когда он отправляет новый пакет этих типов. После того, как клиент обработал подтверждение, соответствующее сообщению, идентификатор сообщения высвобождается для повторного использования.

Независимое обслуживание: клиент и сервер назначают идентификаторы сообщений независимо друг от друга. Следовательно, комбинация клиент-сервер может использовать один и тот же идентификатор сообщения для обеспечения параллельного обмена сообщениями. Не является ненормальным, если идентификатор пакета, сгенерированный клиентом и сервером, непротиворечив.

Payload: Максимально допустимый размер полезной нагрузки, то есть тела сообщения, составляет 256 МБ. Полезная нагрузка публикации может быть пустой, и во многих случаях это означает, что постоянное сообщение (или сообщение о намерении) пусто. Используйте кодировку UTF-8.

Retain: постоянные сообщения (липкие сообщения)

СОХРАНИТЬ знак: для каждого сообщения публикации требуется указанный тег.

  • 0 Сервер НЕ ДОЛЖЕН хранить это сообщение, а также удалять или заменять любые существующие зарезервированные сообщения.
  • 1 Сервер ДОЛЖЕН хранить сообщение приложения и его уровень QoS, чтобы его можно было передать будущим подписчикам.

Каждая тема будет сохранять не более одного сохраняемого сообщения. Клиенты, подписавшиеся на темы с постоянными сообщениями, получат это сообщение немедленно.

Сервер может отказаться от постоянных сообщений, например, когда памяти или хранилища недостаточно.

Если клиент хочет удалить постоянное сообщение в теме, он может отправить в тему постоянное сообщение с пустой полезной нагрузкой. То же самое верно и для механизма сохраняемости Retain сообщения Will.

QoS: Уровень обслуживания (надежность сообщения)

Процесс публикации

public void processPublish(Channel channel, MqttPublishMessage msg) {
        final MqttQoS qos = msg.fixedHeader().qosLevel();
        final String clientId = NettyUtils.clientID(channel);
        LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
                msg.variableHeader().topicName(), msg.variableHeader().messageId(), qos);
        switch (qos) {
            case AT_MOST_ONCE:
                this.qos0PublishHandler.receivedPublishQos0(channel, msg);
                break;
            case AT_LEAST_ONCE:
                this.qos1PublishHandler.receivedPublishQos1(channel, msg);
                break;
            case EXACTLY_ONCE:
                this.qos2PublishHandler.receivedPublishQos2(channel, msg);
                break;
            default:
                LOG.error("Unknown QoS-Type:{}", qos);
                break;
        }
    }

Из оператора switch приведенного выше кода видно, что он будет обрабатываться в соответствии с уровнем QoS сообщения.

QoS0 не более одного раза
sequenceDiagram
ClientA->>ServerBroker: 发送消息
ServerBroker->>ClientB: 发送消息
  1. Суждение власти
  2. Опубликовать сообщение для всех подписчиков темы
  3. QoS == 0 && retain => clean old retained
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) {
        // verify if topic can be write
        final Topic topic = new Topic(msg.variableHeader().topicName());
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 权限判断
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }
        // route message to subscribers
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);
        // 2. 向所有该主题的订阅者发布消息
        this.publisher.publish2Subscribers(toStoreMsg, topic);

        if (msg.fixedHeader().isRetain()) {
            // 3. QoS == 0 && retain => clean old retained
            m_messagesStore.cleanRetained(topic);
        }

        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }
QoS1 хотя бы один раз
sequenceDiagram
ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应PUBACK
ServerBroker->>ClientB: 2.发送消息
ClientB->>ServerBroker: 2.1发送消息回应PUBACK
ServerBroker->>ServerBroker: 2.2删除消息

1. Отправить сообщение ПУБЛИКАЦИЯ

  1. Суждение власти
  2. Публиковать сообщения всем подписчикам темы (каждая сессия хранит сообщения для отправки)
  3. Отправить ответ подтверждения
  4. сохранить = true => сохранить сообщение
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
        // verify if topic can be write
        final Topic topic = new Topic(msg.variableHeader().topicName());
        topic.getTokens();
        if (!topic.isValid()) {
            LOG.warn("Invalid topic format, force close the connection");
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 权限判断
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }

        final int messageID = msg.variableHeader().messageId();

        // route message to subscribers
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);

        // 2. 向所有该主题的订阅者发布消息(每个session中存储即将要发送的消息)
        this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);

        // 3. 发送Ack回应
        sendPubAck(clientID, messageID);

        // 4. retain = true => 存储消息
        if (msg.fixedHeader().isRetain()) {
            if (!msg.payload().isReadable()) {
                m_messagesStore.cleanRetained(topic);
            } else {
                // before wasn't stored
                m_messagesStore.storeRetained(topic, toStoreMsg);
            }
        }

        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }

2.1 Отправить сообщение в ответ на PUBACK

После того, как сервер сервера получит сообщение PUBACK, он выполнит:

  1. удалить сообщения, хранящиеся в сеансе
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = msg.variableHeader().messageId();
        String username = NettyUtils.userName(channel);
        LOG.trace("retrieving inflight for messageID <{}>", messageID);

        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);

        String topic = inflightMsg.getTopic();
        InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username,
                                                                                messageID);
        m_interceptor.notifyMessageAcknowledged(wrapped);
    }
QoS2 один раз и только
sequenceDiagram
ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应Rec
ClientA->>ServerBroker: 2.发送消息Rel
ServerBroker->>ServerBroker: 2.1删除消息
ServerBroker->>ServerBroker: 2.2存储消息到发送列队
ServerBroker->>ClientB: 2.3发送消息
ServerBroker->>ClientA: 2.4发送消息回应Comp
ClientB->>ServerBroker: 3.发送消息回应Rec
ServerBroker->>ServerBroker: 3.1删除2.2中存储的消息(一次确认)
ServerBroker->>ServerBroker: 3.2存储消息
ServerBroker->>ClientB: 3.3发送消息Rel
ClientB->>ServerBroker: 3.4发送消息回应Comp
ServerBroker->>ServerBroker: 3.5删除消息(二次确认)

1. Отправить сообщение ПУБЛИКАЦИЯ

  1. Суждение власти
  2. сохранить сообщение
  3. Отправить ответ Rec
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
        final Topic topic = new Topic(msg.variableHeader().topicName());
        // check if the topic can be wrote
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 权限判断
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }
        final int messageID = msg.variableHeader().messageId();

        // 2. 存储消息
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);

        LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID);
        if (LOG.isTraceEnabled()) {
            LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree());
        }

        this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);

        // 3. 发送Rec回应
        sendPubRec(clientID, messageID);

        // Next the client will send us a pub rel
        // NB publish to subscribers for QoS 2 happen upon PUBREL from publisher

//        if (msg.fixedHeader().isRetain()) {
//            if (msg.payload().readableBytes() == 0) {
//                m_messagesStore.cleanRetained(topic);
//            } else {
//                m_messagesStore.storeRetained(topic, toStoreMsg);
//            }
//        }
        //TODO this should happen on PUB_REL, else we notify false positive
        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }

2. Отправить сообщение Отн.

  1. удаленное сообщение
  2. следующее сообщение
  3. Отправить ответ Comp клиенту-A
 void processPubRel(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID);
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // 1. 删除消息
        IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
        if (evt == null) {
            LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID);
            throw new IllegalArgumentException("Can't find inbound inflight message");
        }
        final Topic topic = new Topic(evt.getTopic());

        // 2. 转发消息
        this.publisher.publish2Subscribers(evt, topic, messageID);

        if (evt.isRetained()) {
            if (evt.getPayload().readableBytes() == 0) {
                m_messagesStore.cleanRetained(topic);
            } else {
                m_messagesStore.storeRetained(topic, evt);
            }
        }

        //TODO here we should notify to the listeners
        //m_interceptor.notifyTopicPublished(msg, clientID, username);
        // 3.发送Comp 回应
        sendPubComp(clientID, messageID);
    }

3. Отправить сообщение в ответ на Rec

  1. удаленное сообщение
  2. Хранить сообщения (хранятся в secondPhaseStore и outboundInflightMap соответственно)
  3. отправить ПУБЛИЧНО
public void processPubRec(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // remove from the inflight and move to the QoS2 second phase queue
        // 1. 删除消息
        StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
        // 2. 存储消息(分别存储在secondPhaseStore和outboundInflightMap)
        targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);
        // once received a PUBREC reply with a PUBREL(messageID)
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
        // 3. 发送PUBREL
        MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
        MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID));
        channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
    }

3.4 Отправить сообщение в ответ на комп

  1. удаленное сообщение
public void processPubComp(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);
        // once received the PUBCOMP then remove the message from the temp memory
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // 1. 删除消息
        StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
        String username = NettyUtils.userName(channel);
        String topic = inflightMsg.getTopic();
        final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic,
            username, messageID);
        m_interceptor.notifyMessageAcknowledged(interceptAckMsg);
    }

Topic & Subcribe

Основные понятия

Тема Тема и фильтр темы Фильтр темы

Основной механизм модели сообщений Pub-Sub Строка в кодировке UTF-8, не может превышать 65535 байт. Неограниченное количество уровней Не может содержать никаких специальных символов (/, +, #), упомянутых ниже, должен содержать хотя бы один символ
С учетом регистра, может содержать пробелы, не может содержать нулевые символы (Unicode U+0000)
Добавление косой черты "/" в конце или в конце приведет к созданию разных тем и фильтров тем. Пример:

  • "/А" и "А" разные
  • "А" и "А/" разные

Topic или TopicFilter, содержащие только косую черту "/", являются допустимыми.

Специальные символы в TopicFilter

разделитель уровня / Используется для разделения каждого уровня тем, обеспечивая иерархическую структуру для названий тем.
Разделители на уровне тем могут появляться в любом месте в Topic или TopicFilter.
Исключение: соседние разделители уровня темы указывают на уровень темы нулевой длины.

Подстановочный знак одного уровня +

Подстановочные знаки, которые можно использовать только для сопоставления на уровне одной темы. Например, «a/b/+» соответствует «a/b/c1» и «a/b/c2», но не «a/b/c/d».
Любой уровень может быть сопоставлен, включая первый и последний уровень.

Например, "+" допустимо, как и "sport/+/player1". Его можно использовать на нескольких уровнях или с несколькими уровнями подстановочных знаков.

Например, допустимо "+/теннис/#". Он может соответствовать только текущему уровню, но не высшему уровню.

Например, «спорт/+» соответствует не «спорту», ​​а «спорту/», а «/финансы» соответствует «+/+» и «/+», но не «+».

Многоуровневые подстановочные знаки #

Подстановочные знаки для соответствия любому уровню предмета Соответствует уровням и подуровням, которые содержат сами себя.

Например, «a/b/c/#» соответствует «a/b/c», «a/b/c/d» и «a/b/c/d/e». Должна быть последняя концовка.

Например, "спорт/теннис/#/рейтинг" недействителен.

"#" является действительным и будет получать все сообщения приложения. (Этот тип TopicFilter должен быть отключен на стороне сервера)

Начиная с $, зарезервировано сервером

Сервер не может сопоставить тему, начинающуюся с символа $, с фильтром темы, начинающимся с подстановочного знака (# или +).

Серверы ДОЛЖНЫ запрещать клиентам использовать эту тему для обмена сообщениями с другими клиентами.

Реализации сервера МОГУТ использовать имена разделов, начинающиеся с $, для других целей.

SYS/ 被广泛用作包含服务器特定信息或控制接口的主题的前缀 客户端不特意订阅Тема в начале, вы не получите соответствующее сообщение

  • Клиенты, подписанные на «#», не будут получать сообщения, опубликованные в темах, начинающихся с «$».
  • Клиенты, подписанные на "+/A/B", не будут получать сообщения, опубликованные на "$SYS/A/B".
  • Подписаться "SYS/#” 的客户端会收到发布到以 “SYS/" сообщение
  • Подписаться "SYS/A/+” 的客户端会收到发布到 “SYS/A/B" тема

Если клиент хочет принять оба "SYS/” 开头主题的消息和不以Сообщение в начале темы, нужно подписаться как на "#", так и на "$SYS/#"

структура хранения

  • a/b/c
  • a/a
  • a/haha
  • msg

Эти 4 темы будут храниться в следующей структуре:

  1. дети указывают на нижний узел
  2. подписки хранит всех подписчиков текущей темы

image

алгоритм поиска

подписка
@Override
    public void add(Subscription newSubscription) {
        Action res;
        do {
            res = insert(newSubscription.clientId, newSubscription.topicFilter, this.root, newSubscription.topicFilter);
        } while (res == Action.REPEAT);
    }

    private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return insert(clientId, remainingTopic, nextInode, fullpath);
        } else {
            if (topic.isEmpty()) {
                return insertSubscription(clientId, fullpath, inode);
            } else {
                return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
            }
        }
    }
удалить подписку
public void removeSubscription(Topic topic, String clientID) {
        Action res;
        do {
            res = remove(clientID, topic, this.root, NO_PARENT);
        } while (res == Action.REPEAT);
    }

    private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return remove(clientId, remainingTopic, nextInode, inode);
        } else {
            final CNode cnode = inode.mainNode();
            if (cnode instanceof TNode) {
                // this inode is a tomb, has no clients and should be cleaned up
                // Because we implemented cleanTomb below, this should be rare, but possible
                // Consider calling cleanTomb here too
                return Action.OK;
            }
            if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
                // last client to leave this node, AND there are no downstream children, remove via TNode tomb
                if (inode == this.root) {
                    return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
                }
                TNode tnode = new TNode();
                return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
            } else if (cnode.contains(clientId) && topic.isEmpty()) {
                CNode updatedCnode = cnode.copy();
                updatedCnode.removeSubscriptionsFor(clientId);
                return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
            } else {
                //someone else already removed
                return Action.OK;
            }
        }
    }
найти
Set<Subscription> recursiveMatch(Topic topic, INode inode) {
        CNode cnode = inode.mainNode();
        if (Token.MULTI.equals(cnode.token)) {
            return cnode.subscriptions;
        }
        if (topic.isEmpty()) {
            return Collections.emptySet();
        }
        if (cnode instanceof TNode) {
            return Collections.emptySet();
        }
        final Token token = topic.headToken();
        if (!(Token.SINGLE.equals(cnode.token) || cnode.token.equals(token) || ROOT.equals(cnode.token))) {
            return Collections.emptySet();
        }
        Topic remainingTopic = (ROOT.equals(cnode.token)) ? topic : topic.exceptHeadToken();
        Set<Subscription> subscriptions = new HashSet<>();
        if (remainingTopic.isEmpty()) {
            subscriptions.addAll(cnode.subscriptions);
        }
        for (INode subInode : cnode.allChildren()) {
            subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
        }
        return subscriptions;
    }

Хвостик

Связанные ссылки

Простое объяснение протокола MQTT