предисловие
В этой статье в основном рассказывается, как интегрировать Netty со Springboot.Поскольку я все еще изучаю Netty, у меня нет опыта применения Netty в реальных производственных проектах, написания и опыта других. Извините за любые повторы.
Что касается того, как SpringBoot интегрируется и использует Netty, я проанализирую и обсужу это в следующих шагах:
- Создайте сервер Netty
- Создайте клиент Netty
- Используйте protobuf для определения формата сообщения
- Обнаружение простоя сервера
- Клиент отправляет пакеты пульса, отключается и снова подключается
PS: Для простоты (в основном ленивой) я помещаю сервер Netty и клиент в один и тот же проект SpringBoot.Конечно, клиент и сервер также могут быть разделены.
Создайте сервер Netty
Код сервера Netty на самом деле относительно прост, он выглядит следующим образом:
@Component
@Slf4j
public class NettyServer {
/**
* boss 线程组用于处理连接工作
*/
private EventLoopGroup boss = new NioEventLoopGroup();
/**
* work 线程组用于数据处理
*/
private EventLoopGroup work = new NioEventLoopGroup();
@Value("${netty.port}")
private Integer port;
/**
* 启动Netty Server
*
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(port))
//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
.option(ChannelOption.SO_BACKLOG, 1024)
//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new NettyServerHandlerInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
log.info("启动 Netty Server");
}
}
@PreDestroy
public void destory() throws InterruptedException {
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
log.info("关闭Netty");
}
}
Поскольку мы используем Netty в проекте springboot, мы инкапсулируем запуск сервера Netty вstart()
метод и использование@PostConstruct
Аннотация, добавить указанный метод@PostConstruct
Аннотация, указывающая, что метод инициализирован в SpringNettyServer
позвонил после урока.
Принимая во внимание использование механизма сердцебиения и других операций, часть логической цепочки обработки ChannelHandler будет объяснена позже.
Создайте клиент Netty
Код клиента Netty аналогичен серверному, код выглядит следующим образом:
@Component
@Slf4j
public class NettyClient {
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.port}")
private int port;
@Value("${netty.host}")
private String host;
private SocketChannel socketChannel;
public void sendMsg(MessageBase.Message message) {
socketChannel.writeAndFlush(message);
}
@PostConstruct
public void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ClientHandlerInitilizer());
ChannelFuture future = bootstrap.connect();
//客户端断线重连逻辑
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("连接Netty服务端成功");
} else {
log.info("连接失败,进行断线重连");
future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
}
}
Вышеизложенное также включает в себя логику отключения и повторного подключения клиента, подробнее будет объяснено ниже.
Построение протоколов связи с protobuf
В процессе интеграции Netty мы используем protobuf от Google для определения формата сообщения.Давайте кратко представим protobuf
Введение в протобуф
Официальное определение protobuf от Google выглядит следующим образом:
Protocol Buffers — это легкий и эффективный формат хранения структурированных данных, который можно использовать для сериализации структурированных данных и который очень подходит для хранения данных или формата обмена данными RPC. Его можно использовать как независимый от языка, платформы и расширяемый формат сериализованных структурированных данных в таких областях, как протоколы связи и хранение данных.
В Netty protobuf часто используется в качестве схемы сериализации.Конечно, protobuf также можно использовать для построения протокола связи между клиентом и сервером.
Зачем использовать протобуф
Здесь мы используем protobuf в качестве метода сериализации, так почему же мы должны использовать protobuf вместо других схем сериализации, таких как сериализация, поставляемая с jdk, Thrift, fastjson и т. д.
Прежде всего, собственный метод сериализации jdk имеет много недостатков, таких как:
- Сериализованный поток кода слишком велик
- производительность слишком низкая
- Не могу скрестить язык
А Google Protobuf является межъязыковым, поддерживает C++, java и python. Тогда сообщение, закодированное protobuf, меньше, что способствует хранению и передаче, и его производительность также очень высока.По сравнению с другими фреймворками сериализации это тоже очень выгодно.Конкретное сравнение различных фреймворков сериализации в Java здесь.Не многое сказать. Короче говоря, Google Protobuf в настоящее время широко используется в различных проектах, и его многочисленные преимущества заставляют нас использовать именно его.
Как использовать протобуф
Для Java основные шаги по использованию protobuf следующие:
- существует
.proto
Формат сообщения определяется в файле - Скомпилировать компилятором protobuf
.proto
файл как класс Java - Пишите или читайте сообщения с помощью protobuf API для Java.
Определить формат протокола protobuf
Вот моя демонстрацияmessage.proto
Например, файл выглядит следующим образом:
//protobuf语法有 proto2和proto3两种,这里指定 proto3
syntax = "proto3";
// 文件选项
option java_package = "com.pjmike.server.protocol.protobuf";
option java_outer_classname = "MessageBase";
// 消息模型定义
message Message {
string requestId = 1;
CommandType cmd = 2;
string content = 3;
enum CommandType {
NORMAL = 0; //常规业务消息
HEARTBEAT_REQUEST = 1; //客户端心跳消息
HEARTBEAT_RESPONSE = 2; //服务端心跳消息
}
}
Интерпретация файла:
- Первая строка в тексте указывает, что он использует
proto3
синтаксис, если он не указан, компилятор использует значение по умолчаниюproto2
синтаксис. Теперь его можно повсеместно использовать в новых проектах.proto3
синтаксис,proto3
Сравниватьproto2
Поддержка большего количества языков, но более краткая. Если вы впервые используете Protobuf в первый раз, вы можете использовать его.proto3
- определение
.proto
При создании файла можно отметить ряд параметров. Некоторые параметры относятся к уровню файла, например, вторая и третья строки выше.java_package
опция file указывает, что компилятор протокола компилирует.proto
Пакет, в котором находятся классы Java, сгенерированные файлом,java_outer_classname
option указывает имя класса Java, который вы хотите сгенерировать -
Message
Конкретный формат сообщения определен в , здесь я определил три поля, каждое поле имеет уникальный цифровой идентификатор, и эти идентификаторы используются для идентификации каждого поля в двоичном формате сообщения. -
Message
также добавляет тип перечисления, который содержит типCommandType
Для всех значений каждый тип перечисления должен отображать свой первый тип в 0, что является значением по умолчанию.
определение модели сообщения
Что касается формата сообщения, то здесь я просто очень-очень просто определяю несколько полей,requestId
Представляет идентификатор сообщения,CommandType
Указывает тип сообщения, которое просто делится на тип сообщения пульса и тип бизнес-сообщения, а затемcontent
Это конкретное содержание сообщения. Определение формата сообщения здесь очень простое, в реальном бою проекта есть много требований к пользовательскому формату сообщения, которые более сложны.
Вышеприведенное кратко представляет некоторые правила грамматики protobuf.Для более подробного ознакомления с грамматикой protobuf обратитесь к официальной документации:Developers.Google.com/protocol - нет...
использовать.proto
компилятор компилирует
Первым шагом является определение формата сообщения protobuf, а затем мы используем.proto
Компилятор файла компилирует определенный нами формат сообщения для создания соответствующего класса Java, чтобы мы могли использовать класс сообщения в проекте.
Я не буду здесь вдаваться в подробности установки компилятора protobuf, подробности смотрите в официальной документации:Developers.Google.com/protocol - нет...
После установки компилятора используйте следующую команду для компиляции.proto
документ:
protoc -I = ./ --java_out=./ ./Message.proto
-
-I
Параметры используются для указания компилируемого.proto
Каталог, в котором расположены файлы определения сообщения, этот параметр также может быть записан как--proto_path
-
--java_out
Опция указывает место хранения после генерации кода Java.Для разных языков наши опции могут отличаться.Например, сгенерированный код C++--cpp_out
- Добавьте файл определения сообщения, который будет скомпилирован после первых двух опций.
Используйте соответствующий API-интерфейс protobuf Java для чтения и записи сообщений.
ранее на основе.proto
Класс Java, созданный файлом определения сообщения, наш код здесь основан наMessage.proto
генерируетсяMessageBase
class, но для нормального использования сгенерированных классов Java нам также необходимо ввести зависимости protobuf-java:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
Каждый класс Java, сгенерированный с помощью protobuf, будет содержать два внутренних класса: Msg и Builder, содержащиеся в Msg (здесь Msg — фактический класс передачи сообщений). конкретно.proto
Каждое сообщение, определенное в, будет генерировать Msg, и каждое Msg соответствует Builder:
- Buidler предоставляет API для классов строительных классов и запросов классов
- Msg предоставляет API запросов, сериализации и десериализации.
Например, мы используем Builder для сборки Msg, пример выглядит следующим образом:
public class MessageBaseTest {
public static void main(String[] args) {
MessageBase.Message message = MessageBase.Message.newBuilder()
.setRequestId(UUID.randomUUID().toString())
.setContent("hello world").build();
System.out.println("message: "+message.toString());
}
}
Я не буду здесь рассказывать об использовании protobuf-java API, для получения более подробной информации обратитесь к официальной документации:Developers.Google.com/protocol - нет...
кодек protobuf
Столько всего было сказано выше, формат передачи сообщения определен, но нам также необходимо кодировать и декодировать этот формат protobuf в процессе передачи между клиентом и сервером.Конечно, мы можем настроить кодировку и декодирование сообщения ,protobuf-java
Соответствующие методы сериализации и десериализации предоставляются в API. Хорошей новостью является то, что Netty предоставляет кодеки для protobuf для поддержки protobuf, как показано в следующей таблице (от Netty в действии):
название | описывать |
---|---|
ProtobufDecoder | Расшифровать сообщение с помощью protobuf |
ProtobufEncoder | Кодировать сообщения с помощью protobuf |
ProtobufVarint32FrameDecoder | Динамически разбивает полученный ByteBuf на основе значения целочисленной длины поля Google Protocol Buffers «Base 128 Varint» в сообщении. |
ProtobufVarint32LengthFieldPrepender | Добавить целочисленное значение поля Google Protocol Buffers "Base 128 Varint" в ByteBuf |
С помощью этих кодеков добавьте их в каналPipeline клиента и сервера, чтобы кодировать и декодировать сообщения, следующим образом:
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//空闲检测
.addLast(new ServerIdleStateHandler())
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new NettyServerHandler());
}
}
Механизм сердцебиения клиента
Введение в механизм сердцебиения
Heartbeat — это специальный пакет данных, периодически отправляемый между клиентом и сервером в длинном TCP-соединении, уведомляющий другую сторону о том, что она находится в сети, чтобы обеспечить достоверность TCP-соединения.
Как реализовать механизм сердцебиения
Существует два способа реализации механизма сердцебиения:
- Используйте механизм поддержки активности на уровне протокола TCP.
- Пользовательский механизм сердцебиения на прикладном уровне
Механизм поддержки активности на уровне TCP также определен в предыдущей конструкции сервера Netty и в процессе запуска клиента. Нам нужно включить его вручную. Примеры следующие:
// 设置TCP的长连接,默认的 keepalive的心跳时间是两个小时
childOption(ChannelOption.SO_KEEPALIVE, true)
Помимо включения поддержки активности протокола TCP, я изучил некоторые демонстрации с открытым исходным кодом на github и обнаружил, что люди часто настраивают свой собственный механизм пульса и определяют пакеты пульса. И Нетти также предоставляетIdleStateHandlerреализовать механизм сердцебиения
Netty реализует механизм сердцебиения
Давайте посмотрим, как клиент реализует механизм сердцебиения:
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("已经10s没有发送消息给服务端");
//向服务端送心跳包
//这里使用 protobuf定义的消息格式
MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
.setRequestId(UUID.randomUUID().toString())
.setContent("heartbeat").build();
//发送心跳消息,并在发送失败时关闭该连接
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
Мы здесь, чтобы создать класс и переопределить ChannelHandleruserEventTriggered
метод, в котором реализована логика отправки пакетов heartbeat, и в то же времяIdleStateEvent
Класс добавляется в логическую цепочку обработки.
На самом деле, когда соединение простаивает слишком долго,IdleStateEvent
событие, то мы вызываемuserEventTriggered
справиться сIdleStateEvent
событие.
После запуска клиента и сервера консоль выводит сообщение пульса следующим образом:
2018-10-28 16:30:46.825 INFO 42648 --- [ntLoopGroup-2-1] c.pjmike.server.client.HeartbeatHandler : 已经10s没有发送消息给服务端
2018-10-28 16:30:47.176 INFO 42648 --- [ntLoopGroup-4-1] c.p.server.server.NettyServerHandler : 收到客户端发来的心跳消息:requestId: "80723780-2ce0-4b43-ad3a-53060a6e81ab"
cmd: HEARTBEAT_REQUEST
content: "heartbeat"
Выше мы обсуждали только то, что клиент отправляет сообщения пульса на сервер, так должен ли сервер по-прежнему отправлять сообщения пульса клиенту?
В общем, для длинного соединения одно из решений состоит в том, чтобы отправлять сообщения пульса с обеих сторон, а другое — в том, что сервер действует как пассивный получатель.Если сервер не получает пакет пульса в течение определенного периода времени, он будет напрямую Отключить.
Здесь мы используем вторую схему, нужно только, чтобы клиент отправил сообщение пульса, а затем сервер пассивно получил его, а затем установил период времени.Если сервер не получает никаких сообщений в течение этого периода, он будет активно отключаться, который также Вот что я скажу позжеобнаружение простоя
Клиент Netty отключается и снова подключается
Как правило, есть две ситуации, в которых клиенту Netty необходимо повторно подключиться к серверу:
- При запуске клиента Netty сервер зависает и не может подключиться к серверу
- Во время работы программы сервер внезапно зависает
Реализация первого случаяChannelFutureListener
Он используется для контроля успешности соединения, а в случае неудачи выполняется механизм отключения и повторной попытки.Код выглядит следующим образом:
@Component
@Slf4j
public class NettyClient {
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.port}")
private int port;
@Value("${netty.host}")
private String host;
private SocketChannel socketChannel;
public void sendMsg(MessageBase.Message message) {
socketChannel.writeAndFlush(message);
}
@PostConstruct
public void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.handler(new ClientHandlerInitilizer());
ChannelFuture future = bootstrap.connect();
//客户端断线重连逻辑
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("连接Netty服务端成功");
} else {
log.info("连接失败,进行断线重连");
future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
}
}
ChannelFuture добавляет прослушиватель, если клиент не может подключиться к серверу, вызовитеchannel().eventLoop().schedule()
Метод выполняет логику повтора.
Второй случай, когда сервер внезапно зависает во время запущенного процесса, в этом случае мы реализуем его в обработчике, который обрабатывает чтение и запись данных, код такой:
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyClient nettyClient;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("已经10s没有发送消息给服务端");
//向服务端送心跳包
MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
.setRequestId(UUID.randomUUID().toString())
.setContent("heartbeat").build();
//发送心跳消息,并在发送失败时关闭该连接
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//如果运行过程中服务端挂了,执行重连机制
EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS);
super.channelInactive(ctx);
}
}
Здесь мы напрямую переписываем его в обработчике, реализующем механизм сердцебиения.channelInactive
метод, а затем выполнить логику повтора в этом методе, который вводится здесьNettyClient
класс, цель состоит в том, чтобы облегчить вызовNettyClient
изstart()
способ переподключения к серверу
channelInactive()
Метод означает, что если текущий Канал не подключен к удаленному узлу, то будет вызван этот метод.
Обнаружение простоя сервера
Что такое обнаружение простоя? На самом деле обнаружение простоя заключается в том, чтобы определить, происходит ли чтение или запись данных в течение этого периода времени. Например, сервер определяет, получил ли он данные, отправленные клиентом в течение определенного периода времени, если нет, вовремя освобождает ресурсы и закрывает соединение.
Для обнаружения бездействия Netty предоставляет специальныеIdleStateHandlerдля достижения этой функции. На приведенный ниже код ссылаются из«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Реализация части обнаружения простоя в:
@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {
/**
* 设置空闲检测时间为 30s
*/
private static final int READER_IDLE_TIME = 30;
public ServerIdleStateHandler() {
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.info("{} 秒内没有读取到数据,关闭连接", READER_IDLE_TIME);
ctx.channel().close();
Проверка метода контроллера
Поскольку это демонстрация SpringBoot, интегрирующая Netty, мы создаемController
Метод проверяет связь между сервером Netty и клиентом.Код контроллера выглядит следующим образом, что очень просто:
@RestController
public class ConsumerController {
@Autowired
private NettyClient nettyClient;
@GetMapping("/send")
public String send() {
MessageBase.Message message = new MessageBase.Message()
.toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL)
.setContent("hello server")
.setRequestId(UUID.randomUUID().toString()).build();
nettyClient.sendMsg(message);
return "send ok";
}
}
инъекцияNettyClient
, назовите егоsendMsg
способ отправки сообщения, результат следующий:
c.p.server.server.NettyServerHandler : 收到客户端的业务消息:requestId: "aba74c28-1b6e-42b3-9f27-889e7044dcbf"
content: "hello server"
резюме
Вышеизложенное подробно описывает, как использовать SpringBoot для интеграции Netty, опираясь на примеры и статьи многих предшественников, что является предварительным пониманием того, как использовать Netty. Если в вышеизложенном есть ошибки, укажите на них. адрес гитхаба:GitHub.com/PJ Mike/Судный день…
Ссылки и благодарности
- Вход и бой с Netty: имитация системы обмена мгновенными сообщениями WeChat IM
- Реализация переподключения Netty Client
- Netty (1) SpringBoot интегрирует механизм пульсации длинных соединений
- Анализ реализации Netty механизма сердцебиения, а также отключения и повторного подключения
- [Switch] Руководство по синтаксису Protobuf3