ПредыдущийЗнакомит с основными понятиями мониторинга событий, моделью цепочки ответственности, интерфейсом сокетов и моделью ввода-вывода, моделью потоков и т. д., а также с общей структурой Netty.В этой статье речь пойдет об одном из трех основных модулей Netty: мониторинге событий и обработка.
Как упоминалось ранее, Netty — это инфраструктура NIO, которая абстрагирует изменения состояния, такие как создание, доступность для чтения и записи каналов ввода-вывода, в события и передает их в виде цепочки ответственности Интересующие события отслеживаются и обрабатываются.
Через введение вы узнаете:
- Модель прослушивания и обработки событий
- Прослушиватель событий: EventLoop
- Обработка событий: ChannelPipeline и ChannelHandler
- Реализация протокола Websocket с использованием Netty
В конце статьи есть преимущества ~
Модель прослушивания и обработки событий
При выполнении сетевого программирования общий процесс написания выглядит следующим образом:
- Создайте серверный сокет и прослушайте порт;
- При наличии клиентского соединения будет создан новый клиентский сокет для отслеживания статуса данных, доступных для чтения и записи, и клиентский сокет будет создан для каждого запроса на соединение;
- Чтение и запись данных будет вызывать интерфейс, предоставляемый Socket.Список интерфейсов упоминался в предыдущей статье;
В традиционной модели каждый клиентский сокет будет создавать отдельный поток для мониторинга событий сокета.С одной стороны, система может создавать ограниченное количество потоков, что ограничивает количество параллелизма.С другой стороны, потоков слишком много и частое переключение потоков, что приводит к серьезному снижению производительности.
С развитием модели ввода-вывода операционной системы можно использовать мультиплексный ввод-вывод, и один поток отслеживает несколько сокетов, кроме того, сервер обрабатывает клиентское соединение и отслеживает клиентский сокет, который может обрабатываться в разных потоках.
Netty использует мультиплексный ввод-вывод для мониторинга событий, кроме того, различные потоки используются для обработки клиентских подключений, чтения и записи данных.
Вся структура обработки показана на рисунке ниже с кратким пояснением:
- Boss EventLoopGroup в основном обрабатывает событие подключения клиента, включая несколько циклов событий, по одному потоку на цикл событий;
- Worker EventLoopGroup в основном обрабатывает события чтения и записи данных клиентского сокета, включая несколько циклов событий, и каждый цикл событий имеет поток;
- Будь то Boos или Worker, обработка событий организована через Channel Pipeline, который является реализацией шаблона цепочки ответственности, включая одного или нескольких обработчиков;
- Прослушивание порта будет привязано только к одному циклу событий в Boss EventLoopGroup;
- Eventloop в Worker EventLoopGroup может отслеживать несколько клиентских сокетов;
EventLoop
EventLoop фактически привязан к определенному потоку, и связанный поток не изменится в течение своего жизненного цикла.
EventLoop выполняет две задачи:
- Первый — это поток ввода-вывода для выполнения операций ввода-вывода, связанных с каналом, включая вызов select для ожидания готовых событий ввода-вывода, чтения и записи данных, обработки данных и т. д.;
- Вторая задача используется как очередь задач для выполнения задач в taskQueue.Например, запланированная задача, отправленная пользователем, вызывающим eventLoop.schedule, также выполняется этим потоком;
Первая задача проста для понимания, а вторая в основном объясняется: от данных сокета до обработки данных, до записи данных ответа, Netty обрабатывается в одном потоке, в основном из соображений безопасности потоков, снижения конкуренции и переключения потоков, Через задачу очередь, логика обработки может быть отправлена в пользовательский поток и выполнена в цикле событий.
Что делает весь EventLoop, так это select -> processIO -> runAllTask, processIO обрабатывает логику, связанную с событиями ввода-вывода, а runAllTask обрабатывает задачи в очереди задач.Если выполняется слишком много задач, это повлияет на обработку событий ввода-вывода , так что это ограничит обработку задачи.Время, весь процесс выглядит следующим образом:
Код запуска EventLoop выглядит следующим образом:
protected void run() {
for (; ; ) {
oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) { //如果有任务,快速返回
selectNow();
} else {
select(); //如果没任务,等待事件返回
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
//处理IO事件
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
//计算IO处理时间
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio; //默认为50
//处理提交的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
ChannelPipeline и ChannelHandler
ChannelPipeline — это интерфейс, который имеет класс реализации по умолчанию DefaultChannelPipeline с двумя свойствами: head и tail, Оба реализуют интерфейс ChannelHandler, соответствующий началу и концу цепочки обработки.
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
При создании каждого канала создается объект ChannelPipeline для обработки различных событий канала, а ChannelHandler может динамически изменяться во время выполнения.
Там, где ChannelHandler несет логику бизнес-обработки, мы больше всего контактируем с классом, вы можете настроить Handler, добавить его в цепочку обработки и реализовать пользовательскую логику.
ChannelHandler можно разделить на две категории: ChannelInboundHandler и ChannelOutboundHandle, которые соответствуют обработке входящих и исходящих сообщений соответственно для чтения и записи данных. Они предоставляют методы интерфейса, которые мы можем реализовать для обработки различных событий:
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
}
При настройке Handler он обычно наследует ChannelInboundHandlerAdapter или ChannelOutboundHandlerAdapter.
Следует отметить, что не рекомендуется напрямую реализовывать трудоемкие или блокирующие операции в ChannelHandler, так как это может заблокировать рабочий поток Netty, в результате чего Netty не сможет своевременно реагировать на обработку ввода-вывода.
Реализация протокола Websocket с использованием Netty
Протокол веб-сокета
Это не является предметом этой статьи, просто кратко объясните:
- Это длинный протокол соединения, который поддерживается большинством браузеров.Через websocket сервер может активно отправлять сообщения клиенту;
- Протокол Websocket использует протокол HTTP на этапе рукопожатия, после завершения рукопожатия используется протокол Websocket;
- Websocket — это бинарный протокол;
инициализация
Netty предоставляет нам класс ChannelInitializer для инициализации, создания класса WebSocketServerInitializer, наследования класса ChannelInitializer и добавления ChannelHandler:
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
@Resource
private CustomTextFrameHandler customTextFrameHandler;
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("websocket-protocal-handler",new WebSocketServerProtocolHandler());
pipeline.addLast("custome-handler", customTextFrameHandler);
}
}
Проанализируйте эти обработчики, которые Netty предоставляет по умолчанию:
- HttpServerCodec: используется для разбора Http-запросов, в основном на этапе рукопожатия;
- HttpObjectAggregator: используется для объединения заголовков Http-запросов и тел запросов, которые в основном обрабатываются на этапе рукопожатия;
- WebSocketServerProtocolHandler: обрабатывает протокол Websocket;
- CustomTextFrameHandler: пользовательский обработчик для добавления собственной бизнес-логики.
Не правда ли, очень удобно, после обработки WebSocketServerProtocolHandler текстовые данные считываются, так что вам не нужно заниматься проблемами перепаковки и распаковки данных самостоятельно.
CustomTextFrameHandler
Пользовательский обработчик для бизнес-обработки:
public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(final ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
final String content = frame.text();
System.out.println("接收到数据:"+content);
// 回复数据
TextWebSocketFrame respFrame = new TextWebSocketFrame("我收到了你的数据");
if (ctx.channel().isWritable()) {
ChannelFuture future = ctx.writeAndFlush(respFrame);
}
}
}
Преимущество Описание
Наконец, давайте поговорим о преимуществах: динамик Xiaoai F-код.
Я подготовил 2 копии, в основном, чтобы поблагодарить друзей «Официального аккаунта WeChat» и «Сообщества Nuggets».Каждая копия включает 1 динамик Xiaoai размера F и 1 динамик Xiaoai mini размера F.
Код F для мобильного телефона Xiaomi происходит от английского слова «Друг». Это право преимущественной покупки, предоставляемое Xiaomi основным пользователям Xiaomi и пользователям сети, которые внесли свой вклад в Xiaomi. Если у вас есть код Xiaomi F, вы можете использовать Xiaomi F Код напрямую без ожидания Покупайте сопутствующие товары!
Проще говоря, F-код означает, что вам не нужно его брать, вы можете купить его напрямую~
Крайний срок розыгрыша
9 апреля в 12.00
Правила розыгрыша
Сообщество самородков
-
Вы должны подписаться на мою учетную запись Nuggets, чтобы быть эффективными.Домашняя страница;
-
Используйте лотерейный помощник WeChat, чтобы случайным образом разыграть розыгрыш для сообщества Nuggets;
Публичный аккаунт WeChat
-
Вам нужно обратить внимание на мой публичный аккаунт WeChat, чтобы быть эффективным;
-
Используйте помощника по лотерее WeChat, чтобы случайным образом выбрать общедоступную учетную запись WeChat;