Эта статья участвовала в "Проект «Звезда раскопок»”, чтобы выиграть творческий подарочный пакет и бросить вызов творческим поощрительным деньгам.
Преамбула:Ваше первое приложение Netty
В предыдущей статье было написано первое приложение входа Netty.В этой статье в основном анализируется рабочий процесс и основные компоненты Netty из приведенного выше кода в сочетании с блок-схемой этой статьи.
Наконец, мы приведем пример для дальнейшего понимания.
Надеюсь, вы можете получить что-то! ! 🚀
1. Рабочий процесс Netty
Давайте сначала взглянем на диаграмму принципов работы Netty, кратко поговорим о рабочем процессе, а затем проанализируем основные компоненты Netty один за другим с помощью этой диаграммы.
1.1 Схема работы сервера:
1.2, анализ рабочего процесса сервера:
-
Когда запустится серверная часть, привяжите локальный порт и инициализируйте
NioServerSocketChannel
. -
поставить себя
NioServerSocketChannel
зарегистрироваться вBossNioEventLoopGroup
на селекторе.- Серверная часть содержит 1
Boss NioEventLoopGroup
ИWorker NioEventLoopGroup
, -
Boss NioEventLoopGroup
В частности, отвечает за получение соединений от клиентов,Worker NioEventLoopGroup
Отвечает за чтение и запись в сети - NioEventLoopGroup эквивалентна группе циклов событий.Эта группа содержит несколько циклов событий NioEventLoop, и каждый NioEventLoop содержит 1 селектор и 1 поток цикла событий.
- Серверная часть содержит 1
-
BossNioEventLoopGroup
Циклические задачи:1. Опросить событие принятия;
2. Процесс события приема и зарегистрируйте генерируемую ниосокархАНел с определенным
WorkNioEventLoopGroup
на Селекторе.3. Обработайте задачи в очереди задач, запустите AllTasks. Задачи в очереди задач включают вызовы пользователей
eventloop.execute或schedule
Выполненные задачи или другие потоки, отправленные в этотeventloop
задача. -
WorkNioEventLoopGroup
Циклические задачи:-
голосование
read和Write
мероприятие -
Обработка событий ввода-вывода, когда происходят события чтения и записи NioSocketChannel, обрабатывается обратный вызов (триггер) ChannelHandler.
-
Задача, которая обрабатывает очередь задач, т.е.
runAllTasks
-
1.3 Схема работы клиента
Процесс не будет повторяться и описываться 😁
Во-вторых, основные компоненты модуля
Основные компоненты Netty примерно следующие:
- Интерфейс канала
- Интерфейс EventLoopGroup
- Интерфейс ChannelFuture
- Интерфейс ChannelHandler
- Интерфейс ChannelPipeline
- Интерфейс ChannelHandlerContext
- Абстрактный класс SimpleChannelInboundHandler
- Bootstrap, класс ServerBootstrap
- Интерфейс ChannelFuture
- Класс ChannelOption
2.1, интерфейс канала
Обычно мы используем базовые операции ввода-вывода (bind(), connect(), read() и write()), которые существенно зависят от примитивов, предоставляемых базовой сетевой передачей, которые в JavaSocket
Добрый.
API, предоставляемый интерфейсом Netty Channel, значительно сокращает прямое использованиеSocket
сложность класса. Кроме тогоChannel
обеспечить асинхронную сетьI/O
операций (таких как установление соединений, чтение и запись, привязка портов), асинхронные вызовы означают любыеI/O
вызовы будут возвращаться немедленно, и нет гарантии, что запрошенныйI/O
Операция завершена.
возвращает a сразу после завершения вызоваChannelFuture
например, зарегистрировав слушателей наChannelFuture
на, поддержка наI/O
Немедленный обратный вызов, чтобы уведомить вызывающую сторону об успешном завершении операции, сбое или ее отмене.
Кроме того, Channel является корнем широкой иерархии классов со многими предопределенными специализированными реализациями, такими как:
-
LocalServerChannel
: ServerChannel для локального транспорта, позволяющего обмениваться данными с виртуальными машинами. -
EmbeddedChannel
: базовый класс для реализаций канала, используемых во встроенном виде. -
NioSocketChannel
: Асинхронный клиент TCP, подключение через сокет. -
NioServerSocketChannel
: Асинхронный TCP на стороне сервера, соединение через сокет. -
NioDatagramChannel
: Асинхронное соединение UDP. -
NioSctpChannel
: асинхронное соединение Sctp на стороне клиента, использующее неблокирующий режим и позволяющее читать/записывать SctpMessage в базовый SctpChannel. -
NioSctpServerChannel
: асинхронные соединения Sctp на стороне сервера, эти каналы охватывают сетевой ввод-вывод UDP и TCP и файловый ввод-вывод.
2.2, интерфейс EventLoopGroup
EventLoop
Определяет базовую абстракцию Netty для обработки событий, происходящих в течение жизненного цикла соединения.
Netty абстрагирует Selector от приложения, запуская события, устраняя весь код диспетчеризации, который в противном случае нужно было бы писать вручную. Внутри каждому каналу будет назначен EventLoop для обработки всех событий, включая:
- вопрос регистрации;
- Отправка событий в ChannelHandler;
- Запланируйте дальнейшие действия.
Однако мы не будем здесь углубляться в это, а сделаем краткое описание связи между Channel, EventLoop, Thread и EventLoopGroup.
- Один
EventLoopGroup
содержит один или несколькоEventLoop
; - каждый
EventLoop
Поддерживает аSelector
Например, EventLoop — только один в своем жизненном цикле.Thread
связывать; - Поэтому все по
EventLoop
Все обрабатываемые события ввода-вывода будутThread
обрабатывается выше, практически исключая необходимость синхронизации; - Один
Channel
зарегистрированы только в одном во время его жизниEventLoop
; - Один
EventLoop
может быть назначен один или несколькоChannel
. - Обычно служебный порт представляет собой
ServerSocketChannel
соответствует одномуSelector
с однимEventLoop
нить.BossEventLoop
Отвечает за прием клиентских подключений иSocketChannel
сдаватьWorkerEventLoopGroup
Чтобы выполнить обработку ввода-вывода, как показано на блок-схеме выше.
2.3, интерфейс ChannelFuture
Все операции ввода/вывода в Netty являются асинхронными. Поскольку операция может вернуться не сразу, нам нужен способ определить ее результат в более поздний момент времени. Конкретная реализация осуществляется черезFuture
а такжеChannelFutures
,ЭтоaddListener()
Метод зарегистрировалChannelFutureListener
, чтобы зарегистрированные события прослушивателя запускались автоматически после завершения операции (независимо от того, успешно она или нет).
Общие методы
-
Channel channel()
, возвращает текущий прогрессIO
рабочий канал -
ChannelFuture sync()
, дождитесь завершения асинхронной операции
2.4, интерфейс ChannelHandler
Из предыдущей вводной процедуры мы можем видетьChannelHandler
Важность в Netty заключается в том, что он действует как контейнер для всей логики приложения, которая обрабатывает входящие и исходящие данные. Большая часть нашей бизнес-логики также написана в реализованном классе слов, вдобавокChannelHandler
Метод автоматически запускается событием, и его не нужно отправлять нам.
ChannelHandler
Существует множество классов реализации или подинтерфейсов. Обычно мы просто идем к наследованию или подинтерфейсу, а потом переписываем методы внутри.
Наиболее распространенные типы обработчиков:
-
ChannelInboundHandler
: получать входящие события и данные -
ChannelOutboundHandler
: Для обработки исходящих событий и данных.
Общие адаптеры:
-
ChannelInboundHandlerAdapter
: для обработки входящих событий ввода-выводаChannelInboundHandler
Абстрактный базовый класс для реализаций, предоставляющий реализации для всех методов. Эта реализация просто перенаправляет операциюChannelPipeline
следующийChannelHandler
. Подклассы могут переопределить реализацию метода, чтобы изменить это. -
ChannelOutboundHandlerAdapter
: используется для обработки исходящих событий ввода-вывода
Нам часто нужно настроить одинHandler
класс для наследованияChannelInboundHandlerAdapter
, а затем реализовать бизнес-логику, переопределив соответствующие методы, посмотрим, какие методы можно переопределить:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
//注册事件
public void channelRegistered(ChannelHandlerContext ctx) ;
//
public void channelUnregistered(ChannelHandlerContext ctx);
//通道已经就绪
public void channelActive(ChannelHandlerContext ctx);
public void channelInactive(ChannelHandlerContext ctx) ;
//通道读取数据事件
public void channelRead(ChannelHandlerContext ctx, Object msg) ;
//通道读取数据事件完毕
public void channelReadComplete(ChannelHandlerContext ctx) ;
public void userEventTriggered(ChannelHandlerContext ctx, Object evt);
//通道可写性已更改
public void channelWritabilityChanged(ChannelHandlerContext ctx);
//异常处理
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
}
2.5, интерфейс ChannelPipeline
ChannelPipeline
Предоставляет контейнер для цепочки ChannelHandlers и определяет API для распространения потока входящих и исходящих событий в цепочке. При создании канала он автоматически назначается своему собственномуChannelPipeline
. Соотношение их состава следующее:
ОдинChannelсодержитChannelPipeline
,а такжеChannelPipelineв другом, поддерживаемомChannelHandlerContext
состоит из двусвязных списков, и каждыйChanneHandlerContextсвязанный с другимChannelHandler
.
ChannelHandler
установить наChannelPipeline
В процессе:
- Один
ChannelInitializer
Реализация зарегистрирована наServerBootstrap
середина ; - когда
ChannelInitializer.initChannel()
При вызове методаChannelInitializerбудетChannelPipeline
При монтаже комплекта нестандартныхChannelHandler
; -
ChannelInitializer
удалить себя изChannelPipeline
удалено в.
С точки зрения клиентского приложения события называются исходящими, если их движение направлено от клиента к серверу, и входящими, если это не так. На стороне сервера все наоборот.
Если сообщение или любое другое входящее событие прочитано, оно будетChannelPipeline
Заголовок начинает течь и передается первомуChannelInboundHandler
. После обработки этого обработчика данные будут переданы следующему в цепочкеChannelInboundHandler
. В конце концов, данные придутChannelPipeline
На этом вся обработка закончена.
Исходящие события доставляются из хвоста к первому обработчику исходящих сообщений. Исходящие и входящие обработчики не мешают друг другу.
2.6, интерфейс ChannelHandlerContext
эффект должен сделатьChannelHandler
способенChannelPipeline
И другие процессы взаимодействуют. Поскольку ChannelHandlerContext сохраняетchannel
Вся соответствующая контекстная информация, а также корреляцияChannelHandler
объект, кроме того,ChannelHandlerContext
может уведомитьChannelPipeline
следующийChannelHandler
а также динамическое изменениеChannelPipeline
.
2.7, абстрактный класс SimpleChannelInboundHandler
Мы часто можем столкнуться с приложениями, которые воспользуютсяChannelHandler
Чтобы получить расшифрованное сообщение и реализовать бизнес-логику в этом обработчике, напишите такойChannelHandler
, нам просто нужно расширить абстрактный класс SimpleChannelInboundHandler< T >
То есть, где тип T — это тип Java сообщения, которое мы хотим обработать.
существуетSimpleChannelInboundHandler
Наиболее важным методом являетсяvoid channelRead0(ChannelHandlerContext ctx, T msg)
,
После того, как мы сами реализуем этот метод, полученное сообщение было декодировано.
Например:
2.8, Bootstrap, класс ServerBootstrap
Bootstrap
значит направлять, т.Netty
Приложения обычно состоят изBootstrap
В начале основная роль заключается в настройке всегоNetty
программа, объединяющая отдельные компоненты.
категория | Bootstrap | ServerBootstrap |
---|---|---|
гид | для загрузки клиента | Используется для управления сервером |
роль в сетевом программировании | Используется для подключения к удаленным хостам и портам | Используется для привязки к локальному порту |
Количество групп EventLoopGroups | 1 | 2 |
Думаю у всех могут возникнуть сомнения по поводу последнего пункта, почему у одного 1, а у другого 2?
Поскольку серверу нужны два разных набораChannel
.
Первая группа будет содержать только одинServerChannel
, который представляет собственный прослушивающий сокет сервера, привязанный к некоторому локальному порту.
А вторая группа будет содержать все созданные для обработки входящих клиентских подключений (по одному на каждое принятое сервером подключение)Channel
.
Это можно увидеть на блок-схеме выше.
2.9, интерфейс ChannelFuture
Результат асинхронной операции ввода/вывода канала. Все операции ввода/вывода в Netty являются асинхронными. Это означает, что любой вызов ввода-вывода будет возвращен немедленно, но нет гарантии, что запрошенная операция ввода-вывода будет завершена к моменту завершения вызова. Вместо этого вы возвращаете экземпляр ChannelFuture, который предоставляет вам информацию о результате или состоянии операции ввода-вывода. ChannelFuture либо не завершен, либо завершен. Когда начинается операция ввода-вывода, создается новый объект будущего. Новое будущее изначально незавершенное — оно ни успешно, ни неудачно, ни отменено, потому что операция ввода-вывода еще не завершена. Если операция ввода-вывода завершена успешно, неудачно или отменена, будущая операция помечается как завершенная с более конкретной информацией, такой как причина сбоя. Обратите внимание, что даже отказы и отмены считаются состояниями завершения.
Netty
все вIO
Все операции асинхронны, и невозможно сразу узнать, правильно ли было обработано сообщение. Но вы можете подождать, пока он завершит выполнение через некоторое время, или зарегистрировать слушателя напрямую.Future
а такжеChannelFutures
, они могут зарегистрировать прослушиватель, и прослушиватель автоматически вызовет зарегистрированное событие прослушивателя, когда операция завершится успешно или неудачно.
Общие методы
-
Channel channel()
, возвращает текущий прогрессIO
рабочий канал -
ChannelFuture sync()
, дождитесь завершения асинхронной операции
2.10, класс ChannelOption
-
Netty
при созданииChannel
После экземпляра обычно необходимо установитьChannelOption
параметр. -
ChannelOption
Параметры следующие:-
ChannelOption.SO_KEEPALIVE
: Всегда оставайтесь на связи -
ChannelOption.SO_BACKLOG
: Соответствует параметру невыполненной работы в функции прослушивания протокола TCP/IP, который используется для инициализации размера очереди, доступной для подключения к серверу. Сторона сервера обрабатывает запросы на подключение клиента в последовательной обработке, поэтому запрос помещается в очередь для ожидания обработки.Когда параметр backilog указывает, что терминал прибывает, сторона сервера помещает запрос на подключение клиента, который не может быть обработан, в очередь для обработки, отставание Параметр указывает размер очереди.
-
3. Примеры применения
【Случай】:
Напишите сервер, два или более клиентов, клиент может общаться друг с другом.
3.1. Обработчик сервера
ChannelHandler
Существует множество классов реализации или подинтерфейсов. Обычно мы просто идем к наследованию или подинтерфейсу, а потом переписываем методы внутри.
Здесь мы просто унаследовали SimpleChannelInboundHandler , многие из методов здесь в основном до тех пор, пока мы переписываем бизнес-логику, и большинство триггеров вызываются автоматически при возникновении события, без необходимости вызывать их вручную.
package com.crush.atguigu.group_chat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author crush
*/
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 定义一个channle 组,管理所有的channel
* GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* handlerAdded 表示连接建立,一旦连接,第一个被执行
* 将当前channel 加入到 channelGroup
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//将该客户加入聊天的信息推送给其它在线的客户端
/*
该方法会将 channelGroup 中所有的channel 遍历,并发送 消息,
我们不需要自己遍历
*/
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
}
/**
* 断开连接, 将xx客户离开信息推送给当前在线的客户
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
System.out.println("channelGroup size" + channelGroup.size());
}
/**
* 表示channel 处于活动状态, 既刚出生 提示 xx上线
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了~");
}
/**
* 表示channel 处于不活动状态, 既死亡状态 提示 xx离线了
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 离线了~");
}
//读取数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//获取到当前channel
Channel channel = ctx.channel();
//这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
channelGroup.forEach(ch -> {
if (channel != ch) { //不是当前的channel,转发消息
ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
} else {//回显自己发送的消息给自己
ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭通道
ctx.close();
}
}
3.2 Запуск сервера сервера
package com.crush.atguigu.group_chat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* @author crush
*/
public class GroupChatServer {
/**
* //监听端口
*/
private int port;
public GroupChatServer(int port) {
this.port = port;
}
/**
* 编写run方法 处理请求
* @throws Exception
*/
public void run() throws Exception {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//8个NioEventLoop
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的业务处理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务器启动");
ChannelFuture channelFuture = b.bind(port).sync();
//监听关闭
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run();
}
}
3.3. Обработчик клиента
package com.crush.atguigu.group_chat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author crush
*/
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
//当前Channel 已从对方读取消息时调用。
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
3.4. Клиент-сервер
package com.crush.atguigu.group_chat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
* @author crush
*/
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相关handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//加入自定义的handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress() + "--------");
//客户端需要输入信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通过channel 发送到服务器端
channel.writeAndFlush(msg + "\r\n");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
Несколько клиентов, просто cv один раз.
3.5 Тест:
Процесс тестирования состоит в том, чтобы сначала запустить сервер сервера, а затем запустить клиент.
4. Поговорите с собой
Эту статью следует считать рукописью, раньше я был занят другими вещами.
Это все для сегодняшней статьи.
привет я блоггер
宁在春
:ДомаЕсли у вас есть какие-либо сомнения в статье, оставьте сообщение или личное сообщение или добавьте контактную информацию на главную страницу, и мы ответим как можно скорее.
Если вы обнаружите какие-либо проблемы в статье, надеюсь, вы меня поправите, большое спасибо.
Если вы считаете это полезным, пожалуйста, ставьте палец вверх и уходите!
Приглашаем всех обсудить в дискуссионной зоне, увеличить свою удачу и взглянуть на периферийные устройства, официально присланные Nuggets! ! !
Чем больше вы оставите комментарий, тем больше шансов выиграть приз! ! !
Подробности 👉Проект «Звезда раскопок»