предисловие
В этой статье в основном представлена интеграция SpringBoot с Netty и использование Protobuf для передачи данных. Protobuf кратко представит использование, как для Netty впредыдущая статьяОн был кратко представлен в , поэтому здесь я не буду вдаваться в подробности.
Protobuf
вводить
Protocolbuffer (далее PB) — формат обмена данными Google, не зависящий от языка и платформы. Google предоставляет реализации на нескольких языках: java, c#, c++, go и python, каждый из которых включает компиляторы и файлы библиотек для соответствующего языка. Поскольку это двоичный формат, он намного быстрее, чем использование xml для обмена данными. Его можно использовать для обмена данными между распределенными приложениями или обмена данными в гетерогенных средах. Как двоичный формат передачи данных с превосходной эффективностью и совместимостью, он может использоваться во многих областях, таких как передача по сети, файлы конфигурации и хранение данных.
Официальный адрес: https://github.com/google/protobuf
использовать
Использование здесь только вводит использование Java. Сначала нам нужно создатьprotofile, где мы определяем файл, который нам нужно передать. Например, нам нужно определить информацию о пользователе, включая основные поля идентификатора, имени и возраста. затемprotobufФормат файла следующий:Примечание: используется здесьproto3, я уже написал соответствующие заметки, так что не буду здесь вдаваться в подробности. Следует отметить, чтоprotoфайлы и сгенерированныеJavaИмена файлов не могут совпадать!
syntax = "proto3";
// 生成的包名
option java_package="com.pancm.protobuf";
//生成的java名
option java_outer_classname = "UserInfo";
message UserMsg {
// ID
int32 id = 1;
// 姓名
string name = 2;
// 年龄
int32 age = 3;
// 状态
int32 state = 4;
}
После создания файла мы помещаем файл иprotoc.exe(программа, генерирующая файлы Java) поместите ее в папку protobuf в каталоге диска E, а затем перейдите в интерфейс dos каталога и введите:protoc.exe --java_out=文件绝对路径名称
.
Например:
protoc.exe --java_out=E:\protobuf User.proto
После ввода нажмите Enter, чтобы увидеть сгенерированный файл Java в каталоге того же уровня, а затем поместите файл в путь, указанный файлом в проекте.
Примечание. Я также интегрировал в этот проект программное обеспечение для работы с файлами protobuf и тестовые файлы protobuf, которые можно получить напрямую.
После того, как файл Java сгенерирован, давайте посмотрим, как его использовать. Здесь я вставлю код напрямую и напишу комментарии в коде, это должно быть легче понять. . .Пример кода:
// 按照定义的数据结构,创建一个对象
UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();
userInfo.setId(1);
userInfo.setName("xuwujing");
userInfo.setAge(18);
UserInfo.UserMsg userMsg = userInfo.build();
// 将数据写到输出流
ByteArrayOutputStream output = new ByteArrayOutputStream();
userMsg.writeTo(output);
// 将数据序列化后发送
byte[] byteArray = output.toByteArray();
// 接收到流并读取
ByteArrayInputStream input = new ByteArrayInputStream(byteArray);
// 反序列化
UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);
System.out.println("id:" + userInfo2.getId());
System.out.println("name:" + userInfo2.getName());
System.out.println("age:" + userInfo2.getAge());
Примечание. Вот небольшое пояснение, потому чтоprotobufОн передается в двоичном виде, поэтому нужно обратить внимание на соответствующую кодировку. также использоватьprotobufТакже нужно обратить внимание на максимальную длину байта следующей передачи.
Выходной результат:
id:1
name:xuwujing
age:18
SpringBoot интегрирует Netty
Примечание. Если вы хотите получить проект напрямую, вы можете сразу перейти к нижней части и загрузить код проекта по ссылке.
подготовка к разработке
Требования к окружающей среде JDK:: 1,8Netty:: 4.0 или выше (кроме 5)Protobuf: 3.0 или выше
Если вы не знакомы с Netty, вы можете ознакомиться с некоторыми из статей, которые я написал ранее. Боже, пожалуйста, не обращайте внимания~. ~ Адрес: https://blog.csdn.net/column/details/17640.html.
Прежде всего, связанные зависимости Maven:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<netty.version>4.1.22.Final</netty.version>
<protobuf.version>3.5.1</protobuf.version>
<springboot>1.5.9.RELEASE</springboot>
<fastjson>1.2.41</fastjson>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${springboot}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<version>${springboot}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
После добавления соответствующих maven-зависимостей в файл конфигурации добавлять нечего, потому что пока это только прослушивающий порт.
написание кода
Модуль кода в основном делится на серверный и клиентский. В основном реализована бизнес-логика: После успешного запуска сервера клиент также запускается успешно.В это время сервер отправит сообщениеprotobufотформатировать информацию клиенту, а затем клиент дает соответствующий ответ. После того, как соединение между клиентом и сервером будет установлено успешно, клиент будет отправлять на сервер команду пульса каждый период времени, сообщая серверу, что клиент все еще существует.Если клиент не отправляет информацию в указанное время, сервер закроет соединение для этого клиента. Когда клиент не может подключиться к серверу, он время от времени будет пытаться переподключиться, пока переподключение не будет успешным!
Сервер
Первый — написать класс запуска сервера, соответствующие комментарии прописаны в коде очень подробно, поэтому я не буду здесь вдаваться в подробности. Однако следует отметить, что в предыдущей статье Netty, которую я написал, сервер запускался напрямую через метод main, поэтому объект был непосредственно новым. После интеграции со SpringBoot нам нужно передать Netty в SpringBoot для управления, поэтому здесь используются соответствующие аннотации.код показывает, как показано ниже:
@Service("nettyServer")
public class NettyServer {
private static final int port = 9876; // 设置服务端端口
private static EventLoopGroup boss = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
private static EventLoopGroup work = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
private static ServerBootstrap b = new ServerBootstrap();
@Autowired
private NettyServerFilter nettyServerFilter;
public void run() {
try {
b.group(boss, work);
b.channel(NioServerSocketChannel.class);
b.childHandler(nettyServerFilter); // 设置过滤器
// 服务器绑定端口监听
ChannelFuture f = b.bind(port).sync();
System.out.println("服务端启动成功,端口是:" + port);
// 监听服务器关闭监听
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭EventLoopGroup,释放掉所有资源包括创建的线程
work.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
После того, как основной класс сервера написан, зададим соответствующие условия фильтра. Здесь вам нужно наследоваться от NettyChannelInitializerкласс, затем переопределитьinitChannelЭтот метод добавляет соответствующие параметры, такие как параметры времени ожидания пульса, параметры протокола передачи и соответствующие классы бизнес-реализации.код показывает, как показано ниже:
@Component
public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
@Autowired
private NettyServerHandler nettyServerHandler;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
//入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// 解码和编码,应和客户端一致
//传输的协议 Protobuf
ph.addLast(new ProtobufVarint32FrameDecoder());
ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
ph.addLast(new ProtobufVarint32LengthFieldPrepender());
ph.addLast(new ProtobufEncoder());
//业务逻辑实现类
ph.addLast("nettyServerHandler", nettyServerHandler);
}
}
После того, как написан код сервисных настроек, давайте напишем основной бизнес-код. Чтобы закодировать бизнес-уровень с помощью Netty, нам нужно наследоватьChannelInboundHandlerAdapterилиSimpleChannelInboundHandlerКласс, кстати, давайте поговорим о разнице между ними. наследоватьSimpleChannelInboundHandlerПосле класса он будет автоматически после получения данныхreleaseданные занятыBytebufferресурс. И для наследования этого класса необходимо указать формат данных. при наследованииChannelInboundHandlerAdapterто он не будет выпущен автоматически, вам нужно вызвать его вручнуюReferenceCountUtil.release()и другие методы освобождения. Наследование этого класса не требует указания формата данных. Итак, здесь я лично рекомендую наследование на стороне сервера.ChannelInboundHandlerAdapter, отпустите его вручную, чтобы данные не были автоматически освобождены до их обработки. Кроме того, к серверу может быть подключено несколько клиентов, и формат данных, запрошенный каждым клиентом, несовместим, и в это время может выполняться соответствующая обработка. Клиент может наследовать в зависимости от ситуацииSimpleChannelInboundHandlerсвоего рода. Преимущество заключается в том, что формат данных для передачи указывается напрямую, и нет необходимости выполнять преобразование формата.
код показывает, как показано ниже:
@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/** 空闲次数 */
private int idle_count = 1;
/** 发送次数 */
private int count = 1;
/**
* 建立连接时,发送一条消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
.build();
ctx.writeAndFlush(userMsg);
super.channelActive(ctx);
}
/**
* 超时处理 如果5秒没有接受客户端的心跳,就触发; 如果超过两次,则直接关闭;
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令
System.out.println("已经5秒没有接收到客户端的信息了");
if (idle_count > 1) {
System.out.println("关闭这个不活跃的channel");
ctx.channel().close();
}
idle_count++;
}
} else {
super.userEventTriggered(ctx, obj);
}
}
/**
* 业务逻辑处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("第" + count + "次" + ",服务端接受的消息:" + msg);
try {
// 如果是protobuf类型的数据
if (msg instanceof UserMsg) {
UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
if (userState.getState() == 1) {
System.out.println("客户端业务处理成功!");
} else if(userState.getState() == 2){
System.out.println("接受到客户端发送的心跳!");
}else{
System.out.println("未知命令!");
}
} else {
System.out.println("未知数据!" + msg);
return;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
count++;
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Также есть класс запуска на стороне сервера, который раньше запускался напрямую через метод main, но здесь изменен на запуск через springBoot, что мало чем отличается.код показывает, как показано ниже:
@SpringBootApplication
public class NettyServerApp {
public static void main(String[] args) {
// 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
NettyServer nettyServer = context.getBean(NettyServer.class);
nettyServer.run();
}
}
На данный момент соответствующий код на стороне сервера написан.
клиент
Код на стороне клиента во многих местах похож на код на стороне сервера, поэтому я не буду вдаваться в подробности, в основном возьму разные коды и кратко опишу их. Первый — это основной класс клиента, который в принципе такой же, как и у сервера, то есть здесь больше прослушивающих портов и прослушиватель (используется для контроля за тем, чтобы отключиться от сервера и переподключиться). Основная логика кода реализации выглядит следующим образом:
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
ChannelFuture f = null;
try {
if (bootstrap != null) {
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(nettyClientFilter);
bootstrap.remoteAddress(host, port);
f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
final EventLoop eventLoop = futureListener.channel().eventLoop();
if (!futureListener.isSuccess()) {
System.out.println("与服务端断开连接!在10s之后准备尝试重连!");
eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
}
});
if(initFalg){
System.out.println("Netty客户端启动成功!");
initFalg=false;
}
// 阻塞
f.channel().closeFuture().sync();
}
} catch (Exception e) {
System.out.println("客户端连接失败!"+e.getMessage());
}
}
Примечание. Реализация слушателя написана в JDK1.8.
Фильтрация на стороне клиента в основном аналогична фильтрации на стороне сервера. Однако следует отметить, что протокол передачи, кодирование и декодирование должны быть согласованы, а время чтения и записи сердцебиения должно быть меньше, чем время, установленное сервером. Модифицированный код выглядит следующим образом:
ChannelPipeline ph = ch.pipeline();
/*
* 解码和编码,应和服务端一致
* */
//入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
Логика бизнес-кода клиента.
Основная логика реализации заключается в том, что вовремя отправляется сердцебиение и отправляется сервис анализа.protobufформатировать данные.
На стороне сервера имеется более одной аннотации, аннотацияSharableОсновная причина в том, что несколько обработчиков могут безопасно совместно использоваться несколькими каналами, то есть для обеспечения безопасности потоков.
Без лишних слов, код выглядит следующим образом:
@Service("nettyClientHandler")
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyClient nettyClient;
/** 循环次数 */
private int fcount = 1;
/**
* 建立连接时
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("建立连接时:" + new Date());
ctx.fireChannelActive();
}
/**
* 关闭连接时
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("关闭连接时:" + new Date());
final EventLoop eventLoop = ctx.channel().eventLoop();
nettyClient.doConnect(new Bootstrap(), eventLoop);
super.channelInactive(ctx);
}
/**
* 心跳请求处理 每4秒发送一次心跳请求;
*
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
System.out.println("循环请求的时间:" + new Date() + ",次数" + fcount);
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态,就发送心跳命令
UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
ctx.channel().writeAndFlush(userState);
fcount++;
}
}
}
/**
* 业务逻辑处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如果不是protobuf类型的数据
if (!(msg instanceof UserMsg)) {
System.out.println("未知数据!" + msg);
return;
}
try {
// 得到protobuf的数据
UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
// 进行相应的业务处理。。。
// 这里就从简了,只是打印而已
System.out.println(
"客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge());
// 这里返回一个已经接受到数据的状态
UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
ctx.writeAndFlush(userState);
System.out.println("成功发送给服务端!");
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}
}
Так что здесь также написан клиентский код.
функциональный тест
Сначала запустите сервер, затем запустите клиент. Посмотрим, будет ли результат таким, как указано выше.
Вывод сервера:
服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53319
第1次,服务端接受的消息:state: 1
客户端业务处理成功!
第2次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
Результат ввода клиента:
Netty客户端启动成功!
建立连接时:Mon Jul 16 23:31:58 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3
循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4
Из печатной информации видно, что это так, как указано выше.
Далее давайте посмотрим, сможет ли клиент переподключиться. Сначала запустите клиент, затем запустите сервер.
Результат ввода клиента:
Netty客户端启动成功!
与服务端断开连接!在10s之后准备尝试重连!
客户端连接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
建立连接时:Mon Jul 16 23:41:33 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3
Вывод сервера:
服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53492
第1次,服务端接受的消息:state: 1
客户端业务处理成功!
第2次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2
Результат также, как указано выше!
разное
Об интеграции SpringBoot Netty использует Protobuf для передачи данных, и на этом все заканчивается. Адрес проекта SpringBoot по интеграции Netty с использованием Protobuf для передачи данных: https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf
Кстати, есть еще адреса проектов Netty, которые не используют интеграцию с SpringBoot: https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf
Оригинал не просто, если вы чувствуете себя хорошо, я надеюсь дать рекомендацию! Ваша поддержка - самая большая мотивация для моего письма! Уведомление об авторских правах: Автор: ничтожество Источник блога сада: http://www.cnblogs.com/xuwujing Источник CSDN: http://blog.csdn.net/qazwsxpcm Источник личного блога: http://www.panchengming.com