Через netty в предыдущей серии исходного кодатрилогия реакторной нити, мы уже знаем, что поток-реактор netty подобен двигателю, управляющему работой всего фреймворка netty, ипривязка на стороне сервераа такжеустановление нового соединенияЭто предохранитель двигателя, который зажигает двигатель
Netty установит соответствующий канал в процессе привязки порта сервера и установления нового соединения, а концепция конвейера тесно связана с действием канала.Конвейер можно рассматривать как конвейер, исходный исходный материал (поток байтов). ) вводится, обрабатывается и, наконец, выводится
В этой статье я будуустановление нового соединенияНапример, он разделен на следующие части, чтобы представить вам, как работает конвейер в netty.
- инициализация конвейера
- конвейер добавить узел
- узел удаления конвейера
инициализация конвейера
существуетустановление нового соединенияВ этой статье мы увидели, как создатьNioSocketChannel
Когда создаются основные компоненты netty
конвейер является одним из них и создается в следующем коде
AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
AbstractChannel
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
Ссылка на канал сохраняется в пайплайне, после создания пайплайна весь пайплайн выглядит так
Каждый узел конвейера представляет собойChannelHandlerContext
объект, каждый узел контекста содержит исполнителя, который он обертываетChannelHandler
Контекст, необходимый для выполнения операции, на самом деле является конвейером, поскольку конвейер содержит ссылку на канал и может получить всю информацию о контексте.
По умолчанию конвейер будет иметь два узла, головной и хвостовой.В следующей статье мы подробно разберем эти два специальных узла.Сегодня мы сосредоточимся на конвейере.
конвейер добавить узел
Ниже приведен очень распространенный клиентский код.
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Spliter())
p.addLast(new Decoder());
p.addLast(new BusinessHandler())
p.addLast(new Encoder());
}
});
Сначала используйте сплиттер для распаковки исходного TCP-пакета данных, затем декодируйте распакованный пакет и передайте его в бизнес-процессор BusinessHandler.После обработки бизнес-кодировщик выводит кодировщик.
Вся структура трубопровода выглядит следующим образом
Я использовал два цвета, чтобы различать два разных типа узлов в конвейере.ChannelInboundHandler
, для обработки событий inBound наиболее типичным является чтение потока данных и его обработка; другим типом обработчика являетсяChannelOutboundHandler
, обрабатывать исходящие события, например, при вызовеwriteAndFlush()
Когда метод класса, он будет проходить через этот тип обработчика
Независимо от типа обработчика, его внешний объектChannelHandlerContext
связаны двусвязным списком, и различают одинChannelHandlerContext
Независимо от того, включен он или нет, мы можем видеть, как netty обрабатывает его при добавлении узлов.
DefaultChannelPipeline
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) {
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1.检查是否有重复handler
checkMultiplicity(handler);
// 2.创建节点
newCtx = newContext(group, filterName(name, handler), handler);
// 3.添加节点
addLast0(newCtx);
}
// 4.回调用户方法
callHandlerAdded0(handler);
return this;
}
Здесь просто используйтеsynchronized
Метод заключается в предотвращении многопоточной параллельной работы базового двусвязного списка конвейера.
Давайте проанализируем приведенный выше код шаг за шагом
1. Проверьте, нет ли повторяющихся обработчиков
При добавлении обработчика в пользовательский код сначала проверяется, был ли добавлен обработчик
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
netty использует переменную-членadded
Определите, был ли добавлен канал. Приведенный выше код очень прост. Если обработчик, который в настоящее время добавляется, не является общим и уже добавлен, будет выдано исключение, в противном случае будет указано, что обработчик был добавлен.
Видно, что если обработчик является общим, его можно добавлять в конвейер на неопределенный срок.Если наш клиентский код хочет поделиться обработчиком, нам нужно только добавить аннотацию @Sharable, как показано ниже.
@Sharable
public class BusinessHandler {
}
И если обработчик является общим, он обычно используется с помощью Spring Injection, и нет необходимости каждый раз создавать новый.
isSharable()
Метод реализуется тем, помечен ли класс, соответствующий обработчику, @Sharable
ChannelHandlerAdapter
public boolean isSharable() {
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
Здесь также видно, что для максимальной оптимизации производительности netty также использует ThreadLocal для кэширования состояния обработчика.При высокой степени параллелизма и массовых подключениях этот метод будет создаваться и вызываться каждый раз, когда к Обработчик.
2. Создайте узел
Вернитесь к основному процессу и посмотрите код для создания контекста.
newCtx = newContext(group, filterName(name, handler), handler);
Здесь нам нужно проанализироватьfilterName(name, handler)
Этот код, эта функция используется для создания уникального имени обработчика
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
Очевидно, что имя, которое мы передаем, равно null, и netty сгенерирует для нас имя по умолчанию. В противном случае проверьте, есть ли повторяющееся имя, и вернитесь, если проверка пройдена.
Правила для netty для создания имени по умолчанию:简单类名#0
, посмотрим как это реализовано
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
private String generateName(ChannelHandler handler) {
// 先查看缓存中是否有生成过默认name
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
// 没有生成过,就生成一个默认name,加入缓存
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
// 生成完了,还要看默认name有没有冲突
if (context0(name) != null) {
String baseName = name.substring(0, name.length() - 1);
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
нетти используетFastThreadLocal
(В следующей статье будут подробно описаны) переменные для кэширования отношения сопоставления между классом Handler и именем по умолчанию.При создании имени сначала проверьте, было ли имя по умолчанию создано в кеше (简单类名#0
), если не сгенерировано, вызовитеgenerateName0()
Создайте имя по умолчанию и добавьте его в кеш
Далее нужно проверить, не конфликтует ли имя с существующим именем, и вызватьcontext0()
, выяснить, есть ли соответствующий контекст в конвейере
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
context0()
Список методов перебирает каждыйChannelHandlerContext
, пока имя контекста оказывается таким же, как имя, которое нужно добавить, контекст возвращается, и в конце выдается исключение.Как вы можете видеть, это на самом деле линейный процесс поиска
еслиcontext0(name) != null
устанавливается, указывая на то, что в существующем контексте уже есть имя по умолчанию, то из简单类名#1
Продолжайте искать, пока не найдете уникальное имя, например简单类名#3
Если код пользователя указывает имя при добавлении обработчика, единственное, что нужно сделать, это проверить наличие дубликатов.
private void checkDuplicateName(String name) {
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
После обработки имени вступает в процесс создания контекста, который известен из предыдущей цепочки вызовов,group
равно нулю, поэтомуchildExecutor(group)
также возвращает ноль
DefaultChannelPipeline
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
//..
}
DefaultChannelHandlerContext
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
В конструктореDefaultChannelHandlerContext
Верните параметры родительскому классу, сохраните ссылку Handler и введите его родительский класс.
AbstractChannelHandlerContext
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
}
Это представлено двумя полями в nettychannelHandlerContext
принадлежатьinBound
все ещеoutBound
, или и то, и другое, два логических значения оцениваются следующими двумя небольшими функциями (см. код выше)
DefaultChannelHandlerContext
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
пройти черезinstanceof
Ключевое слово оценивается в соответствии с типом интерфейса, поэтому, если обработчик реализует два типа интерфейсов, это обработчик как входящего, так и исходящего типа, например, следующий класс
Обычно используемые кодеки, которые объединяют операции декодирования и операции кодирования, обычно наследуются.MessageToMessageCodec
,а такжеMessageToMessageCodec
это наследствоChannelDuplexHandler
MessageToMessageCodec
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
throws Exception;
}
После создания контекста следующим шагом является окончательное добавление созданного контекста в конвейер.
3. Добавьте узлы
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev; // 1
newCtx.next = tail; // 2
prev.next = newCtx; // 3
tail.prev = newCtx; // 4
}
На следующем рисунке показано простое представление этого процесса, Грубо говоря, это фактически операция вставки двусвязного списка.
После завершения операции контекст добавляется в конвейер.
На этом операция добавления узлов в пайплайн завершена, согласно этой идее можно освоить все методы серии addxxx().
4. Пользовательский метод обратного вызова
AbstractChannelHandlerContext
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
}
На четвертом шаге добавляется новый узел в конвейер, поэтому он начинает вызывать пользовательский код.ctx.handler().handlerAdded(ctx);
, общий код пользователя выглядит следующим образом
AbstractChannelHandlerContext
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 节点被添加完毕之后回调到此
// do something
}
}
Далее устанавливаем состояние узла
AbstractChannelHandlerContext
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
Используйте cas, чтобы изменить состояние узла на: REMOVE_COMPLETE (указывает, что узел был удален) или ADD_COMPLETE.
узел удаления конвейера
Одна из самых больших особенностей netty заключается в том, что обработчик является подключаемым, поэтому конвейер может быть динамически сплетен.Например, когда соединение устанавливается в первый раз, ему необходимо пройти аутентификацию авторизации.После прохождения аутентификации, контекст может быть удален Следующий конвейер Обработчик аутентификации авторизации не будет вызываться при распространении события
Ниже приведена простейшая реализация обработчика аутентификации полномочий.Первый пакет данных передает информацию об аутентификации.Если проверка пройдена, обработчик будет удален.В противном случае соединение будет закрыто напрямую.
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
if (verify(authDataPacket)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}
private boolean verify(ByteBuf byteBuf) {
//...
}
}
Дело в томctx.pipeline().remove(this)
этот код
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
Операция удаления намного проще добавления и делится на три шага:
1. Найдите удаляемый узел 2. Настроить удаление указателя двусвязного списка 3. Функция обратного вызова пользователя
1. Найдите удаляемый узел
DefaultChannelPipeline
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
Здесь, чтобы найти контекст, соответствующий обработчику, узел находится путем последовательного обхода двусвязного списка до тех пор, пока обработчик контекста не станет таким же, как текущий обработчик.
2. Настроить удаление указателя двусвязного списка
DefaultChannelPipeline
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
// 2.调整双向链表指针删除
remove0(ctx);
}
// 3.回调用户函数
callHandlerRemoved0(ctx);
return ctx;
}
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next; // 1
next.prev = prev; // 2
}
Процесс прохождения проще, чем добавление узлов, что можно представить следующей картинкой
Окончательный результат
Объединив эти две картинки, вы сможете ясно понять принцип работы обработчика проверки разрешений.Кроме того, удаленный узел будет автоматически переработан gc через некоторое время, поскольку на него нет объектной ссылки.
3. Функция обратного вызова пользователя
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
}
На третьем шаге узел в пайплайне удаляется, поэтому он начинает вызывать пользовательский кодctx.handler().handlerRemoved(ctx);
, общий код выглядит следующим образом
AbstractChannelHandlerContext
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 节点被删除完毕之后回调到此,可做一些资源清理
// do something
}
}
Наконец, установите состояние узла на удаленное
final void setRemoved() {
handlerState = REMOVE_COMPLETE;
}
Другие семейства методов из серии removexxx похожи друг на друга.Вы можете расширить другие серии методов в соответствии с приведенными выше идеями, которые здесь повторяться не будут.
Суммировать
1. Ссоздание нового соединенияНапример, в процессе создания нового соединения создается канал, а в процессе создания канала создается пайплайн, соответствующий каналу.После создания пайплайна в пайплайн автоматически добавляются два узла, а именно ChannelHandlerContext , Есть полезные пайплайны и Вся контекстная информация о канале.
2. Конвейер представляет собой двустороннюю структуру связанного списка, добавление и удаление узлов необходимо только для настройки структуры связанного списка.
3. Каждый узел в конвейере оборачивает определенный процессорChannelHandler
, узел согласноChannelHandler
ТипChannelInboundHandler
все ещеChannelOutboundHandler
чтобы определить, принадлежит ли узел входу или выходу или обоим
В следующей статье продолжим разбор пайплайна, так что следите за обновлениями!
Если вы хотите систематически изучать Нетти, мой буклет«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Это может вам помочь.Если вы хотите систематически изучать принципы Netty, то вы не должны пропустить мою серию видеороликов по анализу исходного кода Netty:coding.IMO OC.com/class/230, Также…