Серия Netty: событие, обработчик и конвейер

Java задняя часть Netty

Это 6-й день моего участия в августовском испытании обновлений, подробности о событии:Испытание августовского обновления

Введение

В предыдущем разделе мы объяснили канал в netty, зная, что канал — это мост между обработчиком событий и внешней связью. Сегодня эта статья подробно объяснит оставшиеся несколько очень важных частей netty, Event, Handler и PipeLine.

ChannelPipeline

PipeLine — это мост, соединяющий Channel и обработчик, фактически реализация фильтра, который используется для управления методом обработки обработчика.

При создании канала также создается соответствующий канал ChannelPipeline.

Сначала взгляните на определение ChannelPipeline:

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable 

Во-первых, ChannelPipeline наследуется от Iterable, а это значит, что он проходим, а результатом обхода является один из обработчиков.

Как квалифицированный объект Iterable, ChannelPipeline предоставляет ряд методов добавления и удаления, с помощью которых обработчики могут быть добавлены или удалены из ChannelPipeline. Поскольку ChannelPipeline — это фильтр, а фильтр должен указывать порядок соответствующего фильтра, существуют методы addFirst и addLast для добавления разных порядков в ChannelPipeline.

Тогда вы можете увидеть, что ChannelPipine наследует два интерфейса ChannelInBoundinvoker и ChannelOutBoundinvoker.

Сначала взгляните на рабочую блок-схему канала ChannelPipeline:

Видно, что ChannelPipeline в основном имеет две операции: одна предназначена для чтения входящего трафика, а другая — для записи исходящего.

Для операций чтения, таких как Socket.read(), фактически вызывается метод в ChannelInboundInvoker. Для внешних запросов записи ввода-вывода вызывается метод в ChannelOutboundInvoker.

Обратите внимание, что порядок обработки входящих и исходящих запросов обратный, как в следующем примере:

    ChannelPipeline p = ...;
   p.addLast("1", new InboundHandlerA());
   p.addLast("2", new InboundHandlerB());
   p.addLast("3", new OutboundHandlerA());
   p.addLast("4", new OutboundHandlerB());
   p.addLast("5", new InboundOutboundHandlerX());

В приведенном выше коде мы добавили 5 обработчиков в ChannelPipeline, включая 2 InboundHandler, 2 OutboundHandler и обработчик, который обрабатывает как вход, так и выход.

Затем, когда канал встречает входящее событие, оно будет обрабатываться в порядке 1, 2, 3, 4 и 5, но только InboundHandler может обработать входящее событие, поэтому реальный порядок выполнения — 1, 2 и 5.

Точно так же, когда канал встречает исходящее событие, оно будет выполняться в порядке 5, 4, 3, 2, 1, но только outboundHandler может обработать исходящее событие, поэтому реальный порядок выполнения — 5, 4, 3. .

Проще говоря, ChannelPipeline определяет порядок выполнения Handler.

ChannelHandler

Netty — это платформа, управляемая событиями, и все события обрабатываются Handler. ChannelHandler может обрабатывать ввод-вывод, перехватывать ввод-вывод или передавать событие следующему обработчику в ChannelPipeline для обработки.

Структура ChannelHandler очень проста, всего три метода, а именно:

void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

В соответствии с разницей между входящими и исходящими событиями, ChannelHandler можно разделить на две категории, а именно ChannelInboundHandler и ChannelOutboundHandler.

Поскольку эти два интерфейса являются интерфейсами, реализовать их сложнее, поэтому netty предоставляет вам три реализации по умолчанию: ChannelInboundHandlerAdapter, ChannelOutboundHandlerAdapter и ChannelDuplexHandler. Первые два хорошо понятны, они входящие и исходящие, а последний может обрабатывать как входящие, так и исходящие.

ChannelHandler предоставляется ChannelHandlerContext, а взаимодействие с ChannelPipeline также осуществляется через ChannelHandlerContext.

ChannelHandlerContext

ChannelHandlerContext позволяет ChannelHandler взаимодействовать с ChannelPipeline или другими обработчиками. Это контекст, который позволяет обработчику и каналу взаимодействовать.

Например, в ChannelHandlerContext вызовите функцию channel(), чтобы получить связанный канал. Связанный Handler можно получить, вызвав handler(). События канала запускаются вызовом методов fire*.

Взгляните на определение ChannelHandlerContext:

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker 

Вы можете видеть, что это AttributeMap, используемый для хранения атрибутов, или ChannelInboundInvoker и ChannelOutboundInvoker, используемые для запуска и распространения соответствующих событий.

Для Inbound методы распространения событий следующие:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

Для Outbound методы распространения событий следующие:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)

Эти методы, вызываемые в одном обработчике, затем передают событие следующему обработчику следующим образом:

   public class MyInboundHandler extends ChannelInboundHandlerAdapter {
        @Override
       public void channelActive(ChannelHandlerContext ctx) {
           System.out.println("Connected!");
           ctx.fireChannelActive();
       }
   }
  
   public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
           System.out.println("Closing ..");
           ctx.close(promise);
       }
   }

Переменные состояния в ChannelHandler

