Spring Boot series 22 Spring Websocket реализует демо-версию кластерного решения websocket.

Spring Boot Redis Spring WebSocket

Обзор

предыдущий постSpring Boot series 21 Spring Websocket реализует обсуждение кластерного решения websocketПодробно представлены три схемы кластеризации WebSocket, и сделан вывод, что третья схема является наилучшей В этой статье мы реализуем третью схему.

Третья схема выглядит следующим образом

这里写图片描述

На основе схемы 1 сделаны следующие модификации, и схема новой архитектуры выглядит следующим образом:

  1. Служба A добавляет модуль WS. Когда веб-сокет подключается, информация о подключении пользователя (в основном значение веб-сокета sessionId) сохраняется в Redis
  2. Обмен, на который производители сообщений отправляют сообщения, эти сервисы не проталкивают сервис A/B напрямую.
  3. Добавьте новую отправку модуля, этот модуль получает отправленную информацию и читает сообщение от Redis, чтобы получить значение sessionId websocket, соответствующее пользователю, а затем вычисляет ключ маршрутизации, соответствующий пользователю, в соответствии с вышеуказанными правилами, а затем отправляет сообщение пользователю, на которого подписано в очереди
  4. передний конец получить сообщение

Подробный код реализации

Название проекта: мвк Эта статья находится вSpring Boot Series 20 Реализация Spring Websocket отправляет сообщения указанным пользователямоснование для модификации.

Ввести банки, связанные с redis, rabbitmq, в pom.xml

<!--  webscoekt 集群 需要 引入支持RabbitMQ, redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

rabbitmq, конфигурация Redis

application-wscluster.properties

# websocket集群需要配置RabbitMQ
spring.rabbitmq.host:192.168.21.3
spring.rabbitmq.virtual-host: /icc-local
spring.rabbitmq.username: icc-dev
spring.rabbitmq.password: icc-dev

# 配置redis
spring.redis.database=0
spring.redis.host=192.168.21.4
# spring.redis.password=
spring.redis.port=7001
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0  
spring.redis.pool.max-active=8  
spring.redis.pool.max-wait=-1

IRedisSessionService и его реализация

Интерфейс IRedisSessionService определяет операции над redis. Класс реализации IRedisSessionService хранит связь между именем пользователя и идентификатором сеанса веб-сокета в Redis и обеспечивает добавление, удаление, запросIRedisSessionService

public interface IRedisSessionService {
    void add(String name, String wsSessionId);
    boolean del(String name);
    String get(String name);
}

SimulationRedisSessionServiceImplСохраните связь между именем пользователя и идентификатором сеанса веб-сокета в Redis, добавьте, удалите, запросите

@Component
public class SimulationRedisSessionServiceImpl implements IRedisSessionService {

    @Autowired
    private RedisTemplate<String, String> template;

    // key = 登录用户名称, value=websocket的sessionId
    private ConcurrentHashMap<String,String> redisHashMap = new ConcurrentHashMap<>(32);

    /**
     * 在缓存中保存用户和websocket sessionid的信息
     * @param name
     * @param wsSessionId
     */
    public void add(String name, String wsSessionId){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        boundValueOperations.set(wsSessionId,24 * 3600, TimeUnit.SECONDS);
    }

    /**
     * 从缓存中删除用户的信息
     * @param name
     */
    public boolean del(String name){
        return template.execute(new RedisCallback<Boolean>() {

            @Override
            public Boolean doInRedis(RedisConnection connection)
                    throws DataAccessException {
                byte[] rawKey = template.getStringSerializer().serialize(name);
                return connection.del(rawKey) > 0;
            }
        }, true);
    }

    /**
     * 根据用户id获取用户对应的sessionId值
     * @param name
     * @return
     */
    public String get(String name){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        return boundValueOperations.get();
    }
}

AuthWebSocketHandlerDecoratorFactory

Декорируем объект WebSocketHandlerDecorator, при установлении соединения сохраняем идентификатор сессии вебсокета, где ключом является имя учетной записи, при разрыве связи удаляем значение sessionId пользователя из кеша. Это значение sessionId веб-сокета используется для создания ключа маршрутизации для сообщения.

@Component
public class AuthWebSocketHandlerDecoratorFactory implements WebSocketHandlerDecoratorFactory {
    private static final Logger log = LoggerFactory.getLogger(AuthWebSocketHandlerDecoratorFactory.class);

    @Autowired
    private IRedisSessionService redisSessionService;

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
                // 客户端与服务器端建立连接后,此处记录谁上线了
                Principal principal = session.getPrincipal();
                if(principal != null){
                    String username = principal.getName();
                    log.info("websocket online: " + username + " session " + session.getId());
                    redisSessionService.add(username, session.getId());
                }
                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                // 客户端与服务器端断开连接后,此处记录谁下线了
                Principal principal = session.getPrincipal();
                if(principal != null){
                    String username = session.getPrincipal().getName();
                    log.info("websocket offline: " + username);
                    redisSessionService.del(username);
                }
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}

WebSocketRabbitMQMessageBrokerConfigurer

