Адрес фактического центра электронной коммерции SpringBoot (40k+star):GitHub.com/macro-positive/…
Резюме
Иногда мы будем использовать его в наших проектах.即时通讯
Функции, такие как функция чата обслуживания клиентов в системе электронной коммерции и в процессе оплаты, когда пользователь успешно платит, сторонняя платежная служба перезвонит в наш интерфейс обратного вызова. В это время нам необходимо уведомить фронт-энд, что платеж прошел успешно. Недавно я обнаружил, что RabbitMQ можно легко реализовать.即时通讯
Функция, если у вас нет особых бизнес-требований, вы можете даже не писать back-end код, Сегодня я расскажу вам, как использовать RabbitMQ для ее достижения.即时通讯
!
протокол MQTT
MQTT (транспорт телеметрии очереди сообщений, протокол передачи телеметрии очереди сообщений) — это модель публикации/подписки, основанная на轻量级
протокол связи, построенный наTCP/IP
на договоре. Самым большим преимуществом MQTT является то, что он может предоставлять надежные службы сообщений в режиме реального времени для подключения удаленных устройств с очень небольшим количеством кода и ограниченной пропускной способностью.
Понятия, связанные с MQTT
- Издатель (publisher): отправитель сообщения, ответственный за отправку сообщения.
- Подписчик: Подписчик сообщения, ответственный за получение и обработку сообщения.
- Broker (брокер): Брокер сообщений находится между издателем сообщений и подписчиком, и в качестве него могут выступать всевозможные промежуточные программы сообщений, поддерживающие протокол MQTT.
- Тема: это можно понимать как маршрут в очереди сообщений.После подписки на тему подписчик может получать сообщения, отправленные в тему.
- Полезная нагрузка (load); можно понимать как содержимое отправленного сообщения.
- QoS (качество сообщения): Полное название Quality of Service, то есть качество отправки сообщения, в основном включающее
QoS 0
,QoS 1
,QoS 2
Эти три уровня описаны ниже:- QoS 0 (почти один раз): не более одного раза, отправка только один раз, произойдет потеря или повторение сообщения;
- QoS 1 (по крайней мере один раз): по крайней мере один раз, чтобы гарантировать получение сообщения, но может произойти дублирование сообщения;
- QoS 2 (точно один раз): только один раз, гарантируя, что сообщение будет доставлено только один раз.
RabbitMQ обеспечивает функциональность MQTT
Чтобы включить функцию MQTT в RabbitMQ, вам необходимо сначала установить RabbitMQ, а затем включить подключаемый модуль MQTT.
-
Прежде всего, нам нужно установить и запустить RabbitMQ, друзья, которые не знают о RabbitMQ, могут обратиться к«Практические советы по RabbitMQ подытожили за 3 дня, с чем-то! 》;
-
Следующим шагом является включение подключаемого модуля MQTT RabbitMQ. По умолчанию он не включен. Используйте следующую команду, чтобы включить его;
rabbitmq-plugins enable rabbitmq_mqtt
- После успешного открытия проверяем консоль управления, мы можем обнаружить, что служба MQTT запущена в
1883
порт встал.
MQTT-клиент
Мы можем использовать клиент MQTT для тестирования функции обмена мгновенными сообщениями MQTT, которая используется здесь.
MQTTBox
этот клиентский инструмент.
- Сначала скачайте и установите
MQTTBox
,ссылка для скачивания:работает с Web.com/матерями, эксплуатируемыми каждый день.htm…
- нажмите
Create MQTT Client
кнопка для создания клиента MQTT;
- Затем настройте клиент MQTT, в основном для настройки порта протокола, имени пользователя и пароля для подключения и QoS;
- Настройте другого подписчика, подписчик подписывается
testTopicA
эта тема, мы отправим сообщение в эту тему;
- Издатели публикуют сообщения в темы, а подписчики могут получать их в режиме реального времени.
Внешний интерфейс напрямую реализует обмен мгновенными сообщениями
теперь, когда
MQTTBox
Клиент может реализовать мгновенную связь непосредственно через RabbitMQ, поэтому можем ли мы также реализовать мгновенную связь напрямую с помощью технологии внешнего интерфейса? Ответ - да! Ниже мы пройдемhtml+javascript
Реализуйте простую функцию чата и реализуйте обмен мгновенными сообщениями, не написав ни строчки внутреннего кода!
- Поскольку нижним уровнем RabbitMQ, взаимодействующим с веб-частью, является WebSocket, поэтому нам нужно включить поддержку RabbitMQ MQTT WEB, используйте следующую команду, чтобы включить ее;
rabbitmq-plugins enable rabbitmq_web_mqtt
- После того, как открытие прошло успешно, проверяем консоль управления, мы можем обнаружить, что служба MQTT WEB запущена в
15675
порт поднят;
- Связь между WEB-стороной и службой MQTT требует использования
MQTT.js
Библиотека, адрес проекта:GitHub.com/Финалы группы гражданских прав/MQ TT…
- Функция реализована очень просто, одна функция чата, следует отметить, что адрес доступа настроенного сервиса MQTT: ws://localhost:15675/ws
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div>
<label>目标Topic:<input id="targetTopicInput" type="text"></label><br>
<label>发送消息:<input id="messageInput" type="text"></label><br>
<button onclick="sendMessage()">发送</button>
<button onclick="clearMessage()">清空</button>
<div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
//RabbitMQ的web-mqtt连接地址
const url = 'ws://localhost:15675/ws';
//获取订阅的topic
const topic = getQueryString("topic");
//连接到消息队列
let client = mqtt.connect(url);
client.on('connect', function () {
//连接成功后订阅topic
client.subscribe(topic, function (err) {
if (!err) {
showMessage("订阅topic:" + topic + "成功!");
}
});
});
//获取订阅topic中的消息
client.on('message', function (topic, message) {
showMessage("收到消息:" + message.toString());
});
//发送消息
function sendMessage() {
let targetTopic = document.getElementById("targetTopicInput").value;
let message = document.getElementById("messageInput").value;
//向目标topic中发送消息
client.publish(targetTopic, message);
showMessage("发送消息给" + targetTopic + "的消息:" + message);
}
//从URL中获取参数
function getQueryString(name) {
let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
let r = window.location.search.substr(1).match(reg);
if (r != null) {
return decodeURIComponent(r[2]);
}
return null;
}
//在消息列表中展示消息
function showMessage(message) {
let messageDiv = document.getElementById("messageDiv");
let messageEle = document.createElement("div");
messageEle.innerText = message;
messageDiv.appendChild(messageEle);
}
//清空消息列表
function clearMessage() {
let messageDiv = document.getElementById("messageDiv");
messageDiv.innerHTML = "";
}
</script>
</html>
-
Далее подписываемся на разные темы, чтобы открыть две страницы для тестирования функции (страница помещается в директорию ресурсов приложения SpringBoot, и перед доступом приложение нужно запустить):
- первая тема подписки
testTopicA
,адрес:http://localhost:8088/page/index?topic=testTopicA - Вторая тема подписки
testTopicB
,адрес:http://localhost:8088/page/index?topic=testTopicB
- первая тема подписки
-
После отправки сообщений друг другу посмотрим на эффект!
Использование в SpringBoot
Когда нет особых бизнес-требований, внешний интерфейс может напрямую подключаться к RabbitMQ для реализации мгновенной связи. Но иногда нам нужно оповестить фронтенд через сервер.В это время нам нужно интегрировать MQTT в приложение.Далее поговорим о том, как использовать MQTT в приложении SpringBoot.
- Сначала нам нужно
pom.xml
Добавьте зависимости, связанные с MQTT, в ;
<!--Spring集成MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
- существует
application.yml
Добавить конфигурацию, связанную с MQTT, в основном адрес доступа, имя пользователя и пароль, информацию о теме по умолчанию;
rabbitmq:
mqtt:
url: tcp://localhost:1883
username: guest
password: guest
defaultTopic: testTopic
- Напишите класс конфигурации Java для чтения конфигурации из файла конфигурации для удобства использования;
/**
* MQTT相关配置
* Created by macro on 2020/9/15.
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
/**
* RabbitMQ连接用户名
*/
private String username;
/**
* RabbitMQ连接密码
*/
private String password;
/**
* RabbitMQ的MQTT默认topic
*/
private String defaultTopic;
/**
* RabbitMQ的MQTT连接地址
*/
private String url;
}
- Добавьте конфигурацию, связанную с подписчиком сообщений MQTT, используйте
@ServiceActivator
Аннотация объявляет активатор службы черезMessageHandler
обрабатывать сообщения подписки;
/**
* MQTT消息订阅者相关配置
* Created by macro on 2020/9/15.
*/
@Slf4j
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//设置消息质量:0->至多一次;1->至少一次;2->只有一次
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//处理订阅消息
log.info("handleMessage : {}",message.getPayload());
}
};
}
}
- Добавить конфигурацию, связанную с публикатором сообщений MQTT;
/**
* MQTT消息发布者相关配置
* Created by macro on 2020/9/15.
*/
@Configuration
public class MqttOutboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttConfig.getUrl()});
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
- Добавить шлюз MQTT для отправки сообщений в темы;
/**
* MQTT网关,通过接口将数据传递到集成流
* Created by macro on 2020/9/15.
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送消息到默认topic
*/
void sendToMqtt(String payload);
/**
* 发送消息到指定topic
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 发送消息到指定topic并设置QOS
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
- Добавить тестовый интерфейс MQTT, использовать шлюз MQTT для отправки сообщений в определенные темы;
/**
* MQTT测试接口
* Created by macro on 2020/9/15.
*/
@Api(tags = "MqttController", description = "MQTT测试接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
@ApiOperation("向默认主题发送消息")
public CommonResult sendToDefaultTopic(String payload) {
mqttGateway.sendToMqtt(payload);
return CommonResult.success(null);
}
@PostMapping("/sendToTopic")
@ApiOperation("向指定主题发送消息")
public CommonResult sendToTopic(String payload, String topic) {
mqttGateway.sendToMqtt(payload, topic);
return CommonResult.success(null);
}
}
- Вызов интерфейса для отправки сообщения в тему для тестирования;
- Сообщение успешно получено и напечатано в фоновом режиме.
2020-09-17 14:29:01.689 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 来自网页上的消息
2020-09-17 14:29:06.101 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 来自网页上的消息
2020-09-17 14:29:07.384 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 来自网页上的消息
Суммировать
Применение промежуточного программного обеспечения сообщений становится все более и более обширным.Он может не только реализовать надежную асинхронную связь, но и реализовать мгновенную связь.Необходимо освоить промежуточное программное обеспечение сообщений. Если нет особых бизнес-требований, клиент или внешний интерфейс могут напрямую использовать MQTT для подключения к промежуточному программному обеспечению сообщений для реализации мгновенной связи.При наличии особых потребностей это также можно реализовать путем интеграции MQTT с SpringBoot.Короче говоря, ПО промежуточного слоя сообщений — хороший способ реализовать мгновенную связь.
Адрес исходного кода проекта
Эта статьяGitHubGitHub.com/macro-positive/…Записано, приветствую всех на Star!