RabbitMQ настолько прост, что обеспечивает мгновенное общение! Вам вообще нужно писать внутренний код?

Java RabbitMQ
RabbitMQ настолько прост, что обеспечивает мгновенное общение! Вам вообще нужно писать внутренний код?

Адрес фактического центра электронной коммерции SpringBoot (40k+star):GitHub.com/macro-positive/…

Резюме

Иногда мы будем использовать его в наших проектах.即时通讯Функции, такие как функция чата обслуживания клиентов в системе электронной коммерции и в процессе оплаты, когда пользователь успешно платит, сторонняя платежная служба перезвонит в наш интерфейс обратного вызова. В это время нам необходимо уведомить фронт-энд, что платеж прошел успешно. Недавно я обнаружил, что RabbitMQ можно легко реализовать.即时通讯Функция, если у вас нет особых бизнес-требований, вы можете даже не писать back-end код, Сегодня я расскажу вам, как использовать RabbitMQ для ее достижения.即时通讯!

протокол MQTT

MQTT (транспорт телеметрии очереди сообщений, протокол передачи телеметрии очереди сообщений) — это модель публикации/подписки, основанная на轻量级протокол связи, построенный наTCP/IPна договоре. Самым большим преимуществом MQTT является то, что он может предоставлять надежные службы сообщений в режиме реального времени для подключения удаленных устройств с очень небольшим количеством кода и ограниченной пропускной способностью.

Понятия, связанные с MQTT

  • Издатель (publisher): отправитель сообщения, ответственный за отправку сообщения.
  • Подписчик: Подписчик сообщения, ответственный за получение и обработку сообщения.
  • Broker (брокер): Брокер сообщений находится между издателем сообщений и подписчиком, и в качестве него могут выступать всевозможные промежуточные программы сообщений, поддерживающие протокол MQTT.
  • Тема: это можно понимать как маршрут в очереди сообщений.После подписки на тему подписчик может получать сообщения, отправленные в тему.
  • Полезная нагрузка (load); можно понимать как содержимое отправленного сообщения.
  • QoS (качество сообщения): Полное название Quality of Service, то есть качество отправки сообщения, в основном включающееQoS 0,QoS 1,QoS 2Эти три уровня описаны ниже:
    • QoS 0 (почти один раз): не более одного раза, отправка только один раз, произойдет потеря или повторение сообщения;
    • QoS 1 (по крайней мере один раз): по крайней мере один раз, чтобы гарантировать получение сообщения, но может произойти дублирование сообщения;
    • QoS 2 (точно один раз): только один раз, гарантируя, что сообщение будет доставлено только один раз.

RabbitMQ обеспечивает функциональность MQTT

Чтобы включить функцию MQTT в RabbitMQ, вам необходимо сначала установить RabbitMQ, а затем включить подключаемый модуль MQTT.

  • Прежде всего, нам нужно установить и запустить RabbitMQ, друзья, которые не знают о RabbitMQ, могут обратиться к«Практические советы по RabbitMQ подытожили за 3 дня, с чем-то! 》;

  • Следующим шагом является включение подключаемого модуля MQTT RabbitMQ. По умолчанию он не включен. Используйте следующую команду, чтобы включить его;

rabbitmq-plugins enable rabbitmq_mqtt
  • После успешного открытия проверяем консоль управления, мы можем обнаружить, что служба MQTT запущена в1883порт встал.

MQTT-клиент

Мы можем использовать клиент MQTT для тестирования функции обмена мгновенными сообщениями MQTT, которая используется здесь.MQTTBoxэтот клиентский инструмент.

  • нажмитеCreate MQTT Clientкнопка для создания клиента MQTT;

  • Затем настройте клиент MQTT, в основном для настройки порта протокола, имени пользователя и пароля для подключения и QoS;

  • Настройте другого подписчика, подписчик подписываетсяtestTopicAэта тема, мы отправим сообщение в эту тему;

  • Издатели публикуют сообщения в темы, а подписчики могут получать их в режиме реального времени.

Внешний интерфейс напрямую реализует обмен мгновенными сообщениями

теперь, когдаMQTTBoxКлиент может реализовать мгновенную связь непосредственно через RabbitMQ, поэтому можем ли мы также реализовать мгновенную связь напрямую с помощью технологии внешнего интерфейса? Ответ - да! Ниже мы пройдемhtml+javascriptРеализуйте простую функцию чата и реализуйте обмен мгновенными сообщениями, не написав ни строчки внутреннего кода!

  • Поскольку нижним уровнем RabbitMQ, взаимодействующим с веб-частью, является WebSocket, поэтому нам нужно включить поддержку RabbitMQ MQTT WEB, используйте следующую команду, чтобы включить ее;
