SpringBoot+RabbitMQ+WebSocket реализует простую функцию чата

RabbitMQ

1. Введение в веб-сокет

WebSocket — это протокол, предоставляемый HTML5 для полнодуплексной связи по одному TCP-соединению.

WebSocket упрощает обмен данными между клиентом и сервером, позволяя серверу активно передавать данные клиенту. В API WebSocket браузеру и серверу нужно выполнить только одно рукопожатие, и между ними может быть создано постоянное соединение, и может выполняться двусторонняя передача данных.

В WebSocket API браузеру и серверу достаточно сделать рукопожатие, после чего между браузером и сервером формируется быстрый канал. Данные могут передаваться напрямую между ними.

Сейчас для реализации push-технологии многие сайты используют Ajax-опрос. Опрос — это определенный интервал времени (например, каждую 1 секунду), когда браузер отправляет HTTP-запрос на сервер, а затем сервер возвращает последние данные в браузер клиента. У этого традиционного режима есть очевидные недостатки, то есть браузеру необходимо постоянно отправлять запросы на сервер, однако HTTP-запрос может содержать длинный заголовок, а действительно достоверных данных может быть лишь малая часть, что явно расточительно. много пропускной способности и других ресурсов.

Протокол WebSocket, определенный HTML5, может лучше экономить ресурсы сервера и пропускную способность, а также может обмениваться данными в более реальном времени.

2. Помещение

Реализация push-сообщений на одном компьютере очень проста, и ее можно передавать через веб-сокет, но совместное использование сеансов не поддерживается в распределенной среде, поскольку серверы разные, поэтому RabbitMQ+webSocket можно использовать для реализации распределенного push-сообщения.

Проект основан на предыдущей трансформации проекта.В первую очередь должны быть интегрированы RabbitMQ и WebSokcet.Основные идеи реализации:

Клиенты регистрируются на своих соответствующих серверах через веб-сокеты, а серверы привязаны к одной и той же очереди MQ, поэтому каждый раз, когда клиент отправляет сообщение, оно сначала помещается в MQ, а затем потребители, привязанные к очереди, отправляют информацию о сокете. а затем реализуйте межсерверную отправку сообщений.

3. Реализация кода

Вставьте основной код сюда

Включите поддержку WebSocket:

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

WebSocketServer:

@ServerEndpoint("/websocket/{from}")
@Component
public class WebSocketServer {
    private static Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
    private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<String, WebSocketServer>();

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    //from
    private static String from = "";

    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session,@PathParam("from") String from) {
        this.session = session;
        webSocketMap.put(from, this);     //加入set中
        addOnlineCount();           //在线数加1
        logger.info("有新窗口开始监听:"+from+",当前在线人数为" + getOnlineCount());
        this.from=from;
        try {
            sendMessage("连接成功");
        } catch (IOException e) {
            logger.error("websocket IO异常");
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketMap.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        logger.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("收到来自窗口"+from+"的信息:"+message);
        JSONObject obj = new JSONObject();
        obj.put("cmd", "heartcheck");//业务类型
        obj.put("msgTxt", "服务端心跳响应 ");//消息内容
        obj.put("msgDate", DateUtils.getCurrentDateTime());//时间
        session.getAsyncRemote().sendText(obj.toJSONString());
    }

    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.error("发生错误");
        error.printStackTrace();
    }
    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * 群发自定义消息
     * */
    public static void sendMqMessage(String message) throws IOException {
        JSONObject jsonObject = new JSONObject();
        logger.info("推送消息到窗口"+from+",推送内容:"+message);
        ChatMsg chatMsg = JSONObject.parseObject(message, ChatMsg.class);

        if(chatMsg != null && chatMsg.getTo() != null && webSocketMap.containsKey(chatMsg.getTo())){
            webSocketMap.get(chatMsg.getTo()).sendMessage(message);
        }else{
            logger.error("用户"+chatMsg.getTo()+",不在线!");
        }

    }

    /**
     * 群发自定义消息
     * */
    public static void sendInfo(String message,@PathParam("from") String from) throws IOException {
        JSONObject jsonObject = new JSONObject();
        logger.info("推送消息到窗口"+from+",推送内容:"+message);
//        for (WebSocketServer item : webSocketMap) {
//            try {
//                String date = format.format(new Date());
//                String mes = message+ " (" + date + ")";
//                jsonObject.put("mes",mes);
//                jsonObject.put("sender",from);
//                //这里可以设定只推送给这个sid的,为null则全部推送
//                if(from==null) {
//                    item.sendMessage(jsonObject.toString());
//                }else if(item.from.equals(from)){
//                    item.sendMessage(jsonObject.toString());
//                }
//            } catch (IOException e) {
//                continue;
//            }
//        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

Создайте очередь сообщений:

@Configuration
public class FanoutRabbitConfig {

    public static final String DEFAULT_BOOK_QUEUE = "dev.book.fanout.a.queue";

    @Bean
    public Queue queueMessageA() {
        // 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理
        return new Queue(DEFAULT_BOOK_QUEUE, true);
    }



    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessageA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueMessageA).to(fanoutExchange);
    }

}

потребитель:

@Component
public class BookHandler {

    private static final Logger logger = LoggerFactory.getLogger(BookHandler.class);


    @RabbitListener(queues = {FanoutRabbitConfig.DEFAULT_BOOK_QUEUE})
    public void listenerAutoAck(String text, Message message, Channel channel) {

        // TODO 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info("[消费者一监听的消息] - [{}]", text);
            new WebSocketServer().sendMqMessage(text);

            // TODO 通知 MQ 消息已被成功消费,可以ACK了
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            try {
                // TODO 处理失败,重新压入MQ
                channel.basicRecover();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

}

Класс сообщения:

public class ChatMsg {
    private String from;//发送的username
    private String to;//接收者
    private String content;//内容
    private Date date;//时间
    private String fromNickname;//昵称

    public String getFromNickname() {
        return fromNickname;
    }

    public void setFromNickname(String fromNickname) {
        this.fromNickname = fromNickname;
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }
}

Отправить сообщение контроллеру:

@Controller
@RequestMapping("/testmq")
public class MqTestController {
    private static Logger logger = LoggerFactory.getLogger(MqTestController.class);

    @Autowired
    private  RabbitTemplate rabbitTemplate; //rabbitTemplate是springboot 提供的默认实现


    @RequestMapping(value="/send")
    @ResponseBody
    public void defaultMessage(String message, String from, String to) {
        ChatMsg chatMsg = new ChatMsg();
        chatMsg.setFrom(from);
        chatMsg.setTo(to);
        chatMsg.setContent(message);
        chatMsg.setDate(new Date());
       rabbitTemplate.convertAndSend("fanoutExchange", "", JSONObject.toJSONString(chatMsg));
    }
}

пом зависимости:

        <!-- rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Websocket-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

Конфигурационный файл:

  #rabbitmq 配置
  spring:
      rabbitmq:
        host: 192.0.0.171
        port: 5672
        username: admin
        password: 123456
        #虚拟主机
        virtual-host: /
        listener:
          simple:
            #手动ACK
            acknowledge-mode: manual

4. Тест

Сначала откройте два пакета jar, запустите их, используя два разных порта, и откройте два окна браузера для тестирования.

После простого теста проблем нет.


Добро пожаловать, чтобы обратить внимание на личный публичный номер