Немного о mqtt.js, любимце Интернета вещей

JavaScript
Немного о mqtt.js, любимце Интернета вещей

Распространенными mq являются Kafka, RocketMQ и RabbitMQ, которые также очень распространены. Первый очень распространен и относится к mq между микросервисами.

Так что же такое MQTT? MQTT относится к концепции IoT, то есть Интернету вещей.

Приходите и посмотрите на автора, который использует mqtt.js для разработки функций обмена мгновенными сообщениями в течение 2 лет~

Давайте сначала посмотрим на MQTT в сценарии полевых приложений IOT:

mqtt.js — это реализация MQTT на стороне nodejs.

Внешние интерфейсы в рамках современного стека технологий vue также доступны через управление пакетами npm package.json, например проекты, созданные с помощью vue-cli, create-react-app и т. д.

mqtt.js официально поддерживает апплет WeChat и апплет Alipay. Протокол MQTT апплета WeChat называется wxs, а апплета Alipay — alis.

Если вы все еще в замешательстве, следуйте за мной, чтобы познакомиться с этим любимцем Интернета вещей через mqtt.js.

  • Что такое очередь микросообщений?
  • Объяснение ключевых терминов MQTT
  • Обмен сообщениями P2P и сообщение Pub/Sub
  • Инкапсулированный общий класс mqtt.js
  • Функция отправки клиента sendPacket
  • Клиентское подключение mqtt.connect()
  • Подпишитесь на тему mqtt.Client#subscribe()
  • Отправить сообщение mqtt.Client#publish()
  • Получение сообщений mqtt.Client# событие "сообщение"

Что такое очередь микросообщений?

Обычно существует два типа очередей сообщений:

  • Очередь сообщений микросервиса (передача информации между микросервисами, типичные представители RabbitMQ, Kafka, RocketMQ)
  • Очередь сообщений IoT (Интернет вещей и облачный обмен сообщениями, представляющий MQTT)

В настоящее время то, что я практиковал, то есть углубленный анализ этого сообщения в блоге, — это mqtt.js очереди сообщений Интернета вещей.

Традиционная очередь сообщений (передача информации между микросервисами)

Традиционная очередь сообщений между микросервисами (серверами с несколькими подсистемами) — очень распространенный способ передачи сообщений между серверами.

Типичными представителями являются: RabbitMQ, Kafka, RocketMQ. На официальном веб-сайте Alibaba Cloud есть три очереди сообщений микросервиса: AMQP (совместимый с RabbitMQ), Kafka и RocketMQ, которые очень помогут нам в будущей реализации в реальных проектах.

Используйте различные сценарии:

  • Высокий параллелизм: всплески, захват билетов (FIFO)
  • Общий тип: обмен точками (несколько подсистем совместно используют модуль точек)
  • Тип связи: передача сообщений между серверами (nodejs, java, python, go и т. д.)

Очередь сообщений MQTT (обмен сообщениями между IoT и облаком)

MQTT — это протокол MQTT Интернета вещей, который в основном решает сложную проблему сети Интернета вещей IoT.

В Alibaba Cloud есть служба очереди сообщений MQTT. Протокол связи поддерживает MQTT, STOMP, GB-808 и т. д. Уровень передачи данных поддерживает длинное соединение TCP, шифрование SSL, Websocket и т. д.

Сценарии использования в основном связаны с передачей данных:

  1. Автомобильная сеть (дистанционное управление, загрузка данных автомобиля)
  2. Мгновенное общение (одиночный чат, круг друзей 1-ко-многим)
  3. Живое видео (уведомление о заграждении, взаимодействие в чате)
  4. Умный дом (загрузка данных устройства, команды дистанционного управления)

В настоящее время используется система чата, за которую я отвечаю уже 2 года.设备<->server<->PCПуть,Протокол MQTT, транспортный протокол WebsocketВыполните обмен данными между устройством и ПК.

Объяснение ключевых терминов MQTT

Пример

Каждый экземпляр MQTT соответствует глобально уникальной точке доступа к сервису. Видимая разница черезmqtt.connect(url)При установлении соединения с сервером (брокером) URL-адрес брокера тот же. Предполагая, что есть продавец1, продавец2... их локальные интерфейсные и серверные URL-адреса соединений унифицированы, просто различайте их по идентификатору клиента.

Идентификатор клиента (идентификатор клиента)

Идентификатор клиента MQTT — это уникальный идентификатор каждого клиента, который должен быть уникальным в глобальном масштабе, и соединения, использующие тот же идентификатор клиента, будут отклонены. ClientID Alibaba Cloud состоит из двух частей. <GroupID>@@@<DeviceID>. Обычно идентификатор группы унифицируется для нескольких внешних интерфейсов, таких как ПК, мобильные устройства Android, мобильные устройства iOS, и DeviceID также унифицируется для нескольких внешних интерфейсов. Так как же отличить мультитерминал? Вы можете изменить @@@ в середине идентификатора клиента. Например:

let CID_PC = `<GroupID>@@@-PC<DeviceID>`
let CID_Android = `<GroupID>@@@-Android<DeviceID>`
let CID_IOS = `<GroupID>@@@-IOS<DeviceID>`

Идентификатор группы (идентификатор группы)

Он используется для указания общего группового имени группы узлов с одинаковыми логическими функциями, представляющих класс устройств с одинаковой функцией.

Device ID

Уникальная идентификация для каждого устройства. Это должно быть глобально уникальным, что может быть серийным номером каждого сенсорного устройства или идентификатором пользователя ПК для входа в систему.

Родительская тема

Протокол MQTT основан на модели Pub/Sub, и любое сообщение относится к теме. Тема может существовать на нескольких уровнях, первый уровень является родительской темой. Консоль нужно создавать отдельно.

Подтема

MQTT может иметь тему второго уровня или тему третьего уровня. Не нужно создавать, просто пропишите это прямо в коде.

Сообщения P2P и сообщения Pub/Sub

Сообщение Pub/Sub — это режим подписки и публикации, аналогичный мониторингу событий и трансляции. Если вы не понимаете, что такое публикация и подписка, вы можете прочитатьЧто, черт возьми, такое вебхук?Помимо поддержки режима Pub/Sub, MQTT также поддерживает режим P2P.

Что такое P2P-сообщения?

  • P2P, полное название (точка-точка).
  • Индивидуальный режим обмена сообщениями, только с одним отправителем сообщения и одним получателем сообщения.
  • В режиме P2P отправитель сообщения четко знает предполагаемого получателя сообщения, и доступ к сообщению может получить только этот конкретный получатель.Потребление клиентов.
  • Когда отправитель отправляет сообщение, он указывает получателя через тему, и получатель может получить сообщение без подписки.
  • Режим P2P не только снижает стоимость регистрации и подписки, но и уменьшает задержку отправки, поскольку ссылка оптимизирована.

Разница между режимом P2P и режимом Pub/Sub

при отправке сообщения

  • В режиме Pub/Sub отправителю необходимо отправлять сообщения в соответствии с темой, согласованной с получателем.
  • В режиме P2P отправителю не нужно отправлять в соответствии с Tpic, и он может отправлять напрямую в соответствии со спецификацией.

При получении сообщения

  • В режиме Pub/Sub получателям необходимо заранее подписаться на темы, чтобы получать сообщения.
  • Получайте сообщения без подписки в режиме P2P

nodejs отправляет сообщение P2P

const p2pTopic =topic+"/p2p/GID_xxxx@@@DEVICEID_001";
mqtt.client.publish(p2pTopic);

Инкапсулированный общий класс mqtt.js

  • Клиентское подключение initClient(config)
  • Подписаться на тему subscribeTopic(topic, config)
  • Отправить сообщение publishMessage(сообщение)
  • Получить сообщение handleMessage (обратный вызов)
import mqtt from 'mqtt';
import config from '@/config';

export default class MQTT {
  constructor(options) {
    this.name = options.name;
    this.connecting = false;
  }
  /**
   * 客户端连接
   */
  initClient(config) {
    const { url, groupId, key, password, topic: { publish: publishTopic }} = config;
    return new Promise((resolve) => {
      this.client = mqtt.connect(
        {
          url,
          clientId: `${groupId}@@@${deviceId}`,
          username: key,
          password,
        }
      );
      this.client.on('connect', () => {
        this.connecting = true;
        resolve(this);
      });
    });
  }

  /**
   * 订阅topic
   */
  subscribeTopic(topic, config) {
    if (this.connecting) {
      this.client.subscribe(topic, config);
    }
    return this;
  }

  /**
   * 发送消息
   */
  publishMessage(message) {
    this.client.publish(publishTopic, message, { qos: 1 });
  }

  /**
   * 接收消息
   */
  handleMessage(callback) {
    if (!this.client._events.message) {
      this.client.on('message', callback);
    }
  }

}

Функция отправки клиента sendPacket

mqtt-packet создает переносимый буфер

var mqtt = require('mqtt-packet')
var object = {
  cmd: 'publish',
  retain: false,
  qos: 0,
  dup: false,
  length: 10,
  topic: 'test',
  payload: 'test' // Can also be a Buffer
}
var opts = { protocolVersion: 4 } // default is 4. Usually, opts is a connect packet

console.log(mqtt.generate(object))
// Prints:
//
// <Buffer 30 0a 00 04 74 65 73 74 74 65 73 74>
//
// Which is the same as:
//
// new Buffer([
//   48, 10, // Header (publish)
//   0, 4, // Topic length
//   116, 101, 115, 116, // Topic (test)
//   116, 101, 115, 116 // Payload (test)
// ])