ChannelHandler — это класс Handler.В общем случае экземпляр этого класса может использоваться несколькими каналами при условии, что ChannelHandler не имеет общих переменных состояния.

Но иногда нам нужно поддерживать состояние в ChannelHandler, тогда это связано с проблемой переменных состояния в ChannelHandler, см. следующий пример:

  public interface Message {
       // your methods here
   }
  
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
  
       private boolean loggedIn;
  
        @Override
       public void channelRead0(ChannelHandlerContext ctx, Message message) {
           if (message instanceof LoginMessage) {
               authenticate((LoginMessage) message);
               loggedIn = true;
           } else (message instanceof GetDataMessage) {
               if (loggedIn) {
                   ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
               } else {
                   fail();
               }
           }
       }
       ...
   }

В этом примере нам нужно аутентифицировать сообщение и сохранить состояние аутентификации после получения LoginMessage, Поскольку бизнес-логика такая, должна быть переменная состояния.

Тогда обработчик с переменной состояния может привязываться только к одному каналу.Если привязаны несколько каналов, может возникнуть проблема несогласованности состояния. Канал привязан к экземпляру Handler, это очень просто, просто используйте ключевое слово new в методе initChannel для создания нового объекта.

   public class DataServerInitializer extends ChannelInitializer<Channel> {
        @Override
       public void initChannel(Channel channel) {
           channel.pipeline().addLast("handler", new DataServerHandler());
       }
   }

Итак, есть ли другой способ, кроме создания нового экземпляра обработчика? Конечно, это свойство AttributeKey в ChannelHandlerContext. В приведенном выше примере давайте посмотрим, как использовать AttributeKey:

   public interface Message {
       // your methods here
   }
  
    @Sharable
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
       private final AttributeKey<Boolean> auth =
             AttributeKey.valueOf("auth");
  
        @Override
       public void channelRead(ChannelHandlerContext ctx, Message message) {
           Attribute<Boolean> attr = ctx.attr(auth);
           if (message instanceof LoginMessage) {
               authenticate((LoginMessage) o);
               attr.set(true);
           } else (message instanceof GetDataMessage) {
               if (Boolean.TRUE.equals(attr.get())) {
                   ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
               } else {
                   fail();
               }
           }
       }
       ...
   }

В приведенном выше примере сначала определяется AttributeKey, а затем метод attr ChannelHandlerContext используется для установки атрибута в ChannelHandlerContext, чтобы атрибут был привязан к ChannelHandlerContext. Последующие свойства отличаются, даже если один и тот же обработчик используется в разных каналах.

Ниже приведен пример использования общего обработчика:

   public class DataServerInitializer extends ChannelInitializer<Channel> {
  
       private static final DataServerHandler SHARED = new DataServerHandler();
  
        @Override
       public void initChannel(Channel channel) {
           channel.pipeline().addLast("handler", SHARED);
       }
   }

Обратите внимание, что при определении DataServerHandler мы добавили аннотацию @Sharable.Если ChannelHandler использует аннотацию @Sharable, это означает, что вы можете создать этот Handler только один раз, но вы можете привязать его к одному или нескольким ChannelPipelines.

Обратите внимание, что аннотация @Sharable подготовлена ​​для java-документа и не повлияет на фактический эффект выполнения кода.

Асинхронный обработчик

Как упоминалось ранее, обработчики можно добавить в конвейер, вызвав метод pipe.addLast.Поскольку конвейер представляет собой структуру фильтра, добавленные обработчики обрабатываются последовательно.

Однако что, если я хочу, чтобы некоторые обработчики выполнялись в новом потоке? Что, если мы хотим, чтобы обработчики, выполняемые в этих новых потоках, работали не по порядку?

Например, теперь у нас есть три обработчика MyHandler1, MyHandler2 и MyHandler3.

Последовательное выполнение записывается так:

ChannelPipeline pipeline = ch.pipeline();
  
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast("MyHandler3", new MyHandler3());

Если вы хотите, чтобы MyHandler3 выполнялся в новом потоке, вы можете добавить параметр группы, чтобы обработчик запускался в новой группе:

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ChannelPipeline pipeline = ch.pipeline();
  
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast(group,"MyHandler3", new MyHandler3());

Однако обработчики, добавленные DefaultEventExecutorGroup в приведенном выше примере, также будут выполняться последовательно.Если вы действительно не хотите выполнять последовательно, вы можете попробовать рассмотреть возможность использования UnorderedThreadPoolEventExecutor.

Суммировать

В этой статье объясняются Event, Handler и PipeLine, а также приводятся примеры отношений и взаимодействия между ними. Продолжение начнется с конкретной практики netty, чтобы еще больше углубить понимание и применение netty.Надеюсь, всем понравится.

Эта статья была включена вwoohoo. Флойд press.com/05-Netty-eat…

Самая популярная интерпретация, самая глубокая галантерея, самые краткие уроки и множество трюков, о которых вы не знаете, ждут вас!

Добро пожаловать, чтобы обратить внимание на мой официальный аккаунт: «Программируйте эти вещи», разбирайтесь в технологиях, лучше поймите себя!