rabbitmq-plugins enable rabbitmq_web_mqtt
  • После того, как открытие прошло успешно, проверяем консоль управления, мы можем обнаружить, что служба MQTT WEB запущена в15675порт поднят;

  • Функция реализована очень просто, одна функция чата, следует отметить, что адрес доступа настроенного сервиса MQTT: ws://localhost:15675/ws
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<div>
    <label>目标Topic:<input id="targetTopicInput" type="text"></label><br>
    <label>发送消息:<input id="messageInput" type="text"></label><br>
    <button onclick="sendMessage()">发送</button>
    <button onclick="clearMessage()">清空</button>
    <div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
    //RabbitMQ的web-mqtt连接地址
    const url = 'ws://localhost:15675/ws';
    //获取订阅的topic
    const topic = getQueryString("topic");
    //连接到消息队列
    let client = mqtt.connect(url);
    client.on('connect', function () {
        //连接成功后订阅topic
        client.subscribe(topic, function (err) {
            if (!err) {
                showMessage("订阅topic:" + topic + "成功!");
            }
        });
    });
    //获取订阅topic中的消息
    client.on('message', function (topic, message) {
        showMessage("收到消息:" + message.toString());
    });

    //发送消息
    function sendMessage() {
        let targetTopic = document.getElementById("targetTopicInput").value;
        let message = document.getElementById("messageInput").value;
        //向目标topic中发送消息
        client.publish(targetTopic, message);
        showMessage("发送消息给" + targetTopic + "的消息:" + message);
    }

    //从URL中获取参数
    function getQueryString(name) {
        let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
        let r = window.location.search.substr(1).match(reg);
        if (r != null) {
            return decodeURIComponent(r[2]);
        }
        return null;
    }

    //在消息列表中展示消息
    function showMessage(message) {
        let messageDiv = document.getElementById("messageDiv");
        let messageEle = document.createElement("div");
        messageEle.innerText = message;
        messageDiv.appendChild(messageEle);
    }

    //清空消息列表
    function clearMessage() {
        let messageDiv = document.getElementById("messageDiv");
        messageDiv.innerHTML = "";
    }
</script>
</html>
  • Далее подписываемся на разные темы, чтобы открыть две страницы для тестирования функции (страница помещается в директорию ресурсов приложения SpringBoot, и перед доступом приложение нужно запустить):

  • После отправки сообщений друг другу посмотрим на эффект!

Использование в SpringBoot

Когда нет особых бизнес-требований, внешний интерфейс может напрямую подключаться к RabbitMQ для реализации мгновенной связи. Но иногда нам нужно оповестить фронтенд через сервер.В это время нам нужно интегрировать MQTT в приложение.Далее поговорим о том, как использовать MQTT в приложении SpringBoot.

  • Сначала нам нужноpom.xmlДобавьте зависимости, связанные с MQTT, в ;
<!--Spring集成MQTT-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
  • существуетapplication.ymlДобавить конфигурацию, связанную с MQTT, в основном адрес доступа, имя пользователя и пароль, информацию о теме по умолчанию;
rabbitmq:
  mqtt:
    url: tcp://localhost:1883
    username: guest
    password: guest
    defaultTopic: testTopic
  • Напишите класс конфигурации Java для чтения конфигурации из файла конфигурации для удобства использования;
/**
 * MQTT相关配置
 * Created by macro on 2020/9/15.
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
    /**
     * RabbitMQ连接用户名
     */
    private String username;
    /**
     * RabbitMQ连接密码
     */
    private String password;
    /**
     * RabbitMQ的MQTT默认topic
     */
    private String defaultTopic;
    /**
     * RabbitMQ的MQTT连接地址
     */
    private String url;
}
  • Добавьте конфигурацию, связанную с подписчиком сообщений MQTT, используйте@ServiceActivatorАннотация объявляет активатор службы черезMessageHandlerобрабатывать сообщения подписки;
/**
 * MQTT消息订阅者相关配置
 * Created by macro on 2020/9/15.
 */
@Slf4j
@Configuration
public class MqttInboundConfig {
    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
                        mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //设置消息质量:0->至多一次;1->至少一次;2->只有一次
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //处理订阅消息
                log.info("handleMessage : {}",message.getPayload());
            }

        };
    }
}
  • Добавить конфигурацию, связанную с публикатором сообщений MQTT;
/**
 * MQTT消息发布者相关配置
 * Created by macro on 2020/9/15.
 */
@Configuration
public class MqttOutboundConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttConfig.getUrl()});
        options.setUserName(mqttConfig.getUsername());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
  • Добавить шлюз MQTT для отправки сообщений в темы;
/**
 * MQTT网关,通过接口将数据传递到集成流
 * Created by macro on 2020/9/15.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * 发送消息到默认topic
     */
    void sendToMqtt(String payload);

    /**
     * 发送消息到指定topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * 发送消息到指定topic并设置QOS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
  • Добавить тестовый интерфейс MQTT, использовать шлюз MQTT для отправки сообщений в определенные темы;
/**
 * MQTT测试接口
 * Created by macro on 2020/9/15.
 */
@Api(tags = "MqttController", description = "MQTT测试接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttGateway mqttGateway;

    @PostMapping("/sendToDefaultTopic")
    @ApiOperation("向默认主题发送消息")
    public CommonResult sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
        return CommonResult.success(null);
    }

    @PostMapping("/sendToTopic")
    @ApiOperation("向指定主题发送消息")
    public CommonResult sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
        return CommonResult.success(null);
    }
}
  • Вызов интерфейса для отправки сообщения в тему для тестирования;

  • Сообщение успешно получено и напечатано в фоновом режиме.
2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息
2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息
2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息

Суммировать

Применение промежуточного программного обеспечения сообщений становится все более и более обширным.Он может не только реализовать надежную асинхронную связь, но и реализовать мгновенную связь.Необходимо освоить промежуточное программное обеспечение сообщений. Если нет особых бизнес-требований, клиент или внешний интерфейс могут напрямую использовать MQTT для подключения к промежуточному программному обеспечению сообщений для реализации мгновенной связи.При наличии особых потребностей это также можно реализовать путем интеграции MQTT с SpringBoot.Короче говоря, ПО промежуточного слоя сообщений — хороший способ реализовать мгновенную связь.

Адрес исходного кода проекта

GitHub.com/macro-positive/…

Эта статьяGitHubGitHub.com/macro-positive/…Записано, приветствую всех на Star!