функция sendPacket

Генерируется событие packetsend, и пакет записывается в поток клиента через mqtt.writeToStream.

var mqttPacket = require('mqtt-packet')

function sendPacket (client, packet) {
  client.emit('packetsend', packet)
  mqttPacket.writeToStream(packet, client.stream, client.options)
}

_sendPack метод

MqttClient.prototype._sendPacket = function (packet) {
     sendPacket(this, packet);
}

Клиентское подключение mqtt.connect()

Клиент mqtt устанавливает соединение с сервером mqtt (брокером), обычно передавая «mqtt», «mqtts», «tcp», «tls», «ws», «wss», «wxs», «alis» в качестве URL-адрес протокола для подключения.

mqtt.connect([url], options)

Официальное описание:

  • Подключитесь к брокеру с заданным URL-адресом и конфигурацией и верните клиент.
  • URL-адрес может следовать следующим протоколам: «mqtt», «mqtts», «tcp», «tls», «ws», «wss», «wxs», «alis». (mqtt.js поддерживает апплет WeChat и апплет Alipay, протоколы wxs и alis соответственно.)
  • url также может быть объектом, возвращаемым функцией URL.parse().
  • Можно передать один объект, содержащий как URL-адрес, так и параметры.

Давайте посмотрим на конфигурацию подключения проекта в моей руке и результат подключения. Конфиденциальная информация была десенсибилизирована комбинацией foo, bar, baz или xxxx.

Конфигурация подключения

 {
    key: 'xxxxxxxx',
    secret: 'xxxxxxxx',
    url: 'wss://foo-bar.mqtt.baz.com/mqtt',
    groupId: 'FOO_BAR_BAZ_GID',
    topic: {
      publish: 'PUBLISH_TOPIC',
      subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
      unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
    },
}
  • ключевой счет
  • секретный пароль
  • url — это ссылка, используемая для установления соединения между клиентом и сервером (брокером) mqtt
  • идентификатор группы
  • тема Отправить сообщение в тему, подписаться на тему, отписаться в теме

результат соединения

Включает обзор, заголовки ответов и заголовки запросов.

General
Request URL: wss://foo-bar.mqtt.baz.com
Request Method: GET
Status Code: 101 Switching Protocols
Response Header
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: xxxxxxx
sec-websocket-protocol: mqtt
Request Header
GET wss://foo-bar.mqtt.baz.com/ HTTP/1.1
Host: foo-bar.mqtt.baz.com
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36
Upgrade: websocket
Origin: https://xxx.xxx.com
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6
Sec-WebSocket-Key: xxxxxxxxx
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Protocol: mqtt
Анализ исходного кода

Давайте посмотрим на код этого соединения mqtt.

this.client = mqtt.connect(
  {
    url,
    clientId: `${groupId}@@@${deviceId}`,
    username: key,
    password,
  }
);
function parseAuthOptions (opts) {
  var matches
  if (opts.auth) {
    matches = opts.auth.match(/^(.+):(.+)$/)
    if (matches) {
      opts.username = matches[1]
      opts.password = matches[2]
    } else {
      opts.username = opts.auth
    }
  }
}
/**
 * connect - connect to an MQTT broker.
 *
 * @param {String} [brokerUrl] - url of the broker, optional
 * @param {Object} opts - see MqttClient#constructor
 */
function connect (brokerUrl, opts) {
  if ((typeof brokerUrl === 'object') && !opts) {
    //  可以传入一个单对象,既包含url又包含选项
    opts = brokerUrl
    brokerUrl = null
  }
  opts = opts || {}
  // 设置username和password
  parseAuthOptions(opts)
  if (opts.query && typeof opts.query.clientId === 'string') {
    // 设置Client Id
    opts.clientId = opts.query.clientId
  }
  function wrapper (client) {
   ...
    return protocols[opts.protocol](client, opts)
  }
  // 最终返回一个mqtt client实例
  return new MqttClient(wrapper, opts)
}

Подпишитесь на тему mqtt.Client#subscribe()

фактический код

const topic =  {
      subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
      unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
};
const config = { qos:1 };
this.client.subscribe(topic.subscribe, config)

Анализ исходного кода

