Мониторинг и обработка событий Netty (ниже) [с преимуществами]

задняя часть Netty

ПредыдущийЗнакомит с основными понятиями мониторинга событий, моделью цепочки ответственности, интерфейсом сокетов и моделью ввода-вывода, моделью потоков и т. д., а также с общей структурой Netty.В этой статье речь пойдет об одном из трех основных модулей Netty: мониторинге событий и обработка.

Как упоминалось ранее, Netty — это инфраструктура NIO, которая абстрагирует изменения состояния, такие как создание, доступность для чтения и записи каналов ввода-вывода, в события и передает их в виде цепочки ответственности Интересующие события отслеживаются и обрабатываются.

Через введение вы узнаете:

  • Модель прослушивания и обработки событий
  • Прослушиватель событий: EventLoop
  • Обработка событий: ChannelPipeline и ChannelHandler
  • Реализация протокола Websocket с использованием Netty

В конце статьи есть преимущества ~

Модель прослушивания и обработки событий

При выполнении сетевого программирования общий процесс написания выглядит следующим образом:

  • Создайте серверный сокет и прослушайте порт;
  • При наличии клиентского соединения будет создан новый клиентский сокет для отслеживания статуса данных, доступных для чтения и записи, и клиентский сокет будет создан для каждого запроса на соединение;
  • Чтение и запись данных будет вызывать интерфейс, предоставляемый Socket.Список интерфейсов упоминался в предыдущей статье;

В традиционной модели каждый клиентский сокет будет создавать отдельный поток для мониторинга событий сокета.С одной стороны, система может создавать ограниченное количество потоков, что ограничивает количество параллелизма.С другой стороны, потоков слишком много и частое переключение потоков, что приводит к серьезному снижению производительности.

С развитием модели ввода-вывода операционной системы можно использовать мультиплексный ввод-вывод, и один поток отслеживает несколько сокетов, кроме того, сервер обрабатывает клиентское соединение и отслеживает клиентский сокет, который может обрабатываться в разных потоках.

Netty использует мультиплексный ввод-вывод для мониторинга событий, кроме того, различные потоки используются для обработки клиентских подключений, чтения и записи данных.

Вся структура обработки показана на рисунке ниже с кратким пояснением:

  • Boss EventLoopGroup в основном обрабатывает событие подключения клиента, включая несколько циклов событий, по одному потоку на цикл событий;
  • Worker EventLoopGroup в основном обрабатывает события чтения и записи данных клиентского сокета, включая несколько циклов событий, и каждый цикл событий имеет поток;
  • Будь то Boos или Worker, обработка событий организована через Channel Pipeline, который является реализацией шаблона цепочки ответственности, включая одного или нескольких обработчиков;
  • Прослушивание порта будет привязано только к одному циклу событий в Boss EventLoopGroup;
  • Eventloop в Worker EventLoopGroup может отслеживать несколько клиентских сокетов;

事件监听和处理模型

EventLoop

EventLoop фактически привязан к определенному потоку, и связанный поток не изменится в течение своего жизненного цикла.

EventLoop выполняет две задачи:

  • Первый — это поток ввода-вывода для выполнения операций ввода-вывода, связанных с каналом, включая вызов select для ожидания готовых событий ввода-вывода, чтения и записи данных, обработки данных и т. д.;
  • Вторая задача используется как очередь задач для выполнения задач в taskQueue.Например, запланированная задача, отправленная пользователем, вызывающим eventLoop.schedule, также выполняется этим потоком;

Первая задача проста для понимания, а вторая в основном объясняется: от данных сокета до обработки данных, до записи данных ответа, Netty обрабатывается в одном потоке, в основном из соображений безопасности потоков, снижения конкуренции и переключения потоков, Через задачу очередь, логика обработки может быть отправлена ​​в пользовательский поток и выполнена в цикле событий.

Что делает весь EventLoop, так это select -> processIO -> runAllTask, processIO обрабатывает логику, связанную с событиями ввода-вывода, а runAllTask ​​обрабатывает задачи в очереди задач.Если выполняется слишком много задач, это повлияет на обработку событий ввода-вывода , так что это ограничит обработку задачи.Время, весь процесс выглядит следующим образом:

EventLoop处理过程

Код запуска EventLoop выглядит следующим образом:

protected void run() {
     for (; ; ) {
         oldWakenUp = wakenUp.getAndSet(false);
         try {
             if (hasTasks()) { //如果有任务,快速返回
                 selectNow();
             } else {
                 select(); //如果没任务,等待事件返回
                 if (wakenUp.get()) {
                     selector.wakeup();
                 }
             }
             cancelledKeys = 0;
             final long ioStartTime = System.nanoTime();
             needsToSelectAgain = false;

             //处理IO事件
             if (selectedKeys != null) {
                 processSelectedKeysOptimized(selectedKeys.flip());
             } else {
                 processSelectedKeysPlain(selector.selectedKeys());
             }

             //计算IO处理时间
             final long ioTime = System.nanoTime() - ioStartTime;
             final int ioRatio = this.ioRatio; //默认为50

             //处理提交的任务
             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

             if (isShuttingDown()) {
                 closeAll();
                 if (confirmShutdown()) {
                     break;
                 }
             }
         } catch (Throwable t) {
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
             }
         }
     }
 }

ChannelPipeline и ChannelHandler

