Это 6-й день моего участия в августовском испытании обновлений, подробности о событии:Испытание августовского обновления
Введение
В предыдущем разделе мы объяснили канал в netty, зная, что канал — это мост между обработчиком событий и внешней связью. Сегодня эта статья подробно объяснит оставшиеся несколько очень важных частей netty, Event, Handler и PipeLine.
ChannelPipeline
PipeLine — это мост, соединяющий Channel и обработчик, фактически реализация фильтра, который используется для управления методом обработки обработчика.
При создании канала также создается соответствующий канал ChannelPipeline.
Сначала взгляните на определение ChannelPipeline:
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable
Во-первых, ChannelPipeline наследуется от Iterable, а это значит, что он проходим, а результатом обхода является один из обработчиков.
Как квалифицированный объект Iterable, ChannelPipeline предоставляет ряд методов добавления и удаления, с помощью которых обработчики могут быть добавлены или удалены из ChannelPipeline. Поскольку ChannelPipeline — это фильтр, а фильтр должен указывать порядок соответствующего фильтра, существуют методы addFirst и addLast для добавления разных порядков в ChannelPipeline.
Тогда вы можете увидеть, что ChannelPipine наследует два интерфейса ChannelInBoundinvoker и ChannelOutBoundinvoker.
Сначала взгляните на рабочую блок-схему канала ChannelPipeline:
Видно, что ChannelPipeline в основном имеет две операции: одна предназначена для чтения входящего трафика, а другая — для записи исходящего.
Для операций чтения, таких как Socket.read(), фактически вызывается метод в ChannelInboundInvoker. Для внешних запросов записи ввода-вывода вызывается метод в ChannelOutboundInvoker.
Обратите внимание, что порядок обработки входящих и исходящих запросов обратный, как в следующем примере:
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
В приведенном выше коде мы добавили 5 обработчиков в ChannelPipeline, включая 2 InboundHandler, 2 OutboundHandler и обработчик, который обрабатывает как вход, так и выход.
Затем, когда канал встречает входящее событие, оно будет обрабатываться в порядке 1, 2, 3, 4 и 5, но только InboundHandler может обработать входящее событие, поэтому реальный порядок выполнения — 1, 2 и 5.
Точно так же, когда канал встречает исходящее событие, оно будет выполняться в порядке 5, 4, 3, 2, 1, но только outboundHandler может обработать исходящее событие, поэтому реальный порядок выполнения — 5, 4, 3. .
Проще говоря, ChannelPipeline определяет порядок выполнения Handler.
ChannelHandler
Netty — это платформа, управляемая событиями, и все события обрабатываются Handler. ChannelHandler может обрабатывать ввод-вывод, перехватывать ввод-вывод или передавать событие следующему обработчику в ChannelPipeline для обработки.
Структура ChannelHandler очень проста, всего три метода, а именно:
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
В соответствии с разницей между входящими и исходящими событиями, ChannelHandler можно разделить на две категории, а именно ChannelInboundHandler и ChannelOutboundHandler.
Поскольку эти два интерфейса являются интерфейсами, реализовать их сложнее, поэтому netty предоставляет вам три реализации по умолчанию: ChannelInboundHandlerAdapter, ChannelOutboundHandlerAdapter и ChannelDuplexHandler. Первые два хорошо понятны, они входящие и исходящие, а последний может обрабатывать как входящие, так и исходящие.
ChannelHandler предоставляется ChannelHandlerContext, а взаимодействие с ChannelPipeline также осуществляется через ChannelHandlerContext.
ChannelHandlerContext
ChannelHandlerContext позволяет ChannelHandler взаимодействовать с ChannelPipeline или другими обработчиками. Это контекст, который позволяет обработчику и каналу взаимодействовать.
Например, в ChannelHandlerContext вызовите функцию channel(), чтобы получить связанный канал. Связанный Handler можно получить, вызвав handler(). События канала запускаются вызовом методов fire*.
Взгляните на определение ChannelHandlerContext:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker
Вы можете видеть, что это AttributeMap, используемый для хранения атрибутов, или ChannelInboundInvoker и ChannelOutboundInvoker, используемые для запуска и распространения соответствующих событий.
Для Inbound методы распространения событий следующие:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
Для Outbound методы распространения событий следующие:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)
Эти методы, вызываемые в одном обработчике, затем передают событие следующему обработчику следующим образом:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
Переменные состояния в ChannelHandler
ChannelHandler — это класс Handler.В общем случае экземпляр этого класса может использоваться несколькими каналами при условии, что ChannelHandler не имеет общих переменных состояния.
Но иногда нам нужно поддерживать состояние в ChannelHandler, тогда это связано с проблемой переменных состояния в ChannelHandler, см. следующий пример:
public interface Message {
// your methods here
}
public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
private boolean loggedIn;
@Override
public void channelRead0(ChannelHandlerContext ctx, Message message) {
if (message instanceof LoginMessage) {
authenticate((LoginMessage) message);
loggedIn = true;
} else (message instanceof GetDataMessage) {
if (loggedIn) {
ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
} else {
fail();
}
}
}
...
}
В этом примере нам нужно аутентифицировать сообщение и сохранить состояние аутентификации после получения LoginMessage, Поскольку бизнес-логика такая, должна быть переменная состояния.
Тогда обработчик с переменной состояния может привязываться только к одному каналу.Если привязаны несколько каналов, может возникнуть проблема несогласованности состояния. Канал привязан к экземпляру Handler, это очень просто, просто используйте ключевое слово new в методе initChannel для создания нового объекта.
public class DataServerInitializer extends ChannelInitializer<Channel> {
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("handler", new DataServerHandler());
}
}
Итак, есть ли другой способ, кроме создания нового экземпляра обработчика? Конечно, это свойство AttributeKey в ChannelHandlerContext. В приведенном выше примере давайте посмотрим, как использовать AttributeKey:
public interface Message {
// your methods here
}
@Sharable
public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
private final AttributeKey<Boolean> auth =
AttributeKey.valueOf("auth");
@Override
public void channelRead(ChannelHandlerContext ctx, Message message) {
Attribute<Boolean> attr = ctx.attr(auth);
if (message instanceof LoginMessage) {
authenticate((LoginMessage) o);
attr.set(true);
} else (message instanceof GetDataMessage) {
if (Boolean.TRUE.equals(attr.get())) {
ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
} else {
fail();
}
}
}
...
}
В приведенном выше примере сначала определяется AttributeKey, а затем метод attr ChannelHandlerContext используется для установки атрибута в ChannelHandlerContext, чтобы атрибут был привязан к ChannelHandlerContext. Последующие свойства отличаются, даже если один и тот же обработчик используется в разных каналах.
Ниже приведен пример использования общего обработчика:
public class DataServerInitializer extends ChannelInitializer<Channel> {
private static final DataServerHandler SHARED = new DataServerHandler();
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("handler", SHARED);
}
}
Обратите внимание, что при определении DataServerHandler мы добавили аннотацию @Sharable.Если ChannelHandler использует аннотацию @Sharable, это означает, что вы можете создать этот Handler только один раз, но вы можете привязать его к одному или нескольким ChannelPipelines.
Обратите внимание, что аннотация @Sharable подготовлена для java-документа и не повлияет на фактический эффект выполнения кода.
Асинхронный обработчик
Как упоминалось ранее, обработчики можно добавить в конвейер, вызвав метод pipe.addLast.Поскольку конвейер представляет собой структуру фильтра, добавленные обработчики обрабатываются последовательно.
Однако что, если я хочу, чтобы некоторые обработчики выполнялись в новом потоке? Что, если мы хотим, чтобы обработчики, выполняемые в этих новых потоках, работали не по порядку?
Например, теперь у нас есть три обработчика MyHandler1, MyHandler2 и MyHandler3.
Последовательное выполнение записывается так:
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("MyHandler1", new MyHandler1());
pipeline.addLast("MyHandler2", new MyHandler2());
pipeline.addLast("MyHandler3", new MyHandler3());
Если вы хотите, чтобы MyHandler3 выполнялся в новом потоке, вы можете добавить параметр группы, чтобы обработчик запускался в новой группе:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("MyHandler1", new MyHandler1());
pipeline.addLast("MyHandler2", new MyHandler2());
pipeline.addLast(group,"MyHandler3", new MyHandler3());
Однако обработчики, добавленные DefaultEventExecutorGroup в приведенном выше примере, также будут выполняться последовательно.Если вы действительно не хотите выполнять последовательно, вы можете попробовать рассмотреть возможность использования UnorderedThreadPoolEventExecutor.
Суммировать
В этой статье объясняются Event, Handler и PipeLine, а также приводятся примеры отношений и взаимодействия между ними. Продолжение начнется с конкретной практики netty, чтобы еще больше углубить понимание и применение netty.Надеюсь, всем понравится.
Эта статья была включена вwoohoo. Флойд press.com/05-Netty-eat…
Самая популярная интерпретация, самая глубокая галантерея, самые краткие уроки и множество трюков, о которых вы не знаете, ждут вас!
Добро пожаловать, чтобы обратить внимание на мой официальный аккаунт: «Программируйте эти вещи», разбирайтесь в технологиях, лучше поймите себя!