Выпущен mica-mqtt 1.1.2, реализующий кластер mqtt на основе redis pub/sub

Java Redis задняя часть Интернет вещей
Выпущен mica-mqtt 1.1.2, реализующий кластер mqtt на основе redis pub/sub

1. Введение

mica-mqttна основеt-ioосуществленныйПростой,низкая задержка,высокая производительностьКомпонент mqtt IoT с открытым исходным кодом.Подробности см. в исходном коде mica-mqtt gitee. пример модуля mica-mqtt.

После того, как несколько друзей спросили о кластере mica-mqtt, я добавил модуль mica-mqtt-broker, чтобы продемонстрировать реализацию кластера на основе redis pub/sub.

2. Функция

  • Поддерживает протоколы MQTT v3.1, v3.1.1 и v5.0.
  • Поддерживает подпротокол websocket mqtt (поддерживает mqtt.js).
  • поддержка http rest API,Подробнее см. в документации по http API..
  • Поддержка клиента MQTT.
  • Сервер сервера поддержки MQTT.
  • Поддерживает сообщения MQTT.
  • Поддерживает постоянные сообщения MQTT.
  • Поддерживает обработку и пересылку пользовательских сообщений (mq) для реализации кластеров.
  • Демонстрация подключения MQTT-клиента Alibaba Cloud mqtt.
  • Поддерживает GraalVM для компиляции в собственные исполняемые файлы.
  • Поддержка быстрого доступа к загрузочному проекту Spring (mica-mqtt-spring-boot-starter).
  • mica-mqtt-spring-boot-starter поддерживает стыковку с Prometheus + Grafana.
  • Реализовать кластер на основе redis pub/sub, см. подробностимодуль mica-mqtt-брокер.

3. Что нужно сделать

  • Оптимизирована обработка сеансов mqtt и поддерживаются некоторые новые функции mqtt v5.0.

4. Обновление записей

  • ✨ Добавьте модуль mica-mqtt-broker для реализации кластера mqtt на основе redis pub/sub.
  • ✨mica-mqtt-broker реализует хранилище состояния клиента на основе Redis.
  • ✨mica-mqtt-broker реализует зарезервированное хранилище сообщений на основе Redis.
  • ✨ http API mqtt-server настраивает подписку и отмену подписки для облегчения обработки кластера.
  • ✨mica-mqtt-spring-boot-example добавляет примеры аутентификации mqtt и http API.
  • ✨ Добавьте mqtt 5 во все ReasonCode.
  • ✨Оптимизированный расчет PacketNeededLength для декодирования.
  • 🐛 Исправьте сообщение, добавьте тип сообщения.
  • 🐛 Исправьте mqtt-сервер, сохраняющий правила сопоставления сообщений.

Пять, быстрый доступ к Spring boot

5.1 Добавьте зависимости

<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-spring-boot-starter</artifactId>
    <version>1.1.2</version>
</dependency>

5.2 Пример конфигурации сервера

mqtt:
  server:
    enabled: true               # 是否开启,默认:true
    ip: 127.0.0.1               # 服务端 ip 默认:127.0.0.1
    port: 5883                  # 端口,默认:1883
    name: Mica-Mqtt-Server      # 名称,默认:Mica-Mqtt-Server
    buffer-allocator: HEAP      # 堆内存和堆外内存,默认:堆内存
    heartbeat-timeout: 120000   # 心跳超时,单位毫秒,默认: 1000 * 120
    read-buffer-size: 8092      # 接收数据的 buffer size,默认:8092
    max-bytes-in-message: 8092  # 消息解析最大 bytes 长度,默认:8092
    debug: true                 # 如果开启 prometheus 指标收集建议关闭
    websocket-enable: true      # 开启 websocket 子协议,默认开启
    websocket-port: 8083        # websocket 端口,默认:8083

5.3 Сервер может реализовать интерфейс (зарегистрироваться как Spring Bean)

интерфейс Это необходимо инструкция
IMqttServerAuthHandler да для аутентификации клиента
IMqttMessageListener да мониторинг сообщений
IMqttConnectStatusListener да монитор состояния соединения
IMqttSessionManager нет управление сессиями
IMqttMessageStore Кластер Да, Автономный Нет Хранение завещаний и сообщений об удержании
AbstractMqttMessageDispatcher Кластер Да, Автономный Нет Переадресация сообщений (будет, переадресация сообщений об удержании)
IpStatListener нет мониторинг IP-статуса t-io

5.4 Мониторинг Prometheus + Grafana

выгода отt-ioХороший дизайн, прямое подключение индикаторов мониторингаt-iostat, в настоящее время поддерживает следующие индикаторы, которые будут постоянно улучшаться в будущем.

Поддерживались индексы инструкция
mqtt_connections_accepted Общее количество принятых подключений
mqtt_connections_closed Количество закрытых соединений
mqtt_connections_size Текущее количество подключений
mqtt_messages_handled_packets Количество обработанных сообщений
mqtt_messages_handled_bytes Байты сообщения обработаны
mqtt_messages_received_packets Количество полученных сообщений
mqtt_messages_received_bytes Байты сообщения обработаны
mqtt_messages_send_packets Количество отправленных сообщений
mqtt_messages_send_bytes Байты сообщения отправлены

mqtt监控1.jpg

оmica-mqtt-spring-boot-starterДля получения дополнительной информации, пожалуйста, проверьте документацию:git ee.com/596392912/ нет…

6. Общий доступ к Java-проекту

6.1 maven-зависимости

 <dependency>
   <groupId>net.dreamlu</groupId>
   <artifactId>mica-mqtt-core</artifactId>
   <version>1.1.2</version>
 </dependency>

6.2 клиент mica-mqtt

 // 初始化 mqtt 客户端
 MqttClient client = MqttClient.create()
     .ip("127.0.0.1")
     .port(1883)                     // 默认:1883
     .username("admin")
     .password("123456")
     .version(MqttVersion.MQTT_5)    // 默认:3_1_1
     .clientId("xxxxxx")             // 默认:MICA-MQTT- 前缀和 36进制的纳秒数
     .connect();                     // 连接
 
     // 消息订阅,同类方法 subxxx
     client.subQos0("/test/#", (topic, payload) -> {
         logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
     });
     // 取消订阅
     client.unSubscribe("/test/#");
 
     // 发送消息
     client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
 
     // 断开连接
     client.disconnect();
     // 重连
     client.reconnect();
     // 停止
     client.stop();

6.3 сервер mica-mqtt

 // 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
 MqttServer mqttServer = MqttServer.create()
     // 默认:127.0.0.1
     .ip("127.0.0.1")
     // 默认:1883
     .port(1883)
     // 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
     .readBufferSize(512)
     // 自定义认证
     .authHandler((clientId, userName, password) -> true)
     // 消息监听
     .messageListener((clientId, topic, mqttQoS, payload) -> {
         logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
     })
     .debug() // 开启 t-io debug 信息日志
     .start();
 
 // 发送给某个客户端
 mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
 
 // 发送给所有在线监听这个 topic 的客户端
 mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
 
 // 停止服务
 mqttServer.stop();

7. Демонстрация кластера

mica-mqtt集群.gif

8. Следуй за мной

обрати внимание натехнология мечтыДругие замечательные статьи в колонке Nuggets ждут вас.