ChannelPipeline — это интерфейс, который имеет класс реализации по умолчанию DefaultChannelPipeline с двумя свойствами: head и tail, Оба реализуют интерфейс ChannelHandler, соответствующий началу и концу цепочки обработки.

 protected DefaultChannelPipeline(Channel channel) {
     this.channel = ObjectUtil.checkNotNull(channel, "channel");
     succeededFuture = new SucceededChannelFuture(channel, null);
     voidPromise =  new VoidChannelPromise(channel, true);

     tail = new TailContext(this);
     head = new HeadContext(this);

     head.next = tail;
     tail.prev = head;
}

При создании каждого канала создается объект ChannelPipeline для обработки различных событий канала, а ChannelHandler может динамически изменяться во время выполнения.

Там, где ChannelHandler несет логику бизнес-обработки, мы больше всего контактируем с классом, вы можете настроить Handler, добавить его в цепочку обработки и реализовать пользовательскую логику.

ChannelHandler можно разделить на две категории: ChannelInboundHandler и ChannelOutboundHandle, которые соответствуют обработке входящих и исходящих сообщений соответственно для чтения и записи данных. Они предоставляют методы интерфейса, которые мы можем реализовать для обработки различных событий:

public interface ChannelInboundHandler extends ChannelHandler {
	void channelRegistered(ChannelHandlerContext ctx) throws Exception;
	void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
	void channelActive(ChannelHandlerContext ctx) throws Exception;
	void channelInactive(ChannelHandlerContext ctx) throws Exception;
	void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
	void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
	void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
	void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
}

При настройке Handler он обычно наследует ChannelInboundHandlerAdapter или ChannelOutboundHandlerAdapter.

Следует отметить, что не рекомендуется напрямую реализовывать трудоемкие или блокирующие операции в ChannelHandler, так как это может заблокировать рабочий поток Netty, в результате чего Netty не сможет своевременно реагировать на обработку ввода-вывода.

ChannelPipline

Реализация протокола Websocket с использованием Netty

Протокол веб-сокета

Это не является предметом этой статьи, просто кратко объясните:

  • Это длинный протокол соединения, который поддерживается большинством браузеров.Через websocket сервер может активно отправлять сообщения клиенту;
  • Протокол Websocket использует протокол HTTP на этапе рукопожатия, после завершения рукопожатия используется протокол Websocket;
  • Websocket — это бинарный протокол;
инициализация

Netty предоставляет нам класс ChannelInitializer для инициализации, создания класса WebSocketServerInitializer, наследования класса ChannelInitializer и добавления ChannelHandler:

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

	@Resource
	private CustomTextFrameHandler customTextFrameHandler;

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("codec-http", new HttpServerCodec());
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
        
        pipeline.addLast("websocket-protocal-handler",new WebSocketServerProtocolHandler());
        pipeline.addLast("custome-handler", customTextFrameHandler);
    }
}

Проанализируйте эти обработчики, которые Netty предоставляет по умолчанию:

  • HttpServerCodec: используется для разбора Http-запросов, в основном на этапе рукопожатия;
  • HttpObjectAggregator: используется для объединения заголовков Http-запросов и тел запросов, которые в основном обрабатываются на этапе рукопожатия;
  • WebSocketServerProtocolHandler: обрабатывает протокол Websocket;
  • CustomTextFrameHandler: пользовательский обработчик для добавления собственной бизнес-логики.

Не правда ли, очень удобно, после обработки WebSocketServerProtocolHandler текстовые данные считываются, так что вам не нужно заниматься проблемами перепаковки и распаковки данных самостоятельно.

CustomTextFrameHandler

Пользовательский обработчик для бизнес-обработки:

public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        final String content = frame.text();
        System.out.println("接收到数据:"+content);   
        
        // 回复数据
        TextWebSocketFrame respFrame = new TextWebSocketFrame("我收到了你的数据");
        if (ctx.channel().isWritable()) {
		      ChannelFuture future = ctx.writeAndFlush(respFrame);
		  }			        
    }
}

Преимущество Описание

Наконец, давайте поговорим о преимуществах: динамик Xiaoai F-код.

Я подготовил 2 копии, в основном, чтобы поблагодарить друзей «Официального аккаунта WeChat» и «Сообщества Nuggets».Каждая копия включает 1 динамик Xiaoai размера F и 1 динамик Xiaoai mini размера F.

Код F для мобильного телефона Xiaomi происходит от английского слова «Друг». Это право преимущественной покупки, предоставляемое Xiaomi основным пользователям Xiaomi и пользователям сети, которые внесли свой вклад в Xiaomi. Если у вас есть код Xiaomi F, вы можете использовать Xiaomi F Код напрямую без ожидания Покупайте сопутствующие товары!

Проще говоря, F-код означает, что вам не нужно его брать, вы можете купить его напрямую~

Крайний срок розыгрыша

9 апреля в 12.00

Правила розыгрыша
Сообщество самородков
  • Вы должны подписаться на мою учетную запись Nuggets, чтобы быть эффективными.Домашняя страница;

  • Используйте лотерейный помощник WeChat, чтобы случайным образом разыграть розыгрыш для сообщества Nuggets;

    掘金抽奖

Публичный аккаунт WeChat
  • Вам нужно обратить внимание на мой публичный аккаунт WeChat, чтобы быть эффективным;

    情情说

  • Используйте помощника по лотерее WeChat, чтобы случайным образом выбрать общедоступную учетную запись WeChat;

    微信公众号抽奖