предисловие
В этой статье в основном рассказывается, как интегрировать Netty со Springboot.Поскольку я все еще изучаю Netty, у меня нет опыта применения Netty в реальных производственных проектах, написания и опыта других. Извините за любые повторы.
Что касается того, как SpringBoot интегрируется и использует Netty, я проанализирую и обсужу это в следующих шагах:
- Создайте сервер Netty
- Создайте клиент Netty
- Используйте protobuf для определения формата сообщения
- Обнаружение простоя сервера
- Клиент отправляет пакеты пульса, отключается и снова подключается
PS: Для простоты (в основном ленивой) я помещаю сервер Netty и клиент в один и тот же проект SpringBoot.Конечно, клиент и сервер также могут быть разделены.
Создайте сервер Netty
Код сервера Netty на самом деле относительно прост, он выглядит следующим образом:
@Component
@Slf4j
public class NettyServer {
/**
* boss 线程组用于处理连接工作
*/
private EventLoopGroup boss = new NioEventLoopGroup();
/**
* work 线程组用于数据处理
*/
private EventLoopGroup work = new NioEventLoopGroup();
@Value("${netty.port}")
private Integer port;
/**
* 启动Netty Server
*
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(port))
//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
.option(ChannelOption.SO_BACKLOG, 1024)
//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new NettyServerHandlerInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
log.info("启动 Netty Server");
}
}
@PreDestroy
public void destory() throws InterruptedException {
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
log.info("关闭Netty");
}
}
Поскольку мы используем Netty в проекте springboot, мы инкапсулируем запуск сервера Netty вstart()метод и использование@PostConstructАннотация, добавить указанный метод@PostConstructАннотация, указывающая, что метод инициализирован в SpringNettyServerпозвонил после урока.
Принимая во внимание использование механизма сердцебиения и других операций, часть логической цепочки обработки ChannelHandler будет объяснена позже.
Создайте клиент Netty
Код клиента Netty аналогичен серверному, код выглядит следующим образом:
@Component
@Slf4j
public class NettyClient {
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.port}")
private int port;
@Value("${netty.host}")
private String host;
private SocketChannel socketChannel;
public void sendMsg(MessageBase.Message message) {
socketChannel.writeAndFlush(message);
}
@PostConstruct
public void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ClientHandlerInitilizer());
ChannelFuture future = bootstrap.connect();
//客户端断线重连逻辑
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("连接Netty服务端成功");
} else {
log.info("连接失败,进行断线重连");
future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
}
}
Вышеизложенное также включает в себя логику отключения и повторного подключения клиента, подробнее будет объяснено ниже.
Построение протоколов связи с protobuf
В процессе интеграции Netty мы используем protobuf от Google для определения формата сообщения.Давайте кратко представим protobuf
Введение в протобуф
Официальное определение protobuf от Google выглядит следующим образом:
Protocol Buffers — это легкий и эффективный формат хранения структурированных данных, который можно использовать для сериализации структурированных данных и который очень подходит для хранения данных или формата обмена данными RPC. Его можно использовать как независимый от языка, платформы и расширяемый формат сериализованных структурированных данных в таких областях, как протоколы связи и хранение данных.
В Netty protobuf часто используется в качестве схемы сериализации.Конечно, protobuf также можно использовать для построения протокола связи между клиентом и сервером.
Зачем использовать протобуф
Здесь мы используем protobuf в качестве метода сериализации, так почему же мы должны использовать protobuf вместо других схем сериализации, таких как сериализация, поставляемая с jdk, Thrift, fastjson и т. д.
Прежде всего, собственный метод сериализации jdk имеет много недостатков, таких как:
- Сериализованный поток кода слишком велик
- производительность слишком низкая
- Не могу скрестить язык
А Google Protobuf является межъязыковым, поддерживает C++, java и python. Тогда сообщение, закодированное protobuf, меньше, что способствует хранению и передаче, и его производительность также очень высока.По сравнению с другими фреймворками сериализации это тоже очень выгодно.Конкретное сравнение различных фреймворков сериализации в Java здесь.Не многое сказать. Короче говоря, Google Protobuf в настоящее время широко используется в различных проектах, и его многочисленные преимущества заставляют нас использовать именно его.
Как использовать протобуф
Для Java основные шаги по использованию protobuf следующие:
- существует
.protoФормат сообщения определяется в файле - Скомпилировать компилятором protobuf
.protoфайл как класс Java - Пишите или читайте сообщения с помощью protobuf API для Java.
Определить формат протокола protobuf
Вот моя демонстрацияmessage.protoНапример, файл выглядит следующим образом:
//protobuf语法有 proto2和proto3两种,这里指定 proto3
syntax = "proto3";
// 文件选项
option java_package = "com.pjmike.server.protocol.protobuf";
option java_outer_classname = "MessageBase";
// 消息模型定义
message Message {
string requestId = 1;
CommandType cmd = 2;
string content = 3;
enum CommandType {
NORMAL = 0; //常规业务消息
HEARTBEAT_REQUEST = 1; //客户端心跳消息
HEARTBEAT_RESPONSE = 2; //服务端心跳消息
}
}
Интерпретация файла:
- Первая строка в тексте указывает, что он использует
proto3синтаксис, если он не указан, компилятор использует значение по умолчаниюproto2синтаксис. Теперь его можно повсеместно использовать в новых проектах.proto3синтаксис,proto3Сравниватьproto2Поддержка большего количества языков, но более краткая. Если вы впервые используете Protobuf в первый раз, вы можете использовать его.proto3 - определение
.protoПри создании файла можно отметить ряд параметров. Некоторые параметры относятся к уровню файла, например, вторая и третья строки выше.java_packageопция file указывает, что компилятор протокола компилирует.protoПакет, в котором находятся классы Java, сгенерированные файлом,java_outer_classnameoption указывает имя класса Java, который вы хотите сгенерировать -
MessageКонкретный формат сообщения определен в , здесь я определил три поля, каждое поле имеет уникальный цифровой идентификатор, и эти идентификаторы используются для идентификации каждого поля в двоичном формате сообщения. -
Messageтакже добавляет тип перечисления, который содержит типCommandTypeДля всех значений каждый тип перечисления должен отображать свой первый тип в 0, что является значением по умолчанию.
определение модели сообщения
Что касается формата сообщения, то здесь я просто очень-очень просто определяю несколько полей,requestIdПредставляет идентификатор сообщения,CommandTypeУказывает тип сообщения, которое просто делится на тип сообщения пульса и тип бизнес-сообщения, а затемcontentЭто конкретное содержание сообщения. Определение формата сообщения здесь очень простое, в реальном бою проекта есть много требований к пользовательскому формату сообщения, которые более сложны.
Вышеприведенное кратко представляет некоторые правила грамматики protobuf.Для более подробного ознакомления с грамматикой protobuf обратитесь к официальной документации:Developers.Google.com/protocol - нет...
использовать.protoкомпилятор компилирует
Первым шагом является определение формата сообщения protobuf, а затем мы используем.protoКомпилятор файла компилирует определенный нами формат сообщения для создания соответствующего класса Java, чтобы мы могли использовать класс сообщения в проекте.
Я не буду здесь вдаваться в подробности установки компилятора protobuf, подробности смотрите в официальной документации:Developers.Google.com/protocol - нет...
После установки компилятора используйте следующую команду для компиляции.protoдокумент:
protoc -I = ./ --java_out=./ ./Message.proto
-
-IПараметры используются для указания компилируемого.protoКаталог, в котором расположены файлы определения сообщения, этот параметр также может быть записан как--proto_path -
--java_outОпция указывает место хранения после генерации кода Java.Для разных языков наши опции могут отличаться.Например, сгенерированный код C++--cpp_out - Добавьте файл определения сообщения, который будет скомпилирован после первых двух опций.
Используйте соответствующий API-интерфейс protobuf Java для чтения и записи сообщений.
ранее на основе.protoКласс Java, созданный файлом определения сообщения, наш код здесь основан наMessage.protoгенерируетсяMessageBaseclass, но для нормального использования сгенерированных классов Java нам также необходимо ввести зависимости protobuf-java:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
Каждый класс Java, сгенерированный с помощью protobuf, будет содержать два внутренних класса: Msg и Builder, содержащиеся в Msg (здесь Msg — фактический класс передачи сообщений). конкретно.protoКаждое сообщение, определенное в, будет генерировать Msg, и каждое Msg соответствует Builder:
- Buidler предоставляет API для классов строительных классов и запросов классов
- Msg предоставляет API запросов, сериализации и десериализации.
Например, мы используем Builder для сборки Msg, пример выглядит следующим образом:
public class MessageBaseTest {
public static void main(String[] args) {
MessageBase.Message message = MessageBase.Message.newBuilder()
.setRequestId(UUID.randomUUID().toString())
.setContent("hello world").build();
System.out.println("message: "+message.toString());
}
}
Я не буду здесь рассказывать об использовании protobuf-java API, для получения более подробной информации обратитесь к официальной документации:Developers.Google.com/protocol - нет...
кодек protobuf
Столько всего было сказано выше, формат передачи сообщения определен, но нам также необходимо кодировать и декодировать этот формат protobuf в процессе передачи между клиентом и сервером.Конечно, мы можем настроить кодировку и декодирование сообщения ,protobuf-javaСоответствующие методы сериализации и десериализации предоставляются в API. Хорошей новостью является то, что Netty предоставляет кодеки для protobuf для поддержки protobuf, как показано в следующей таблице (от Netty в действии):
| название | описывать |
|---|---|
| ProtobufDecoder | Расшифровать сообщение с помощью protobuf |
| ProtobufEncoder | Кодировать сообщения с помощью protobuf |
| ProtobufVarint32FrameDecoder | Динамически разбивает полученный ByteBuf на основе значения целочисленной длины поля Google Protocol Buffers «Base 128 Varint» в сообщении. |
| ProtobufVarint32LengthFieldPrepender | Добавить целочисленное значение поля Google Protocol Buffers "Base 128 Varint" в ByteBuf |
С помощью этих кодеков добавьте их в каналPipeline клиента и сервера, чтобы кодировать и декодировать сообщения, следующим образом:
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//空闲检测
.addLast(new ServerIdleStateHandler())
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new NettyServerHandler());
}
}
Механизм сердцебиения клиента
Введение в механизм сердцебиения
Heartbeat — это специальный пакет данных, периодически отправляемый между клиентом и сервером в длинном TCP-соединении, уведомляющий другую сторону о том, что она находится в сети, чтобы обеспечить достоверность TCP-соединения.
Как реализовать механизм сердцебиения
Существует два способа реализации механизма сердцебиения:
- Используйте механизм поддержки активности на уровне протокола TCP.
- Пользовательский механизм сердцебиения на прикладном уровне
Механизм поддержки активности на уровне TCP также определен в предыдущей конструкции сервера Netty и в процессе запуска клиента. Нам нужно включить его вручную. Примеры следующие:
// 设置TCP的长连接,默认的 keepalive的心跳时间是两个小时
childOption(ChannelOption.SO_KEEPALIVE, true)
Помимо включения поддержки активности протокола TCP, я изучил некоторые демонстрации с открытым исходным кодом на github и обнаружил, что люди часто настраивают свой собственный механизм пульса и определяют пакеты пульса. И Нетти также предоставляетIdleStateHandlerреализовать механизм сердцебиения
Netty реализует механизм сердцебиения
Давайте посмотрим, как клиент реализует механизм сердцебиения:
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("已经10s没有发送消息给服务端");
//向服务端送心跳包
//这里使用 protobuf定义的消息格式
MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
.setRequestId(UUID.randomUUID().toString())
.setContent("heartbeat").build();
//发送心跳消息,并在发送失败时关闭该连接
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
Мы здесь, чтобы создать класс и переопределить ChannelHandleruserEventTriggeredметод, в котором реализована логика отправки пакетов heartbeat, и в то же времяIdleStateEventКласс добавляется в логическую цепочку обработки.
На самом деле, когда соединение простаивает слишком долго,IdleStateEventсобытие, то мы вызываемuserEventTriggeredсправиться сIdleStateEventсобытие.
После запуска клиента и сервера консоль выводит сообщение пульса следующим образом:
2018-10-28 16:30:46.825 INFO 42648 --- [ntLoopGroup-2-1] c.pjmike.server.client.HeartbeatHandler : 已经10s没有发送消息给服务端
2018-10-28 16:30:47.176 INFO 42648 --- [ntLoopGroup-4-1] c.p.server.server.NettyServerHandler : 收到客户端发来的心跳消息:requestId: "80723780-2ce0-4b43-ad3a-53060a6e81ab"
cmd: HEARTBEAT_REQUEST
content: "heartbeat"
Выше мы обсуждали только то, что клиент отправляет сообщения пульса на сервер, так должен ли сервер по-прежнему отправлять сообщения пульса клиенту?
В общем, для длинного соединения одно из решений состоит в том, чтобы отправлять сообщения пульса с обеих сторон, а другое — в том, что сервер действует как пассивный получатель.Если сервер не получает пакет пульса в течение определенного периода времени, он будет напрямую Отключить.
Здесь мы используем вторую схему, нужно только, чтобы клиент отправил сообщение пульса, а затем сервер пассивно получил его, а затем установил период времени.Если сервер не получает никаких сообщений в течение этого периода, он будет активно отключаться, который также Вот что я скажу позжеобнаружение простоя
Клиент Netty отключается и снова подключается
Как правило, есть две ситуации, в которых клиенту Netty необходимо повторно подключиться к серверу:
- При запуске клиента Netty сервер зависает и не может подключиться к серверу
- Во время работы программы сервер внезапно зависает
Реализация первого случаяChannelFutureListenerОн используется для контроля успешности соединения, а в случае неудачи выполняется механизм отключения и повторной попытки.Код выглядит следующим образом:
@Component
@Slf4j
public class NettyClient {
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.port}")
private int port;
@Value("${netty.host}")
private String host;
private SocketChannel socketChannel;
public void sendMsg(MessageBase.Message message) {
socketChannel.writeAndFlush(message);
}
@PostConstruct
public void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.handler(new ClientHandlerInitilizer());
ChannelFuture future = bootstrap.connect();
//客户端断线重连逻辑
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("连接Netty服务端成功");
} else {
log.info("连接失败,进行断线重连");
future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
}
}
ChannelFuture добавляет прослушиватель, если клиент не может подключиться к серверу, вызовитеchannel().eventLoop().schedule()Метод выполняет логику повтора.
Второй случай, когда сервер внезапно зависает во время запущенного процесса, в этом случае мы реализуем его в обработчике, который обрабатывает чтение и запись данных, код такой:
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyClient nettyClient;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("已经10s没有发送消息给服务端");
//向服务端送心跳包
MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
.setRequestId(UUID.randomUUID().toString())
.setContent("heartbeat").build();
//发送心跳消息,并在发送失败时关闭该连接
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//如果运行过程中服务端挂了,执行重连机制
EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS);
super.channelInactive(ctx);
}
}
Здесь мы напрямую переписываем его в обработчике, реализующем механизм сердцебиения.channelInactiveметод, а затем выполнить логику повтора в этом методе, который вводится здесьNettyClientкласс, цель состоит в том, чтобы облегчить вызовNettyClientизstart()способ переподключения к серверу
channelInactive()Метод означает, что если текущий Канал не подключен к удаленному узлу, то будет вызван этот метод.
Обнаружение простоя сервера
Что такое обнаружение простоя? На самом деле обнаружение простоя заключается в том, чтобы определить, происходит ли чтение или запись данных в течение этого периода времени. Например, сервер определяет, получил ли он данные, отправленные клиентом в течение определенного периода времени, если нет, вовремя освобождает ресурсы и закрывает соединение.
Для обнаружения бездействия Netty предоставляет специальныеIdleStateHandlerдля достижения этой функции. На приведенный ниже код ссылаются из«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Реализация части обнаружения простоя в:
@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {
/**
* 设置空闲检测时间为 30s
*/
private static final int READER_IDLE_TIME = 30;
public ServerIdleStateHandler() {
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.info("{} 秒内没有读取到数据,关闭连接", READER_IDLE_TIME);
ctx.channel().close();
Проверка метода контроллера
Поскольку это демонстрация SpringBoot, интегрирующая Netty, мы создаемControllerМетод проверяет связь между сервером Netty и клиентом.Код контроллера выглядит следующим образом, что очень просто:
@RestController
public class ConsumerController {
@Autowired
private NettyClient nettyClient;
@GetMapping("/send")
public String send() {
MessageBase.Message message = new MessageBase.Message()
.toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL)
.setContent("hello server")
.setRequestId(UUID.randomUUID().toString()).build();
nettyClient.sendMsg(message);
return "send ok";
}
}
инъекцияNettyClient, назовите егоsendMsgспособ отправки сообщения, результат следующий:
c.p.server.server.NettyServerHandler : 收到客户端的业务消息:requestId: "aba74c28-1b6e-42b3-9f27-889e7044dcbf"
content: "hello server"
резюме
Вышеизложенное подробно описывает, как использовать SpringBoot для интеграции Netty, опираясь на примеры и статьи многих предшественников, что является предварительным пониманием того, как использовать Netty. Если в вышеизложенном есть ошибки, укажите на них. адрес гитхаба:GitHub.com/PJ Mike/Судный день…
Ссылки и благодарности
- Вход и бой с Netty: имитация системы обмена мгновенными сообщениями WeChat IM
- Реализация переподключения Netty Client
- Netty (1) SpringBoot интегрирует механизм пульсации длинных соединений
- Анализ реализации Netty механизма сердцебиения, а также отключения и повторного подключения
- [Switch] Руководство по синтаксису Protobuf3