1. Введение в веб-сокет
WebSocket — это протокол, предоставляемый HTML5 для полнодуплексной связи по одному TCP-соединению.
WebSocket упрощает обмен данными между клиентом и сервером, позволяя серверу активно передавать данные клиенту. В API WebSocket браузеру и серверу нужно выполнить только одно рукопожатие, и между ними может быть создано постоянное соединение, и может выполняться двусторонняя передача данных.
В WebSocket API браузеру и серверу достаточно сделать рукопожатие, после чего между браузером и сервером формируется быстрый канал. Данные могут передаваться напрямую между ними.
Сейчас для реализации push-технологии многие сайты используют Ajax-опрос. Опрос — это определенный интервал времени (например, каждую 1 секунду), когда браузер отправляет HTTP-запрос на сервер, а затем сервер возвращает последние данные в браузер клиента. У этого традиционного режима есть очевидные недостатки, то есть браузеру необходимо постоянно отправлять запросы на сервер, однако HTTP-запрос может содержать длинный заголовок, а действительно достоверных данных может быть лишь малая часть, что явно расточительно. много пропускной способности и других ресурсов.
Протокол WebSocket, определенный HTML5, может лучше экономить ресурсы сервера и пропускную способность, а также может обмениваться данными в более реальном времени.
2. Помещение
Реализация push-сообщений на одном компьютере очень проста, и ее можно передавать через веб-сокет, но совместное использование сеансов не поддерживается в распределенной среде, поскольку серверы разные, поэтому RabbitMQ+webSocket можно использовать для реализации распределенного push-сообщения.
Проект основан на предыдущей трансформации проекта.В первую очередь должны быть интегрированы RabbitMQ и WebSokcet.Основные идеи реализации:
Клиенты регистрируются на своих соответствующих серверах через веб-сокеты, а серверы привязаны к одной и той же очереди MQ, поэтому каждый раз, когда клиент отправляет сообщение, оно сначала помещается в MQ, а затем потребители, привязанные к очереди, отправляют информацию о сокете. а затем реализуйте межсерверную отправку сообщений.
3. Реализация кода
Вставьте основной код сюда
Включите поддержку WebSocket:
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
WebSocketServer:
@ServerEndpoint("/websocket/{from}")
@Component
public class WebSocketServer {
private static Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<String, WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//from
private static String from = "";
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("from") String from) {
this.session = session;
webSocketMap.put(from, this); //加入set中
addOnlineCount(); //在线数加1
logger.info("有新窗口开始监听:"+from+",当前在线人数为" + getOnlineCount());
this.from=from;
try {
sendMessage("连接成功");
} catch (IOException e) {
logger.error("websocket IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketMap.remove(this); //从set中删除
subOnlineCount(); //在线数减1
logger.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
logger.info("收到来自窗口"+from+"的信息:"+message);
JSONObject obj = new JSONObject();
obj.put("cmd", "heartcheck");//业务类型
obj.put("msgTxt", "服务端心跳响应 ");//消息内容
obj.put("msgDate", DateUtils.getCurrentDateTime());//时间
session.getAsyncRemote().sendText(obj.toJSONString());
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
logger.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息
* */
public static void sendMqMessage(String message) throws IOException {
JSONObject jsonObject = new JSONObject();
logger.info("推送消息到窗口"+from+",推送内容:"+message);
ChatMsg chatMsg = JSONObject.parseObject(message, ChatMsg.class);
if(chatMsg != null && chatMsg.getTo() != null && webSocketMap.containsKey(chatMsg.getTo())){
webSocketMap.get(chatMsg.getTo()).sendMessage(message);
}else{
logger.error("用户"+chatMsg.getTo()+",不在线!");
}
}
/**
* 群发自定义消息
* */
public static void sendInfo(String message,@PathParam("from") String from) throws IOException {
JSONObject jsonObject = new JSONObject();
logger.info("推送消息到窗口"+from+",推送内容:"+message);
// for (WebSocketServer item : webSocketMap) {
// try {
// String date = format.format(new Date());
// String mes = message+ " (" + date + ")";
// jsonObject.put("mes",mes);
// jsonObject.put("sender",from);
// //这里可以设定只推送给这个sid的,为null则全部推送
// if(from==null) {
// item.sendMessage(jsonObject.toString());
// }else if(item.from.equals(from)){
// item.sendMessage(jsonObject.toString());
// }
// } catch (IOException e) {
// continue;
// }
// }
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
Создайте очередь сообщений:
@Configuration
public class FanoutRabbitConfig {
public static final String DEFAULT_BOOK_QUEUE = "dev.book.fanout.a.queue";
@Bean
public Queue queueMessageA() {
// 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理
return new Queue(DEFAULT_BOOK_QUEUE, true);
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessageA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueMessageA).to(fanoutExchange);
}
}
потребитель:
@Component
public class BookHandler {
private static final Logger logger = LoggerFactory.getLogger(BookHandler.class);
@RabbitListener(queues = {FanoutRabbitConfig.DEFAULT_BOOK_QUEUE})
public void listenerAutoAck(String text, Message message, Channel channel) {
// TODO 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
logger.info("[消费者一监听的消息] - [{}]", text);
new WebSocketServer().sendMqMessage(text);
// TODO 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// TODO 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
Класс сообщения:
public class ChatMsg {
private String from;//发送的username
private String to;//接收者
private String content;//内容
private Date date;//时间
private String fromNickname;//昵称
public String getFromNickname() {
return fromNickname;
}
public void setFromNickname(String fromNickname) {
this.fromNickname = fromNickname;
}
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
public String getTo() {
return to;
}
public void setTo(String to) {
this.to = to;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
}
Отправить сообщение контроллеру:
@Controller
@RequestMapping("/testmq")
public class MqTestController {
private static Logger logger = LoggerFactory.getLogger(MqTestController.class);
@Autowired
private RabbitTemplate rabbitTemplate; //rabbitTemplate是springboot 提供的默认实现
@RequestMapping(value="/send")
@ResponseBody
public void defaultMessage(String message, String from, String to) {
ChatMsg chatMsg = new ChatMsg();
chatMsg.setFrom(from);
chatMsg.setTo(to);
chatMsg.setContent(message);
chatMsg.setDate(new Date());
rabbitTemplate.convertAndSend("fanoutExchange", "", JSONObject.toJSONString(chatMsg));
}
}
пом зависимости:
<!-- rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
Конфигурационный файл:
#rabbitmq 配置
spring:
rabbitmq:
host: 192.0.0.171
port: 5672
username: admin
password: 123456
#虚拟主机
virtual-host: /
listener:
simple:
#手动ACK
acknowledge-mode: manual
4. Тест
Сначала откройте два пакета jar, запустите их, используя два разных порта, и откройте два окна браузера для тестирования.
После простого теста проблем нет.
Добро пожаловать, чтобы обратить внимание на личный публичный номер