Применение WebSocket в современных браузерах относительно распространено, в некоторых бизнес-сценариях требуется возможность отправлять сообщения с сервера клиенту. Мы использовали dwr во времена, когда не было WebSocket, и в то время dwr был действительно отличным решением. Но после появления WebSocket мы предпочитаем использовать стандартные реализации для решения проблем,
Прежде всего, поясню, что в этой статье не объясняется настройка WebSocket, а основное внимание уделяется выбору решений в кластерном режиме микросервисной архитектуры.
Каждый должен быть знаком с микросервисной архитектурой, в микросервисной архитектуре сервисы распределены, и для обеспечения доступности бизнеса каждый сервис существует в виде кластера. В режиме кластера, чтобы гарантировать, что каждый узел кластера обращается к одному и тому же результату, необходимо добиться согласованности данных, таких как кеш, сеанс и т. д.
Кэш кластера микросервисов обычно решается распределенным кешем redis, а согласованность сеансов обычно решается redis, но сейчас более популярен Http без сохранения состояния, то есть без сеанса, самое распространенное решение — OAuth.
WebSocket отличается.Он устанавливает длинное соединение с сервером.В режиме кластера заведомо невозможно установить соединение между фронтендом и каждым узлом сервисного кластера.Осуществимая идея - решить совместное использование http сессий через Redis реализует совместное использование сеансов веб-сокетов, но количество сеансов веб-сокетов намного больше, чем количество сеансов http (поскольку соединение с веб-сокетами устанавливается каждый раз при открытии страницы), поэтому по мере увеличения числа пользователей объем общих данных уменьшается. слишком велики, легко создать узкие места.
Другая идея заключается в том, что веб-сокет всегда будет устанавливать соединение с узлом в кластере, а затем, пока узел, на котором находится соединение, может быть отправлен на сервер, проблема, которую необходимо решить, состоит в том, как найти узел, на котором находится веб-сокет. соединение находится. Чтобы узнать, на каком узле находится соединение, нам нужен уникальный идентификатор для нахождения соединения.Однако в модели публикации-подписки, основанной на stomp, сообщение может быть отправлено на несколько соединений, которые могут быть распределены по каждому узлу в узле. кластер, На узле это также дорого, чтобы найти соединение. В этом случае мы могли бы также изменить образ мышления.Для каждого сообщения веб-сокета мы проталкиваем его на каждую ноду кластера и подписываемся на подключение сообщения.Будь то одна или десять тысяч, мы обязательно получим сообщение в конце. Основываясь на этой идее, мы сделали несколько технических решений:
-
RabbitMQ
-
Spring Cloud Stream
Во-первых, RabbitMQ, продвинутая очередь сообщений, может реализовать широковещательную рассылку сообщений (конечно, kafka может делать то же самое, здесь представлена только одна), еще одна технология — Spring Cloud Stream, stream — высоко масштабируемый событийно-управляемый микросервис для построения Фреймворк, и его можно интегрировать с RabbitMQ, Kafka и другими различными службами сообщений, используя поток, это просто вопрос изменения конфигурации, чтобы заменить rabbitmq на kafka. Далее мы сосредоточимся на том, как его использовать:
импортировать зависимости
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
Настроить подшивку
Binder — важная концепция потоковой передачи, которая используется для настройки ПО промежуточного слоя сообщений для потоковой публикации и подписки на события. Сначала посмотрите на конфигурацию:
spring:
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
username: username
password: password
virtual-host: /
Кролик по умолчанию в конфигурации — это имя связующего, на которое через какое-то время будут ссылаться в других конфигурациях, тип указывает тип промежуточного программного обеспечения сообщений, среда — это конфигурация промежуточного программного обеспечения сообщений, структура конфигурации здесь и элементы конфигурации в разделе пространство имен spring.rabbitmq Точно так же, вы можете обратиться к конфигурации (функция этой конфигурации состоит в том, чтобы отличить конфигурацию rabbitmq потока от конфигурации rabbitmq, используемой в другом месте проекта. Если среда не настроена здесь, биндер будет следовать конфигурация под пространством имен spring.rabbitmq), например, конфигурация rabbitmq в вашем проекте выглядит так:
spring:
rabbitmq:
host: localhost
username: username
password: password
virtual-host: /
Конфигурация среды связующего "от двери до двери" может быть полностью удалена.
Связывание потока сообщений и связующего
Чтобы микросервисы могли получать и публиковать сообщения о событиях, согласно названию spring cloud stream, как следует из названия, ему необходимо использовать потоки, поэтому в конфигурации необходимо объявить два потока событий, один поток ввода и один поток вывода:
spring:
cloud:
stream:
bindings:
websocketMessageIn:
destination: websocketMessage
binder: defaultRabbit
websocketMessageOut:
destination: websocketMessage
binder: defaultRabbit
Здесь мы видим, что поток событий ссылается на биндер, а это значит, что эти два потока используют промежуточное ПО rabbitmq (вы должны были понимать, что в проекте Rabbit и kafka могут использоваться как промежуточное ПО сообщения потока событий одновременно ).
websocketMessageIn, websocketMessageOut — имя потока событий (вы можете запустить его сами), назначение указывает, что назначение двух потоков событий одно и то же, что определяет, что запись и чтение указывают на одно и то же место (не обязательно в одну и ту же очередь сообщений ).
объявление потока событий
Потоки событий определяются с помощью интерфейсов:
/**
* websocket消息事件流接口
* Created by 吴昊 on 18-11-8.
*
* @author 吴昊
* @since 1.4.3
*/
interface WebSocketMessageStream {
companion object {
const val INPUT: String = "webSocketMessageIn"
const val OUTPUT: String = "webSocketMessageOut"
}
/**
* 输入
*/
@Input(INPUT)
fun input(): SubscribableChannel
/**
* 输出
*/
@Output(OUTPUT)
fun output(): MessageChannel
}
Объявите интерфейс потока событий, который определяет две константы, соответствующие двум именам потоков в конфигурации: входной поток получается вызовом метода input(), а выходной поток получается вызовом метода output().
Реализация этого интерфейса выполняется потоком Spring Cloud, и его не нужно реализовывать самостоятельно.
Работа с потоками событий
Объявите бин:
@Component
@EnableBinding(WebSocketMessageStream::class)
class WebSocketMessageService {
……
Здесь аннотация @EnableBinding указывает класс интерфейса потока событий. Только при добавлении этой аннотации (для распознавания Spring ее можно добавить в класс ввода или в класс, аннотированный с помощью @Configuration), интерфейс будет реализован. , и добавлен в контейнер Spring (можно вводить).
Содержимое вышеуказанного WebSocketMessageService выглядит следующим образом:
@Autowired
private lateinit var stream: WebSocketMessageStream
@Autowired
private lateinit var template: SimpMessagingTemplate
@StreamListener(WebSocketMessageStream.INPUT)
fun messageReceived(message: WebSocketMessage) {
template.convertAndSend(message.destination, message.body)
}
fun send(destination: String, body: Any) {
stream.output().send(
MutableMessage(WebSocketMessage(destination, body))
)
}
получить сообщение
Аннотация @streamListener указывает мониторинг потока событий. Параметр, полученный методом, является содержимым сообщения о событии (десериализовано с помощью Jackson). Метод MessageReative здесь напрямую отправляет полученное сообщение на передний конец с помощью Websocket.
Отправить сообщение
Точно так же отправка также очень проста.Отправьте сообщение непосредственно во входной поток.Приведенный выше метод отправки заключается в отправке сообщения, которое должно было быть отправлено в веб-сокет с помощью шаблона SimpMessagingTemplate, в поток событий весеннего облачного потока. После этого все операции в проекте, которым необходимо отправить сообщения webSocket во внешний интерфейс, должны вызывать метод send.
На этом этапе вы все еще можете быть немного сбиты с толку и у вас могут возникнуть вопросы: почему каждый узел микросервиса может получать сообщения о событиях? Или как контролируется конфигурация, при которой один узел получает сообщения о событиях и несколько узлов. Не волнуйтесь, все, подождите, пока я буду говорить медленно, а потом я объясню это в сочетании со знанием кролика. в настоящее время:
Сначала посмотрите на очередь сообщений Rabbit:
Как видно из рисунка, существует несколько очередей, начинающихся с webSocketMessage, то есть каждый узел микросервиса создает очередь сообщений, а затем смотрите на обмен:
Очередь сообщений, привязанная к ExchangeИмя обмена здесь и префикс имени очереди сообщений выше оба являются webSocketMessage, которые обаЗадается местом назначения в предыдущей конфигурации привязки в соответствии с именем места назначения.
Когда приложение записывает события во входной поток, оно использует пункт назначения в качестве ключа (т. е. webSocketMessage) и записывает сообщение в обмен с именем webSocketMessage.Поскольку префиксы очередей сообщений, привязанных к обмену, — это все webSocketMessage, а ключ маршрутизации — #, поэтому обмен Сообщение будет перенаправлено в очередь сообщений в начале каждого сообщения webSocketMessage (это включает в себя точки знаний rabbitmq, если вы не понимаете, обратитесь к информации самостоятельно), так что каждый микросервис может получить одно и то же сообщение.
Давайте посмотрим на вопрос, поднятый ранее.Такая конфигурация может отправлять сообщения на каждый узел микросервиса.Итак, если сообщение должно быть получено только одним узлом, как его настроить? Очень просто, пункт конфигурации можно сделать:
spring:
cloud:
stream:
bindings:
websocketMessageIn:
group: test
destination: websocketMessage
binder: defaultRabbit
Видно, что по сравнению с предыдущей конфигурацией есть только еще одна групповая конфигурация, после которой rabbitmq будет генерировать очередь сообщений с именем websocketMessage.test (очередь сообщений, созданная каждым микросервисом, упомянутым выше, автоматически удаляется. , т.е. , очередь сообщений удаляется после отключения микросервиса, и эта очередь сообщений является персистентной, то есть даже если все узлы микросервиса будут отключены, они не удалятся), все узлы микросервиса слушают эту очередь A, когда есть сообщения в очереди, будет потребляться только одним узлом.
Это конец содержания. Конфигурация весеннего облачного потока намного больше, но этих конфигураций достаточно, чтобы выполнить то, что мне нужно сделать. Для других конфигураций, пожалуйста, обратитесь к официальным документам весеннего облачного потока: