предисловие
Netty — это высокопроизводительная сетевая платформа NIO.В этой статье используется общий механизм пульса для понимания Netty на основе SpringBoot.
Окончательный эффект достигается:
- Клиент каждые N секунд проверяет, нужно ли ему отправлять пульс.
- Сервер также проверяет, нужно ли отправлять пульс каждые N секунд.
- Сервер может активно отправлять сообщения клиенту.
- На основе мониторинга SpringBoot вы можете просматривать подключения в реальном времени и различную информацию о приложениях.
Эффект следующий:
IdleStateHandler
Netty может использовать IdleStateHandler для управления соединением.Когда соединение бездействует слишком долго (сообщения не отправляются и не принимаются), будет инициировано событие, и мы можем реализовать механизм сердцебиения в этом событии.
сердцебиение клиента
Когда клиент простаивает в течение N секунд и не отправляет сообщение на сервер, он автоматически отправляет пульс для поддержания соединения.
Основной код кода выглядит следующим образом:
public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
LOGGER.info("已经 10 秒没有发送信息!");
//向服务端发送消息
CustomProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", CustomProtocol.class);
ctx.writeAndFlush(heartBeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
}
}
super.userEventTriggered(ctx, evt);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception {
//从服务端收到消息时被调用
LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
}
}
Реализация очень проста, достаточно отправить сообщение в обратном вызове события.
Благодаря интеграции SpringBoot отправляемая информация о пульсе представляет собой одноэлементный компонент.
@Configuration
public class HeartBeatConfig {
@Value("${channel.id}")
private long id ;
@Bean(value = "heartBeat")
public CustomProtocol heartBeat(){
return new CustomProtocol(id,"ping") ;
}
}
Это включает в себя содержание пользовательского протокола, пожалуйста, продолжайте читать ниже.
Конечно, бутстрап незаменим:
@Component
public class HeartbeatClient {
private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClient.class);
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.server.port}")
private int nettyPort;
@Value("${netty.server.host}")
private String host;
private SocketChannel channel;
@PostConstruct
public void start() throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CustomerHandleInitializer())
;
ChannelFuture future = bootstrap.connect(host, nettyPort).sync();
if (future.isSuccess()) {
LOGGER.info("启动 Netty 成功");
}
channel = (SocketChannel) future.channel();
}
}
public class CustomerHandleInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//10 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 10, 0))
.addLast(new HeartbeatEncode())
.addLast(new EchoClientHandle())
;
}
}
Таким образом, когда приложение запускается, оно проверяет, было ли отправлено сообщение каждые 10 секунд, в противном случае оно будет отправлять информацию о сердцебиении.
сердцебиение сервера
Сердцебиение на стороне сервера на самом деле аналогично, и IdleStateHandler необходимо добавить в ChannelPipeline.
public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomProtocol> {
private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatSimpleHandle.class);
private static final ByteBuf HEART_BEAT = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(new CustomProtocol(123456L,"pong").toString(),CharsetUtil.UTF_8));
/**
* 取消绑定
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettySocketHolder.remove((NioSocketChannel) ctx.channel());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
if (idleStateEvent.state() == IdleState.READER_IDLE){
LOGGER.info("已经5秒没有收到信息!");
//向客户端发送消息
ctx.writeAndFlush(HEART_BEAT).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
}
}
super.userEventTriggered(ctx, evt);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol customProtocol) throws Exception {
LOGGER.info("收到customProtocol={}", customProtocol);
//保存客户端与 Channel 之间的关系
NettySocketHolder.put(customProtocol.getId(),(NioSocketChannel)ctx.channel()) ;
}
}
Здесь есть на что обратить внимание:
Когда подключено несколько клиентов, сервер должен их различать, иначе ответное сообщение будет перепутано.
Поэтому всякий раз, когда возникает соединение, мы связываем текущий канал с идентификатором подключенного клиента (Поэтому идентификатор клиента должен быть уникальным для каждого соединения.).
Здесь карта используется для сохранения отношений и автоматической отмены ассоциации при отключении соединения.
public class NettySocketHolder {
private static final Map<Long, NioSocketChannel> MAP = new ConcurrentHashMap<>(16);
public static void put(Long id, NioSocketChannel socketChannel) {
MAP.put(id, socketChannel);
}
public static NioSocketChannel get(Long id) {
return MAP.get(id);
}
public static Map<Long, NioSocketChannel> getMAP() {
return MAP;
}
public static void remove(NioSocketChannel nioSocketChannel) {
MAP.entrySet().stream().filter(entry -> entry.getValue() == nioSocketChannel).forEach(entry -> MAP.remove(entry.getKey()));
}
}
Запустите загрузчик:
Component
public class HeartBeatServer {
private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatServer.class);
private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup work = new NioEventLoopGroup();
@Value("${netty.server.port}")
private int nettyPort;
/**
* 启动 Netty
*
* @return
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(boss, work)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(nettyPort))
//保持长连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new HeartbeatInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
LOGGER.info("启动 Netty 成功");
}
}
/**
* 销毁
*/
@PreDestroy
public void destroy() {
boss.shutdownGracefully().syncUninterruptibly();
work.shutdownGracefully().syncUninterruptibly();
LOGGER.info("关闭 Netty 成功");
}
}
public class HeartbeatInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(5, 0, 0))
.addLast(new HeartbeatDecoder())
.addLast(new HeartBeatSimpleHandle());
}
}
Также в ChannelPipeline добавлен IdleStateHandler, также будет запланированная задача проверять, приходит ли сообщение каждые 5 секунд, иначе запрос будет отправляться активно.
Поскольку тест связан с двумя клиентами, журналов два.
пользовательский протокол
На самом деле, мы видели все это выше: сервер и клиент используют для связи пользовательский POJO.
Следовательно, необходимо кодировать на стороне клиента и декодировать на стороне сервера, и нужно только реализовать кодек для каждого.
Пользовательский протокол:
public class CustomProtocol implements Serializable{
private static final long serialVersionUID = 4671171056588401542L;
private long id ;
private String content ;
//省略 getter/setter
}
Кодировщик клиента:
public class HeartbeatEncode extends MessageToByteEncoder<CustomProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {
out.writeLong(msg.getId()) ;
out.writeBytes(msg.getContent().getBytes()) ;
}
}
Другими словами, первые восемь байтов сообщения являются заголовками, а остальные — содержимым.
Серверный декодер:
public class HeartbeatDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
long id = in.readLong() ;
byte[] bytes = new byte[in.readableBytes()] ;
in.readBytes(bytes) ;
String content = new String(bytes) ;
CustomProtocol customProtocol = new CustomProtocol() ;
customProtocol.setId(id);
customProtocol.setContent(content) ;
out.add(customProtocol) ;
}
}
Просто следуйте правилам только сейчас, чтобы расшифровать.
Принцип реализации
На самом деле, думая о функции IdleStateHandler, естественно думать о принципе ее реализации:
Для обработки этих сообщений должен существовать поток запланированных задач.
Давайте посмотрим на его исходный код:
Сначала конструктор:
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
На самом деле он инициализирует несколько данных:
- readerIdleTimeSeconds: какое-то время данные не читались
- WriterIdleTimeSeconds: какое-то время данные не отправлялись
- allIdleTimeSeconds: может быть выполнено одно из двух указанных выше условий.
Поскольку IdleStateHandler также является ChannelHandler, онchannelActive
Инициализируйте задачу в:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);
super.channelActive(ctx);
}
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
То есть запланированная задача будет инициализирована согласно времени, которое мы даем.
Затем оцените, когда задача действительно выполняется:
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
Событие IdleStateEvent генерируется, если выполняется условие.
Мониторинг SpringBoot
После интеграции SpringBoot мы можем использовать Spring не только для управления объектами, но и для мониторинга приложений.
контроль привода
Когда мы представили:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Он включает функцию мониторинга приводов SpringBoot, которая может предоставить нам множество конечных точек мониторинга.
Например, некоторая статистика в некоторых приложениях:
Существующие компоненты:
Для получения дополнительной информации см.:docs.spring.IO/весенняя загрузка…
Но если я хочу отслеживать, сколько клиентов сейчас подключено к моему серверу, каковы соответствующие идентификаторы?
По сути, это просмотр Карты ассоциации, определенной внутри в режиме реального времени.
Это требует раскрытия пользовательских конечных точек.
пользовательская конечная точка
Способ разоблачения тоже очень прост:
Унаследуйте AbstractEndpoint и переопределите его функцию вызова:
public class CustomEndpoint extends AbstractEndpoint<Map<Long,NioSocketChannel>> {
/**
* 监控端点的 访问地址
* @param id
*/
public CustomEndpoint(String id) {
//false 表示不是敏感端点
super(id, false);
}
@Override
public Map<Long, NioSocketChannel> invoke() {
return NettySocketHolder.getMAP();
}
}
Фактически, он возвращает данные на карте.
Затем настройте Bean этого типа:
@Configuration
public class EndPointConfig {
@Value("${monitor.channel.map.key}")
private String channelMap;
@Bean
public CustomEndpoint buildEndPoint(){
CustomEndpoint customEndpoint = new CustomEndpoint(channelMap) ;
return customEndpoint ;
}
}
Таким образом, мы можем передать файл конфигурацииmonitor.channel.map.key
пришел в гости:
Когда клиент подключается:
При подключении двух клиентов:
Интеграция SBA
Таким образом, функция мониторинга может быть удовлетворена, но может ли она отображаться более красиво, и можно ли легко просматривать несколько приложений?
Есть такие инструменты с открытым исходным кодом, которые делают это за нас:
Короче говоря, мы можем использовать этот инструмент для визуализации и агрегирования интерфейсов, предоставляемых приводом на странице:
Доступ тоже очень простой, для начала нужно ввести зависимости:
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
</dependency>
и добавить в файл конфигурации:
# 关闭健康检查权限
management.security.enabled=false
# SpringAdmin 地址
spring.boot.admin.url=http://127.0.0.1:8888
Перед запуском приложения поговорим о развертывании SpringBootAdmin:
Это приложение представляет собой чистый SpringBoot, его нужно только добавить в основную функцию@EnableAdminServer
аннотация.
@SpringBootApplication
@Configuration
@EnableAutoConfiguration
@EnableAdminServer
public class AdminApplication {
public static void main(String[] args) {
SpringApplication.run(AdminApplication.class, args);
}
}
Вводить:
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-server</artifactId>
<version>1.5.7</version>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-server-ui</artifactId>
<version>1.5.6</version>
</dependency>
Просто начните это сразу после.
Таким образом, мы можем просматривать много информации о приложении на странице SpringBootAdmin.
Для получения дополнительной информации, пожалуйста, обратитесь к официальному руководству:
codecentric.GitHub.IO/spring-boot…
Пользовательские данные мониторинга
На самом деле, мы можем использовать актуатор и эту страницу визуализации, чтобы отслеживать некоторые простые показатели.
Например, я написал два Rest-интерфейса в клиенте и сервере для отправки сообщений друг другу.
Просто хочу зарегистрировать, сколько раз каждый был отправлен:
Клиент:
@Controller
@RequestMapping("/")
public class IndexController {
/**
* 统计 service
*/
@Autowired
private CounterService counterService;
@Autowired
private HeartbeatClient heartbeatClient ;
/**
* 向服务端发消息
* @param sendMsgReqVO
* @return
*/
@ApiOperation("客户端发送消息")
@RequestMapping("sendMsg")
@ResponseBody
public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
BaseResponse<SendMsgResVO> res = new BaseResponse();
heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getId(),sendMsgReqVO.getMsg())) ;
// 利用 actuator 来自增
counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
res.setDataBody(sendMsgResVO) ;
return res ;
}
}
Пока мы представляем пакет привода, мы можем напрямую внедрить counterService и использовать его для записи данных.
Когда мы вызываем этот интерфейс:
На странице мониторинга вы можете прямо сейчас запросить статус звонка:
Активное push-сообщение с сервера аналогично, но при отправке ему необходимо запросить конкретный канал для отправки в соответствии с идентификатором клиента:
Суммировать
Вышеприведенный пример простого пульса Netty демонстрирует мониторинг SpringBoot. После этого мы продолжим обновлять контент, связанный с Netty. Добро пожаловать, обратите внимание и поправьте меня.
Весь код в этой статье:
Дополнительный
Недавно я обобщил некоторые знания, связанные с Java, и заинтересованные друзья могут поддерживать их вместе.