MqttClient.prototype.subscribe = function () {
  var packet
  var args = new Array(arguments.length)
  for (var i = 0; i < arguments.length; i++) {
    args[i] = arguments[i]
  }
  var subs = []
   // obj为订阅的topic列表
  var obj = args.shift()
  // qos等配置
  var opts = args.pop()
  var defaultOpts = {
    qos: 0
  }
  opts = xtend(defaultOpts, opts)
  // 数组类型的订阅的topic列表  
  if (Array.isArray(obj)) {
    obj.forEach(function (topic) {
      if (!that._resubscribeTopics.hasOwnProperty(topic) ||
        that._resubscribeTopics[topic].qos < opts.qos ||
          resubscribe) {
        var currentOpts = {
          topic: topic,
          qos: opts.qos
        }
        // subs是最终的订阅的topic列表
        subs.push(currentOpts)
      }
    })
  }
  // 这个packet很重要
  packet = {
    // 发出订阅命令
    cmd: 'subscribe',
    subscriptions: subs,
    qos: 1,
    retain: false,
    dup: false,
    messageId: this._nextId()
  }
  // 发出订阅包
  this._sendPacket(packet)
  return this
}

Отправить сообщение mqtt.Client#publish()

фактический код

const topic = {
      publish: 'PUBLISH_TOPIC',
};
const messge = {
   foo: '',
   bar: '',
   baz: '',
   ...
}
const msgStr = JSON.stringify(message);
this.client.publish(topic.publish, msgStr);

Обратите внимание, что сообщение публикации необходимо сериализовать с помощью JSON.stringify, а затем отправить в указанную тему.

Анализ исходного кода

MqttClient.prototype.publish = function (topic, message, opts, callback) {
  var packet
  var options = this.options
  var defaultOpts = {qos: 0, retain: false, dup: false}
  opts = xtend(defaultOpts, opts)

  // 将消息传入packet的payload
  packet = {
    cmd: 'publish',
    topic: topic,
    payload: message,
    qos: opts.qos,
    retain: opts.retain,
    messageId: this._nextId(),
    dup: opts.dup
  }
  // 处理不同qos
  switch (opts.qos) {
    case 1:
    case 2:
       // 发出publish packet
       this._sendPacketI(packet);
        ...
    default:
       this._sendPacket(packet);
        ...
  }
  return this
}

Получение сообщения mqtt.Client событие "сообщение"

фактический код

this.client.on('message', callback);

Данные принимаются в виде обратного вызова.

function (topic, message, packet) {}

Тема представляет собой полученную тему, а буфер — это конкретные данные. message — это полученные данные, не забудьте проанализировать буфер с помощью JSON.parse().

handleMessage(callback) {
    this.client.on('message', callback);
}
this.client.handleMessage((topic, buffer) => {
  let receiveMsg = null;
  try {
   receiveMsg = JSON.parse(buffer.toString());
  } catch (e) {
   receiveMsg = null;
  }
  if (!receiveMsg) {
    return;
  }
  ...do something with receiveMsg...
});

Анализ исходного кода

MqttClient наследует EventEmitter. Так что вы можете использовать для прослушивания события «сообщение».

inherits(MqttClient, EventEmitter)

Итак, где именно генерируется событие сообщения? >отправить событие сообщения

  1. Установите соединение через веб-сокет на основе потока веб-сокетов.
  2. Используйте канал для подключения доступных для записи потоков, созданных на основе readable-stream.Writable
  3. nextTick вызывает _handlePacket
  4. Вызовите handlePublish в handlePacket, чтобы выдать событие сообщения
1. Установите соединение через веб-сокет на основе потока веб-сокетов.
this.stream = this.streamBuilder(this)
function streamBuilder (client, opts) {
  return createWebSocket(client, opts)
}
var websocket = require('websocket-stream')
function createWebSocket (client, opts) {
  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'

  setDefaultOpts(opts)
  var url = buildUrl(opts, client)
  return websocket(url, [websocketSubProtocol], opts.wsOptions)
}
2. Используйте канал для подключения потока с возможностью записи, созданного на основе потока для чтения.
var Writable = require('readable-stream').Writable
var writable = new Writable();
this.stream.pipe(writable);
3. nextTick вызывает _handlePacket
writable._write = function (buf, enc, done) {
    completeParse = done
    parser.parse(buf)
    work()
}
function work () {
    var packet = packets.shift()
    if (packet) {
      that._handlePacket(packet, nextTickWork)
    }
}
function nextTickWork () {
    if (packets.length) {
      process.nextTick(work)
    } else {
      var done = completeParse
      completeParse = null
      done()
    }
}
4. Вызовите handlePublish в handlePacket, чтобы создать событие сообщения.
MqttClient.prototype._handlePacket = function (packet, done) {
  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done)
      break
   ...
}
// emit the message event
MqttClient.prototype._handlePublish = function (packet, done) {
  switch (qos) {
    case 1: {
      // emit the message event
        if (!code) { that.emit('message', topic, message, packet) }
    }
}

Использованная литература:

Я с нетерпением жду возможности пообщаться с вами и вместе добиться прогресса. Приглашаем вас присоединиться к созданной мной технической дискуссионной группе, тесно связанной с фронтенд-разработкой:

Стремитесь стать отличным front-end инженером!