задний план
Во время переднего плана компания разработала программу для активного сервисного робота, что означает, что сгенерированные сообщения активно отправляются клиенту (H5, IOS, Android) через сервер, который поддерживает персонализированные настройки переключателя пользователя, и пользователь может свободно выбирать тип полученных сообщений, в то же время, пользователи поддерживаются, чтобы задавать вопросы, здесь записывается общая идея всего развертывания и реализации;
Также спасибо моему Лидеру за помощь.
развертывать
Конфигурация Nginx
- Чтобы длительное соединение оставалось действительным, настройте HTTP версии 1.1;
- настроить
Upgrade
иConnection
информация заголовка ответа;
Полная конфигурация выглядит следующим образом:
location / {
proxy_pass http://nodes;
# enable WebSockets
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
Конфигурация сокета
Класс конфигурации сокета
public class WebSocketConfig {
private Logger log = LoggerFactory.getLogger(WebSocketConfig.class);
@Value("${wss.server.host}")
private String host;
@Value("${wss.server.port}")
private Integer port;
@Value("${redis.passwd}")
private String redisPasswd;
@Value("${redis.address}")
private String redisAddress;
@Bean
public PubSubStore pubSubStore() {
return socketIOServer().getConfiguration().getStoreFactory().pubSubStore();
}
@Bean
public SocketIOServer socketIOServer() {
Config redissonConfig = new Config();
// 高版本需求 redis:// 前缀
redissonConfig.useSingleServer().setPassword("xxx").setAddress("redis://xxx:xx").setDatabase();
RedissonClient redisson = Redisson.create(redissonConfig);
RedissonStoreFactory redisStoreFactory = new RedissonStoreFactory(redisson);
Configuration config = new Configuration();
config.setHostname(host);
config.setPort(port);
config.setOrigin(origin);
config.setHttpCompression(false);
config.setWebsocketCompression(false);
config.setStoreFactory(redisStoreFactory);
// 注意如果开放跨域设置,需要设置为null而不是"*"
config.setOrigin(null);
// 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间
config.setUpgradeTimeout(10000);
// Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔
config.setPingInterval(25000);
// Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件
config.setPingTimeout(60000);
/** 异常监听事件,必须覆写全部方法 */
config.setExceptionListener(new ExceptionListener(){
@Override
public void onConnectException(Exception e, SocketIOClient client) {
ResponseMessage error = ResponseMessage.error(-1, "连接异常!");
client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "连接异常!")));
}
@Override
public void onDisconnectException(Exception e, SocketIOClient client) {
ResponseMessage error = ResponseMessage.error(-1, "断开异常!");
client.sendEvent("exception",JSON.toJSON(new Response<String>(error, "连接异常!")));
}
@Override
public void onEventException(Exception e, List<Object> data, SocketIOClient client) {
ResponseMessage error = ResponseMessage.error(-1, "服务器异常!");
client.sendEvent("exception",JSON.toJSON(new Response<String>(error, "连接异常!")));
}
@Override
public void onPingException(Exception e, SocketIOClient client) {
ResponseMessage error = ResponseMessage.error(-1, "PING 超时异常!");
client.sendEvent("exception",JSON.toJSON(new Response<String>(error, "PING 超时异常!")));
}
@Override
public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
return false;
}
});
// 类似于过滤器设置,此处不作处理
config.setAuthorizationListener(data -> {
// // 可以使用如下代码获取用户密码信息
// String appId = data.getSingleUrlParam("appId");
// String source = data.getSingleUrlParam("source");
// log.info("token {}, client {}", appId, source);
return true;
});
return new SocketIOServer(config);
}
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
return new SpringAnnotationScanner(socketServer);
}
}
Класс запуска сокета
@Log4j2
@Component
@Order(value=1)
public class ServerRunner implements CommandLineRunner {
private final SocketIOServer server;
@Autowired
public ServerRunner(SocketIOServer server) {
this.server = server;
}
@Override
public void run(String... args) throws Exception {
server.start();
log.info("socket.io启动成功!");
}
}
окончательная архитектура
Процесс реализации
Активный мониторинг службы push-уведомлений Как потребитель Kafka, производитель данных отправляет обработанные данные в Kafka, а потребитель прослушивает сообщение, транслируемое клиенту; при отправке соответствующие персонализированные настройки пользователя запрашиваются в базе данных, и только клиент push-уведомлений принимает новости;
Поскольку активная служба push развертывает несколько узлов, а несколько узлов назначаются одной и той же группе потребителей Kafka, это приведет к тому, что несколько узлов будут потреблять только часть всех сообщений; здесь используйтеRedis
из发布/订阅
Механизм решения этой проблемы: когда потребитель после сообщения, сообщение публикуется каждым узлом, другие узлы подписываютсяTopic
Отправлять сообщение клиентскому узлу при каждом подключении, т. е. когда каждый узел является издателем и подписчиком;
От генерации данных к потреблению
Использование темы Redisson для реализации распределенной публикации/подписки
Redisson для удобства в Redis发布/订阅
Использование механизма инкапсулирует его в тему и обеспечивает уровень кода.发布/订阅
операция, так что после подключения нескольких процессов JVM к Redis (одна машина/кластер) их можно реализовать в одном процессе JVM发布
изTopic
В другом уже订阅
Сообщение может быть своевременно получено в процессе JVM темы.
Интегрирован в Netty-SocketIORedisson
После этого внутрь также употребляли发布/订阅
механизм
выпуск новостей
public void sendMessageToAllClient(String eventType, String message, String desc) {
Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
for(final SocketIOClient client : clients){
// Do Somthing
}
Packet packet = new Packet(PacketType.MESSAGE);
packet.setData(new BroadcastMessage(message, eventType, desc));
publishMessage(packet);
}
private void publishMessage(Packet packet) {
DispatchMessage dispatchMessage = new DispatchMessage("", packet, "");
pubSubStore.publish(PubSubType.DISPATCH, dispatchMessage);
BroadcastMessage broadcastMessage = dispatchMessage.getPacket().getData();
}
подписка на сообщения
@PostConstruct
public void init() {
pubSubStore.subscribe(PubSubType.DISPATCH, dispatchMessage -> {
BroadcastMessage messageData = dispatchMessage.getPacket().getData();
Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
for(final SocketIOClient client : clients){
// DO Somthing
}, DispatchMessage.class);
}