существуетSpring Boot Series 20 Реализация Spring Websocket отправляет сообщения указанным пользователямНа основе добавления следующих функций настройте myWebSocketHandlerDecoratorFactory на websocket.

@Configuration
// 此注解开使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping
@EnableWebSocketMessageBroker
public class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {

    @Autowired
    private MyPrincipalHandshakeHandler myDefaultHandshakeHandler;
    @Autowired
    private AuthHandshakeInterceptor sessionAuthHandshakeInterceptor;

    @Autowired
    private AuthWebSocketHandlerDecoratorFactory myWebSocketHandlerDecoratorFactory;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        ….
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
          …       
    }

    /**
     * 这时实际spring weboscket集群的新增的配置,用于获取建立websocket时获取对应的sessionid值
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(myWebSocketHandlerDecoratorFactory);
        super.configureWebSocketTransport(registration);
    }
}

ТестMQCtl:

вышеSpring Boot Series 20 Реализация Spring Websocket отправляет сообщения указанным пользователямНа основе этого измените этот класс

  • Метод sendMq2User() объединяет ключ маршрутизации в соответствии с учетной записью пользователя и идентификатором сеанса веб-сокета в соответствии с ["имя очереди веб-подписки+'-пользователь'+идентификатор сеанса веб-сокета"]. Затем отправьте сообщение на биржу amq.topic через экземпляр AmqpTemplate, а ключ маршрутизации — ["имя очереди веб-подписки+'-user'+websocket sessionId"]. В методе websocket sessionId получается из redis по имени учетной записи Другие методы, не перечисленные здесь один за другим
@Controller
@RequestMapping(value = "/ws")
public class TestMQCtl {
    private  static final Logger log = LoggerFactory.getLogger(TestMQCtl.class);

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private IRedisSessionService redisSessionService;

     /**
     * 向执行用户发送请求
     * @param msg
     * @param name
     * @return
     */
    @RequestMapping(value = "send2user")
    @ResponseBody
    public int sendMq2User(String msg, String name){
        // 根据用户名称获取用户对应的session id值
        String wsSessionId = redisSessionService.get(name);
        RequestMessage demoMQ = new RequestMessage();
        demoMQ.setName(msg);

        // 生成路由键值,生成规则如下: websocket订阅的目的地 + "-user" + websocket的sessionId值。生成值类似:
        String routingKey = getTopicRoutingKey("demo", wsSessionId);
        // 向amq.topi交换机发送消息,路由键为routingKey
        log.info("向用户[{}]sessionId=[{}],发送消息[{}],路由键[{}]", name, wsSessionId, wsSessionId, routingKey);
        amqpTemplate.convertAndSend("amq.topic", routingKey,  JSON.toJSONString(demoMQ));
        return 0;
    }

    /**
     * 获取Topic的生成的路由键
     *
     * @param actualDestination
     * @param sessionId
     * @return
     */
    private String getTopicRoutingKey(String actualDestination, String sessionId){
        return actualDestination + "-user" + sessionId;
    }
   ….
}

контрольная работа

Запустите две службы с разными портамиЗапустите класс службы: WebSocketClusterApplication. Запустите службу A с параметром "--spring.profiles.active=wscluster --server.port=8081". Запустите службу B с параметром "--spring.profiles.active=wscluster --server.port=8082".

Войдите в смоделированную учетную запись: служба входа xiaoming A, служба входа xiaoming2 BИспользуйте xiaoming для входа в службу A и входа в веб-сокет http://127.0.0.1:8081/ws/login Используйте xiaoming для входа и отправки

这里写图片描述
Нажмите «Подключиться», если соединение становится серым, веб-сокет для входа в систему выполнен успешно.
这里写图片描述

Откройте другой браузер, используйте xiaoming2 для входа в службу B и войдите в веб-сокет. http://127.0.0.1:8082/ws/login Используйте xiaoming2 для входа в систему и отправки, и, наконец, войдите в веб-сокет

Войдите в службу A, чтобы имитировать отправку страницыВойдите на http://127.0.0.1:8081/ws/send и отправьте сообщение

  1. Отправка сообщения xiaoming-receive на учетную запись xiaoming может быть получена только сервисным веб-сокетом, подключенным к сервису A §
  2. Отправка сообщения xiaoming2-receive на учетную запись xiaoming2 может быть получена только сервисным веб-сокетом, подключенным к сервису B.

В этот момент две страницы получают информацию:

这里写图片描述

Аккаунт xiaoming получает только xiaoming-receive Аккаунт xiaoming2 получает только xiaoming2-receive

Войдите в сервис B, чтобы имитировать отправку страницыВойдите в http://127.0.0.1:8082/ws/send, отправьте сообщение и отправьте то же сообщение, что и http://127.0.0.1:8081/ws/send, результат тот же

в заключенииНезависимо от того, входит ли пользователь в службу A или службу B, мы можем отправлять сообщения указанному пользователю через приведенный выше код, поэтому мы реализовали кластеризацию веб-сокетов.

код

Смотрите код github для всех подробных кодов, пожалуйста, используйте как можно большетег v0.24, не используйте мастер, потому что мастер всегда меняется, нет гарантии, что код в статье и код на гитхабе всегда будут одинаковыми