В предыдущей статье мы говорили о «3 решения проблемы липких пакетов сокета", но я не ожидал, что область комментариев взорвется. Благодаря всеобщим восторженным обсуждениям и различным отзывам, эта статья будет расширяться и расширяться, пытаясь найти оптимальное решение проблемы и оптимальное решение для обмена сообщениями.
Перед официальным стартом давайте просто ответим на несколько типичных вопросов в предыдущем комментарии, а те, кому неинтересно, могут его сразу пропустить.
Вопрос 1: Есть ли проблема с липкими пакетами в TCP?
Сначала ответьте:Сам TCP не имеет липких пакетов и полупакетов, потому что TCP по сути является просто протоколом управления передачей (TCP), который является ориентированным на соединение, надежным протоколом связи на транспортном уровне на основе потока байтов., как определено в RFC 793 IETF.
Так называемыйСоглашение по сути является договором, точно так же, как в соглашении по программированию Java используется номенклатура верблюжьего регистра. Смысл соглашения состоит в том, чтобы позволить двум сторонам общаться друг с другом для нормального обмена сообщениями. Как возникла проблема с липким пакетом и половинным пакетом?
Это связано с тем, что при TCP-взаимодействии данные передаются в виде байтовых потоков, а передача «потоков» не имеет границ, из-за отсутствия которых невозможно различить атрибуцию сообщений, в результате чего получаются залипшие пакеты и Задача о половинном мешке (определения липкого мешка и половинного мешка см.Предыдущий).Таким образом, сам протокол TCP не имеет проблемы с залипанием пакетов и полупакетов, но если граница потока не может быть эффективно определена при использовании, возникнут проблемы залипания пакетов и полупакетов.
Вопрос 2: Является ли разделитель оптимальным решением?
Честно говоря, после терпеливого "просвещения" всех в комментариях, я тоже понял, что есть определенные ограничения в использовании терминатора в качестве конечного решения, например, если терминатор появляется в середине сообщения, это вызовет проблема половины пакета.Поэтому, если это сложная строка, содержимое должно быть закодировано и декодировано, чтобы обеспечить правильность терминатора.
Вопрос 3: Эффективен ли Socket?
Ответ на этот вопрос - нет. На самом деле сценарий применения был описан в начале: "традиционное программирование Socket". Смысл изучения его заключается в том, чтобы понять некоторые более ранние и низкоуровневые знания. Конечно, в качестве дополнения, эта статья предоставит более эффективное решение для обмена сообщениями - связь Netty.
После обсуждения вышеперечисленных проблем давайте сначала добавим реализацию кода разделения сообщения на заголовок сообщения и тело сообщения, упомянутое в предыдущей статье.
1. Инкапсулируйте заголовок и тело сообщения
Прежде чем приступить к написанию сервера и клиента, давайте напишем класс инкапсуляции сообщения, который можно использовать для инкапсуляции сообщения в заголовок и тело сообщения, как показано на следующем рисунке:
Длина тела сообщения сохраняется в заголовке сообщения, тем самым определяя границу сообщения и решая проблему липких пакетов и половинных пакетов.
1. Класс инкапсуляции сообщений
Класс инкапсуляции сообщения предоставляет два метода: один — преобразовать сообщение в заголовок сообщения + тело сообщения, а другой — прочитать заголовок сообщения.Конкретный код реализации выглядит следующим образом:
/**
* 消息封装类
*/
class SocketPacket {
// 消息头存储的长度(占 8 字节)
static final int HEAD_SIZE = 8;
/**
* 将协议封装为:协议头 + 协议体
* @param context 消息体(String 类型)
* @return byte[]
*/
public byte[] toBytes(String context) {
// 协议体 byte 数组
byte[] bodyByte = context.getBytes();
int bodyByteLength = bodyByte.length;
// 最终封装对象
byte[] result = new byte[HEAD_SIZE + bodyByteLength];
// 借助 NumberFormat 将 int 转换为 byte[]
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(HEAD_SIZE);
numberFormat.setGroupingUsed(false);
// 协议头 byte 数组
byte[] headByte = numberFormat.format(bodyByteLength).getBytes();
// 封装协议头
System.arraycopy(headByte, 0, result, 0, HEAD_SIZE);
// 封装协议体
System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength);
return result;
}
/**
* 获取消息头的内容(也就是消息体的长度)
* @param inputStream
* @return
*/
public int getHeader(InputStream inputStream) throws IOException {
int result = 0;
byte[] bytes = new byte[HEAD_SIZE];
inputStream.read(bytes, 0, HEAD_SIZE);
// 得到消息体的字节长度
result = Integer.valueOf(new String(bytes));
return result;
}
}
2. Написать клиент
Далее давайте определим клиент.В клиенте мы добавляем группу сообщений для отправки и случайным образом отправляем сообщение на сервер.Код реализации выглядит следующим образом:
/**
* 客户端
*/
class MySocketClient {
public static void main(String[] args) throws IOException {
// 启动 Socket 并尝试连接服务器
Socket socket = new Socket("127.0.0.1", 9093);
// 发送消息合集(随机发送一条消息)
final String[] message = {"Hi,Java.", "Hi,SQL~", "关注公众号|Java中文社群."};
// 创建协议封装对象
SocketPacket socketPacket = new SocketPacket();
try (OutputStream outputStream = socket.getOutputStream()) {
// 给服务器端发送 10 次消息
for (int i = 0; i < 10; i++) {
// 随机发送一条消息
String msg = message[new Random().nextInt(message.length)];
// 将内容封装为:协议头+协议体
byte[] bytes = socketPacket.toBytes(msg);
// 发送消息
outputStream.write(bytes, 0, bytes.length);
outputStream.flush();
}
}
}
}
3. Напишите серверную часть
На стороне сервера мы используем пул потоков для обработки каждого бизнес-запроса клиента Код реализации выглядит следующим образом:
/**
* 服务器端
*/
class MySocketServer {
public static void main(String[] args) throws IOException {
// 创建 Socket 服务器端
ServerSocket serverSocket = new ServerSocket(9093);
// 获取客户端连接
Socket clientSocket = serverSocket.accept();
// 使用线程池处理更多的客户端
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
threadPool.submit(() -> {
// 客户端消息处理
processMessage(clientSocket);
});
}
/**
* 客户端消息处理
* @param clientSocket
*/
private static void processMessage(Socket clientSocket) {
// Socket 封装对象
SocketPacket socketPacket = new SocketPacket();
// 获取客户端发送的消息对象
try (InputStream inputStream = clientSocket.getInputStream()) {
while (true) {
// 获取消息头(也就是消息体的长度)
int bodyLength = socketPacket.getHeader(inputStream);
// 消息体 byte 数组
byte[] bodyByte = new byte[bodyLength];
// 每次实际读取字节数
int readCount = 0;
// 消息体赋值下标
int bodyIndex = 0;
// 循环接收消息头中定义的长度
while (bodyIndex <= (bodyLength - 1) &&
(readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) {
bodyIndex += readCount;
}
bodyIndex = 0;
// 成功接收到客户端的消息并打印
System.out.println("接收到客户端的信息:" + new String(bodyByte));
}
} catch (IOException ioException) {
System.out.println(ioException.getMessage());
}
}
}
Результат выполнения вышеуказанной программы выглядит следующим образом:
Из приведенных выше результатов видно, что передача сообщений проходит нормально, и при взаимодействии между клиентом и сервером не возникает проблем с залипанием пакетов и половинных пакетов.
2. Используйте Netty для эффективного общения
Вышеупомянутое содержимое предназначено для традиционного программирования Socket, но для достижения более эффективной связи и мультиплексирования объектов подключения используется NIO (неблокирующий ввод-вывод, неблокирующий ввод-вывод) или AIO (асинхронный ввод-вывод, асинхронный неблокирующий ввод-вывод).
Традиционное программирование сокетов — это BIO (блокирующий ввод-вывод, синхронный блокирующий ввод-вывод), который отличается от NIO и AIO следующим образом:
- BIO исходит из традиционного пакета java.io, реализован на основе потоковой модели, метод взаимодействия синхронно-блокирующий, то есть при чтении входного или выходного потока поток будет заблокирован до тех пор, пока не будет выполнено чтение и действия записи завершены.Там вызовы между ними идут в надежном линейном порядке. Его преимущество в том, что код относительно прост и интуитивно понятен, а недостаток в том, что эффективность и масштабируемость ввода-вывода очень низки, и он легко может стать узким местом в производительности приложения.
- NIO — это пакет java.nio, представленный в Java 1.4.Он предоставляет новые абстракции, такие как Channel, Selector, Buffer и т. д. Он может создавать мультиплексированные, синхронные неблокирующие программы ввода-вывода и в то же время обеспечивает высокопроизводительные операции с данными, которые ближе к базовой операционной системе.
- AIO — это пакет, представленный после Java 1.7. Это обновленная версия NIO. Он обеспечивает асинхронный неблокирующий режим работы ввода-вывода, поэтому люди называют его AIO (асинхронный ввод-вывод). Асинхронный ввод-вывод реализован на основе событий и механизмов обратного вызова, которые есть операции приложения. После этого он вернется напрямую и не будет там заблокирован. Когда фоновая обработка завершится, операционная система уведомит соответствующий поток для выполнения последующих операций.
PS: AIO можно рассматривать как обновление NIO, которое также называется NIO 2.
Традиционный процесс связи через сокет:Коммуникационный процесс NIO:
Замена традиционного программирования NIO на Netty
Хотя идея дизайна NIO очень хороша, написание его кода более проблематично, например, использование Buffer и написание Selector. А перед лицом сложных проблем, таких как отключение и повторное подключение, потеря пакетов и залипание пакетов, стоимость ручной обработки очень велика, поэтому мы обычно используем фреймворк Netty для замены традиционного NIO.
Что такое Нетти?
Netty — это асинхронная, управляемая событиями платформа для высокопроизводительных и надежных сетевых приложений, с помощью которой можно быстро и легко разрабатывать сетевые приложения, что значительно упрощает сложное сетевое программирование.
Основные преимущества Netty заключаются в следующем:
- Элегантный дизайн фреймворка, а базовую модель можно переключать по желанию для адаптации к различным требованиям сетевых протоколов;
- Обеспечивает поддержку многих стандартных протоколов безопасности, кодирования и декодирования;
- Упрощает многие неудобства в использовании NIO;
- Сообщество очень активно, а фреймворк Netty используется во многих фреймворках с открытым исходным кодом, таких как Dubbo, RocketMQ, Spark и т. д.
Netty в основном включает следующие 3 части, как показано на следующем рисунке:
Функции этих трех частей описываются следующим образом.
1. Основной основной слой
Базовый уровень Core является наиболее важным содержимым Netty.Он обеспечивает общую абстракцию и реализацию базовой сетевой связи, включая расширяемую модель событий, общий API связи и ByteBuf, поддерживающий нулевое копирование.
2. Уровень поддержки протокола
Уровень поддержки протоколов в основном охватывает реализацию кодеков основных протоколов, таких как HTTP, SSL, Protobuf, сжатие, передачу больших файлов, WebSocket, текстовые, двоичные и другие основные протоколы.Кроме того, Netty также поддерживает пользовательские протоколы прикладного уровня. Богатая поддержка протоколов Netty снижает затраты пользователей на разработку.На основе Netty мы можем быстро разрабатывать такие сервисы, как HTTP и WebSocket.
3. Уровень транспортных услуг Transport Service
Уровень транспортных услуг обеспечивает определение и методы реализации транспортных возможностей сети. Он поддерживает Socket, HTTP-туннель, конвейер виртуальных машин и другие методы передачи. Netty абстрагирует и инкапсулирует такие передачи данных, как TCP и UDP. Пользователи могут больше сосредоточиться на реализации бизнес-логики, не беспокоясь о деталях базовой передачи данных.
Нетти использует
Имея общее представление о Netty, мы будем использовать Netty для написания базового коммуникационного сервера, который состоит из двух сторон: стороны сервера и стороны клиента, Сторона клиента отвечает за отправку сообщений, а сторона сервера отвечает за получение и печать сообщений.Конкретные этапы реализации заключаются в следующем.
1. Добавьте фреймворк Netty
Прежде всего, нам нужно добавить поддержку фреймворка Netty.Если это проект Maven, добавьте следующую конфигурацию:
<!-- 添加 Netty 框架 -->
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.56.Final</version>
</dependency>
Примечания к выпуску Нетти
Netty 3.x и 4.x — это основные стабильные версии, а последняя 5.x — заброшенная бета-версия, поэтому рекомендуется использовать последнюю стабильную версию Netty 4.x.
2. Код реализации на стороне сервера
Согласно официальной рекомендации, серверный код делится на следующие три части:
- MyNettyServer: основной бизнес-код на стороне сервера;
- ServerInitializer: Инициализация канала на стороне сервера (Channel);
- ServerHandler: логика обработки после получения сервером информации.
PS: Канал буквально означает "канал", который является носителем сетевого общения. Канал предоставляет базовые API для сетевых операций ввода-вывода, таких как регистрация, привязка, подключение, чтение, запись, сброс и т. д. Канал, реализованный Netty, основан на канале JDK NIO. По сравнению с JDK NIO, канал Netty обеспечивает более высокий уровень абстракции, в то же время защищая сложность базового сокета, предоставляя каналу более мощные функции. В основном нет необходимости иметь дело напрямую с класс Java Socket при использовании Netty.
Код реализации на стороне сервера выглядит следующим образом:
// 定义服务器的端口号
static final int PORT = 8007;
/**
* 服务器端
*/
static class MyNettyServer {
public static void main(String[] args) {
// 创建一个线程组,用来负责接收客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 创建另一个线程组,用来负责 I/O 的读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建一个 Server 实例(可理解为 Netty 的入门类)
ServerBootstrap b = new ServerBootstrap();
// 将两个线程池设置到 Server 实例
b.group(bossGroup, workerGroup)
// 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)
.channel(NioServerSocketChannel.class)
// 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)
.childHandler(new ServerInitializer());
// 绑定端口并且进行同步
ChannelFuture future = b.bind(PORT).sync();
// 对关闭通道进行监听
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 资源关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 服务端通道初始化
*/
static class ServerInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final ServerHandler SERVER_HANDLER = new ServerHandler();
/**
* 初始化通道的具体执行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
/**
* 服务器端接收到消息之后的业务处理类
*/
static class ServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 读取到客户端的消息
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) {
if (!request.isEmpty()) {
System.out.println("接到客户端的消息:" + request);
}
}
/**
* 数据读取完毕
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/**
* 异常处理,打印异常并关闭通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
3. Код реализации клиента
Реализация кода клиента также делится на следующие три части:
- MyNettyClient: основной бизнес-код клиента;
- ClientInitializer: инициализация клиентского канала;
- ClientHandler: логика обработки после получения сообщения.
Код реализации клиента выглядит следующим образом:
/**
* 客户端
*/
static class MyNettyClient {
public static void main(String[] args) {
// 创建事件循环线程组(客户端的线程组只有一个)
EventLoopGroup group = new NioEventLoopGroup();
try {
// Netty 客户端启动对象
Bootstrap b = new Bootstrap();
// 设置启动参数
b.group(group)
// 设置通道类型
.channel(NioSocketChannel.class)
// 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)
.handler(new ClientInitializer());
// 连接服务器端并同步通道
Channel ch = b.connect("127.0.0.1", 8007).sync().channel();
// 发送消息
ChannelFuture lastWriteFuture = null;
// 给服务器端发送 10 条消息
for (int i = 0; i < 10; i++) {
// 发送给服务器消息
lastWriteFuture = ch.writeAndFlush("Hi,Java.");
}
// 在关闭通道之前,同步刷新所有的消息
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放资源
group.shutdownGracefully();
}
}
}
/**
* 客户端通道初始化类
*/
static class ClientInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
/**
* 初始化客户端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
/**
* 客户端连接成功之后的业务处理
*/
static class ClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 读取到服务器端的消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.err.println("接到服务器的消息:" + msg);
}
/**
* 异常处理,打印异常并关闭通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Как видно из приведенного выше кода, функция, реализованная нашим кодом, заключается в том, что клиент отправляет 10 сообщений на сервер.
После написания приведенного выше кода мы можем запустить сервер и клиент, результаты их выполнения после запуска следующие:Из приведенных выше результатов видно, что, хотя клиент и сервер установили связь, все еще существует проблема липких пакетов при использовании Netty, Сервер получает 10 сообщений за раз вместо только одного сообщения за раз. Итак, теперь нам нужно решить проблему с липким пакетом в Netty.
3. Решить проблему липкого пакета Netty
В Netty есть три распространенных решения проблемы липких пакетов:
- Установите длину сообщения фиксированного размера.Если длина недостаточна, используйте нулевой символ, чтобы компенсировать это.Его недостатки более очевидны и потребляют сетевой трафик, поэтому использовать его не рекомендуется;
- Используйте разделители для определения границ сообщений, чтобы избежать проблем с залипанием и половинным пакетом;
- Сообщение разделено на заголовок сообщения и тело сообщения, а длина всего текущего сообщения сохраняется в заголовке.Полное сообщение читается только после того, как будет прочитано сообщение достаточной длины.
Далее мы рассмотрим последние два рекомендуемых решения отдельно.
1. Используйте разделители, чтобы решить проблему липких пакетов
Класс DelimiterBasedFrameDecoder предоставляется в Netty для использования специальных символов в качестве терминатора сообщений для решения проблемы залипания пакетов и половинных пакетов.
Его основной код реализации заключается в разделении сообщений путем установки DelimiterBasedFrameDecoder при инициализации канала (Channel), который необходимо установить как на стороне клиента, так и на стороне сервера.Конкретный код реализации выглядит следующим образом.
Код реализации ядра на стороне сервера выглядит следующим образом:
/**
* 服务端通道初始化
*/
static class ServerInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final ServerHandler SERVER_HANDLER = new ServerHandler();
/**
* 初始化通道的具体执行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 19 行:设置结尾分隔符【核心代码】(参数1:为消息的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
Основной код — строка 19. Смысл метода отмечен в коде, поэтому я не буду повторяться здесь.
Основной код реализации клиента выглядит следующим образом:
/**
* 客户端通道初始化类
*/
static class ClientInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
/**
* 初始化客户端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 17 行:设置结尾分隔符【核心代码】(参数1:为消息的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
Полный код реализации на стороне сервера и на стороне клиента выглядит следующим образом:
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyExample {
// 定义服务器的端口号
static final int PORT = 8007;
/**
* 服务器端
*/
static class MyNettyServer {
public static void main(String[] args) {
// 创建一个线程组,用来负责接收客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 创建另一个线程组,用来负责 I/O 的读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建一个 Server 实例(可理解为 Netty 的入门类)
ServerBootstrap b = new ServerBootstrap();
// 将两个线程池设置到 Server 实例
b.group(bossGroup, workerGroup)
// 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)
.channel(NioServerSocketChannel.class)
// 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)
.childHandler(new ServerInitializer());
// 绑定端口并且进行同步
ChannelFuture future = b.bind(PORT).sync();
// 对关闭通道进行监听
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 资源关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 服务端通道初始化
*/
static class ServerInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final ServerHandler SERVER_HANDLER = new ServerHandler();
/**
* 初始化通道的具体执行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 设置结尾分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
/**
* 服务器端接收到消息之后的业务处理类
*/
static class ServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 读取到客户端的消息
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) {
if (!request.isEmpty()) {
System.out.println("接到客户端的消息:" + request);
}
}
/**
* 数据读取完毕
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/**
* 异常处理,打印异常并关闭通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
/**
* 客户端
*/
static class MyNettyClient {
public static void main(String[] args) {
// 创建事件循环线程组(客户端的线程组只有一个)
EventLoopGroup group = new NioEventLoopGroup();
try {
// Netty 客户端启动对象
Bootstrap b = new Bootstrap();
// 设置启动参数
b.group(group)
// 设置通道类型
.channel(NioSocketChannel.class)
// 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)
.handler(new ClientInitializer());
// 连接服务器端并同步通道
Channel ch = b.connect("127.0.0.1", PORT).sync().channel();
// 发送消息
ChannelFuture lastWriteFuture = null;
// 给服务器端发送 10 条消息
for (int i = 0; i < 10; i++) {
// 发送给服务器消息
lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");
}
// 在关闭通道之前,同步刷新所有的消息
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放资源
group.shutdownGracefully();
}
}
}
/**
* 客户端通道初始化类
*/
static class ClientInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
/**
* 初始化客户端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 设置结尾分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
/**
* 客户端连接成功之后的业务处理
*/
static class ClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 读取到服务器端的消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.err.println("接到服务器的消息:" + msg);
}
/**
* 异常处理,打印异常并关闭通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}
Окончательный результат выполнения показан на следующем рисунке:
Из приведенных выше результатов видно, что Netty можно использовать в обычном режиме, и у него больше нет проблем с липкими пакетами и половинными пакетами.
2. Инкапсулируйте сообщения, чтобы решить проблему липких пакетов
Суть этого решения состоит в том, чтобы разделить сообщение на заголовок сообщения + тело сообщения и сохранить длину тела сообщения в заголовке сообщения, чтобы определить границу сообщения, что позволяет избежать проблемы слипшихся пакетов и половинных пакетов. процесс выглядит следующим образом Как показано на рисунке:
В Netty инкапсуляция сообщений может быть реализована с помощью двух классов: LengthFieldPrepender (кодирование) и LengthFieldBasedFrameDecoder (декодирование). Как и в предыдущем решении, нам нужно решить проблему с липким пакетом, установив каналы (Channel) на стороне сервера и на стороне клиента соответственно.
Основной код на стороне сервера выглядит следующим образом:
/**
* 服务端通道初始化
*/
static class ServerInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();
/**
* 初始化通道的具体执行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 18 行:消息解码:读取消息头和消息体
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 20 行:消息编码:将消息封装为消息头和消息体,在消息前添加消息体的长度
pipeline.addLast(new LengthFieldPrepender(4));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
Основной код — строки 18 и строки 20. Кодирование осуществляется с помощью LengthFieldPrepender (сообщение упаковывается в заголовок сообщения + тело сообщения), а декодирование осуществляется с помощью LengthFieldBasedFrameDecoder (содержимое сообщения вынимается из инкапсулированного сообщения). .
Параметры LengthFieldBasedFrameDecoder описываются следующим образом:
- Параметр 1: maxFrameLength - максимальная длина отправляемого пакета;
- Параметр 2: lengthFieldOffset - смещение поля длины, которое относится к индексу поля длины в байтовом массиве всего пакета данных;
- Параметр 3: lengthFieldLength - длина самого поля длины в байтах;
- Параметр 4: lengthAdjustment — Коррекция смещения для поля длины. Если значение поля длины, помимо длины действительного поля данных, содержит также длину других полей (например, самого поля длины), то его необходимо исправить. Исправленное значение: длина пакета - значение поля длины - смещение поля длины - длина поля длины;
- Параметр 5: initialBytesToStrip — количество начальных байтов, которые нужно отбросить. Количество байтов, предшествующих действительным данным, отбрасывается. Например, есть поле длины с 4 узлами впереди, тогда его значение равно 4.
LengthFieldBasedFrameDecoder(1024,0,4,0,4) означает: максимальная длина пакета данных 1024, поле длины занимает четыре байта заголовка, а первые четыре байта (т.е. поле длины) удаляются при чтении данных .
Основной код реализации клиента выглядит следующим образом:
/**
* 客户端通道初始化类
*/
static class ClientInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();
/**
* 初始化客户端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 消息解码:读取消息头和消息体
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度
pipeline.addLast(new LengthFieldPrepender(4));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
Полный код реализации на стороне сервера и на стороне клиента выглядит следующим образом:
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 通过封装 Netty 来解决粘包
*/
public class NettyExample {
// 定义服务器的端口号
static final int PORT = 8007;
/**
* 服务器端
*/
static class MyNettyServer {
public static void main(String[] args) {
// 创建一个线程组,用来负责接收客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 创建另一个线程组,用来负责 I/O 的读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建一个 Server 实例(可理解为 Netty 的入门类)
ServerBootstrap b = new ServerBootstrap();
// 将两个线程池设置到 Server 实例
b.group(bossGroup, workerGroup)
// 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)
.channel(NioServerSocketChannel.class)
// 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)
.childHandler(new NettyExample.ServerInitializer());
// 绑定端口并且进行同步
ChannelFuture future = b.bind(PORT).sync();
// 对关闭通道进行监听
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 资源关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 服务端通道初始化
*/
static class ServerInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();
/**
* 初始化通道的具体执行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 消息解码:读取消息头和消息体
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度
pipeline.addLast(new LengthFieldPrepender(4));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
/**
* 服务器端接收到消息之后的业务处理类
*/
static class ServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 读取到客户端的消息
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) {
if (!request.isEmpty()) {
System.out.println("接到客户端的消息:" + request);
}
}
/**
* 数据读取完毕
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/**
* 异常处理,打印异常并关闭通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
/**
* 客户端
*/
static class MyNettyClient {
public static void main(String[] args) {
// 创建事件循环线程组(客户端的线程组只有一个)
EventLoopGroup group = new NioEventLoopGroup();
try {
// Netty 客户端启动对象
Bootstrap b = new Bootstrap();
// 设置启动参数
b.group(group)
// 设置通道类型
.channel(NioSocketChannel.class)
// 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)
.handler(new NettyExample.ClientInitializer());
// 连接服务器端并同步通道
Channel ch = b.connect("127.0.0.1", PORT).sync().channel();
// 发送消息
ChannelFuture lastWriteFuture = null;
// 给服务器端发送 10 条消息
for (int i = 0; i < 10; i++) {
// 发送给服务器消息
lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");
}
// 在关闭通道之前,同步刷新所有的消息
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放资源
group.shutdownGracefully();
}
}
}
/**
* 客户端通道初始化类
*/
static class ClientInitializer extends ChannelInitializer<SocketChannel> {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();
/**
* 初始化客户端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 消息解码:读取消息头和消息体
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度
pipeline.addLast(new LengthFieldPrepender(4));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
/**
* 客户端连接成功之后的业务处理
*/
static class ClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 读取到服务器端的消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.err.println("接到服务器的消息:" + msg);
}
/**
* 异常处理,打印异常并关闭通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}
Результат выполнения вышеуказанной программы:
4. Резюме
В этой статье представлена конкретная реализация кода традиционной связи через сокеты, которая делит сообщения на заголовки сообщений и тела сообщений.Однако традиционные сокеты имеют общую производительность и возможность повторного использования.Для достижения более эффективной связи мы можем использовать инфраструктуру Netty для замены традиционного сокета. и программирование NIO, но у Netty по-прежнему есть проблема с залипанием пакетов при его использовании, поэтому мы предоставляем два наиболее распространенных решения: через разделители или решения, которые будут инкапсулировать сообщения, из которых использование последнего решения более широкое.
Ссылки и благодарности
«Анализ основных принципов Netty и практики RPC»
Подпишитесь на официальный аккаунт «Java Chinese Community», чтобы узнать больше о галантерейных товарах.
Посетите Github, чтобы узнать больше интересных вещей:GitHub.com/VIP камень/Али…