Конвейер анализа исходного кода Netty (1)

WeChat Spring исходный код Netty

Через netty в предыдущей серии исходного кодатрилогия реакторной нити, мы уже знаем, что поток-реактор netty подобен двигателю, управляющему работой всего фреймворка netty, ипривязка на стороне сервераа такжеустановление нового соединенияЭто предохранитель двигателя, который зажигает двигатель

Netty установит соответствующий канал в процессе привязки порта сервера и установления нового соединения, а концепция конвейера тесно связана с действием канала.Конвейер можно рассматривать как конвейер, исходный исходный материал (поток байтов). ) вводится, обрабатывается и, наконец, выводится

В этой статье я будуустановление нового соединенияНапример, он разделен на следующие части, чтобы представить вам, как работает конвейер в netty.

  • инициализация конвейера
  • конвейер добавить узел
  • узел удаления конвейера

инициализация конвейера

существуетустановление нового соединенияВ этой статье мы увидели, как создатьNioSocketChannelКогда создаются основные компоненты netty

channel中的核心组件

конвейер является одним из них и создается в следующем коде

AbstractChannel

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

AbstractChannel

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

DefaultChannelPipeline

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

Ссылка на канал сохраняется в пайплайне, после создания пайплайна весь пайплайн выглядит так

pipeline默认结构

Каждый узел конвейера представляет собойChannelHandlerContextобъект, каждый узел контекста содержит исполнителя, который он обертываетChannelHandlerКонтекст, необходимый для выполнения операции, на самом деле является конвейером, поскольку конвейер содержит ссылку на канал и может получить всю информацию о контексте.

По умолчанию конвейер будет иметь два узла, головной и хвостовой.В следующей статье мы подробно разберем эти два специальных узла.Сегодня мы сосредоточимся на конвейере.

конвейер добавить узел

Ниже приведен очень распространенный клиентский код.

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter())
         p.addLast(new Decoder());
         p.addLast(new BusinessHandler())
         p.addLast(new Encoder());
     }
});

Сначала используйте сплиттер для распаковки исходного TCP-пакета данных, затем декодируйте распакованный пакет и передайте его в бизнес-процессор BusinessHandler.После обработки бизнес-кодировщик выводит кодировщик.

Вся структура трубопровода выглядит следующим образом

pipeline结构

Я использовал два цвета, чтобы различать два разных типа узлов в конвейере.ChannelInboundHandler, для обработки событий inBound наиболее типичным является чтение потока данных и его обработка; другим типом обработчика являетсяChannelOutboundHandler, обрабатывать исходящие события, например, при вызовеwriteAndFlush()Когда метод класса, он будет проходить через этот тип обработчика

Независимо от типа обработчика, его внешний объектChannelHandlerContextсвязаны двусвязным списком, и различают одинChannelHandlerContextНезависимо от того, включен он или нет, мы можем видеть, как netty обрабатывает его при добавлении узлов.

DefaultChannelPipeline

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {
        addLast(executor, null, h);
    }
    return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.检查是否有重复handler
        checkMultiplicity(handler);
        // 2.创建节点
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加节点
        addLast0(newCtx);
    }
   
    // 4.回调用户方法
    callHandlerAdded0(handler);
    
    return this;
}

Здесь просто используйтеsynchronizedМетод заключается в предотвращении многопоточной параллельной работы базового двусвязного списка конвейера.

Давайте проанализируем приведенный выше код шаг за шагом

1. Проверьте, нет ли повторяющихся обработчиков

При добавлении обработчика в пользовательский код сначала проверяется, был ли добавлен обработчик

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

netty использует переменную-членaddedОпределите, был ли добавлен канал. Приведенный выше код очень прост. Если обработчик, который в настоящее время добавляется, не является общим и уже добавлен, будет выдано исключение, в противном случае будет указано, что обработчик был добавлен.

Видно, что если обработчик является общим, его можно добавлять в конвейер на неопределенный срок.Если наш клиентский код хочет поделиться обработчиком, нам нужно только добавить аннотацию @Sharable, как показано ниже.

@Sharable
public class BusinessHandler {
    
}

И если обработчик является общим, он обычно используется с помощью Spring Injection, и нет необходимости каждый раз создавать новый.

isSharable()Метод реализуется тем, помечен ли класс, соответствующий обработчику, @Sharable

ChannelHandlerAdapter

public boolean isSharable() {
   Class<?> clazz = getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    if (sharable == null) {
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}

Здесь также видно, что для максимальной оптимизации производительности netty также использует ThreadLocal для кэширования состояния обработчика.При высокой степени параллелизма и массовых подключениях этот метод будет создаваться и вызываться каждый раз, когда к Обработчик.

2. Создайте узел

Вернитесь к основному процессу и посмотрите код для создания контекста.

newCtx = newContext(group, filterName(name, handler), handler);

Здесь нам нужно проанализироватьfilterName(name, handler)Этот код, эта функция используется для создания уникального имени обработчика

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}

Очевидно, что имя, которое мы передаем, равно null, и netty сгенерирует для нас имя по умолчанию. В противном случае проверьте, есть ли повторяющееся имя, и вернитесь, если проверка пройдена.

Правила для netty для создания имени по умолчанию:简单类名#0, посмотрим как это реализовано

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

private String generateName(ChannelHandler handler) {
    // 先查看缓存中是否有生成过默认name
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    // 没有生成过,就生成一个默认name,加入缓存 
    if (name == null) {
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }

    // 生成完了,还要看默认name有没有冲突
    if (context0(name) != null) {
        String baseName = name.substring(0, name.length() - 1);
        for (int i = 1;; i ++) {
            String newName = baseName + i;
            if (context0(newName) == null) {
                name = newName;
                break;
            }
        }
    }
    return name;
}

нетти используетFastThreadLocal(В следующей статье будут подробно описаны) переменные для кэширования отношения сопоставления между классом Handler и именем по умолчанию.При создании имени сначала проверьте, было ли имя по умолчанию создано в кеше (简单类名#0), если не сгенерировано, вызовитеgenerateName0()Создайте имя по умолчанию и добавьте его в кеш

Далее нужно проверить, не конфликтует ли имя с существующим именем, и вызватьcontext0(), выяснить, есть ли соответствующий контекст в конвейере

private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}

context0()Список методов перебирает каждыйChannelHandlerContext, пока имя контекста оказывается таким же, как имя, которое нужно добавить, контекст возвращается, и в конце выдается исключение.Как вы можете видеть, это на самом деле линейный процесс поиска

еслиcontext0(name) != nullустанавливается, указывая на то, что в существующем контексте уже есть имя по умолчанию, то из简单类名#1Продолжайте искать, пока не найдете уникальное имя, например简单类名#3

Если код пользователя указывает имя при добавлении обработчика, единственное, что нужно сделать, это проверить наличие дубликатов.

private void checkDuplicateName(String name) {
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

После обработки имени вступает в процесс создания контекста, который известен из предыдущей цепочки вызовов,groupравно нулю, поэтомуchildExecutor(group)также возвращает ноль

DefaultChannelPipeline

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    //..
}

DefaultChannelHandlerContext

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

В конструктореDefaultChannelHandlerContextВерните параметры родительскому классу, сохраните ссылку Handler и введите его родительский класс.

AbstractChannelHandlerContext

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
}

Это представлено двумя полями в nettychannelHandlerContextпринадлежатьinBoundвсе ещеoutBound, или и то, и другое, два логических значения оцениваются следующими двумя небольшими функциями (см. код выше)

DefaultChannelHandlerContext

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

пройти черезinstanceofКлючевое слово оценивается в соответствии с типом интерфейса, поэтому, если обработчик реализует два типа интерфейсов, это обработчик как входящего, так и исходящего типа, например, следующий класс

ChannelDuplexHandler

Обычно используемые кодеки, которые объединяют операции декодирования и операции кодирования, обычно наследуются.MessageToMessageCodec,а такжеMessageToMessageCodecэто наследствоChannelDuplexHandler

MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
            throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
            throws Exception;
 }

После создания контекста следующим шагом является окончательное добавление созданного контекста в конвейер.

3. Добавьте узлы

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}

На следующем рисунке показано простое представление этого процесса, Грубо говоря, это фактически операция вставки двусвязного списка.

添加节点过程

После завершения операции контекст добавляется в конвейер.

添加节点之后

На этом операция добавления узлов в пайплайн завершена, согласно этой идее можно освоить все методы серии addxxx().

4. Пользовательский метод обратного вызова

AbstractChannelHandlerContext

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    ctx.handler().handlerAdded(ctx);
    ctx.setAddComplete();
}

На четвертом шаге добавляется новый узел в конвейер, поэтому он начинает вызывать пользовательский код.ctx.handler().handlerAdded(ctx);, общий код пользователя выглядит следующим образом

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 节点被添加完毕之后回调到此
        // do something
    }
}

Далее устанавливаем состояние узла

AbstractChannelHandlerContext

final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}

Используйте cas, чтобы изменить состояние узла на: REMOVE_COMPLETE (указывает, что узел был удален) или ADD_COMPLETE.

узел удаления конвейера

Одна из самых больших особенностей netty заключается в том, что обработчик является подключаемым, поэтому конвейер может быть динамически сплетен.Например, когда соединение устанавливается в первый раз, ему необходимо пройти аутентификацию авторизации.После прохождения аутентификации, контекст может быть удален Следующий конвейер Обработчик аутентификации авторизации не будет вызываться при распространении события

Ниже приведена простейшая реализация обработчика аутентификации полномочий.Первый пакет данных передает информацию об аутентификации.Если проверка пройдена, обработчик будет удален.В противном случае соединение будет закрыто напрямую.

public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(authDataPacket)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        //...
    }
}

Дело в томctx.pipeline().remove(this)этот код

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}

Операция удаления намного проще добавления и делится на три шага:

1. Найдите удаляемый узел 2. Настроить удаление указателя двусвязного списка 3. Функция обратного вызова пользователя

1. Найдите удаляемый узел

DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }

        ctx = ctx.next;
    }
}

Здесь, чтобы найти контекст, соответствующий обработчику, узел находится путем последовательного обхода двусвязного списка до тех пор, пока обработчик контекста не станет таким же, как текущий обработчик.

2. Настроить удаление указателя двусвязного списка

DefaultChannelPipeline

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 2.调整双向链表指针删除
        remove0(ctx);
    }
    // 3.回调用户函数
    callHandlerRemoved0(ctx);
    return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next; // 1
    next.prev = prev; // 2
}

Процесс прохождения проще, чем добавление узлов, что можно представить следующей картинкой

删除节点过程

Окончательный результат

删除节点之后

Объединив эти две картинки, вы сможете ясно понять принцип работы обработчика проверки разрешений.Кроме того, удаленный узел будет автоматически переработан gc через некоторое время, поскольку на него нет объектной ссылки.

3. Функция обратного вызова пользователя

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerRemoved(ctx);
    } finally {
        ctx.setRemoved();
    }
}

На третьем шаге узел в пайплайне удаляется, поэтому он начинает вызывать пользовательский кодctx.handler().handlerRemoved(ctx);, общий код выглядит следующим образом

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 节点被删除完毕之后回调到此,可做一些资源清理
        // do something
    }
}

Наконец, установите состояние узла на удаленное

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}

Другие семейства методов из серии removexxx похожи друг на друга.Вы можете расширить другие серии методов в соответствии с приведенными выше идеями, которые здесь повторяться не будут.

Суммировать

1. Ссоздание нового соединенияНапример, в процессе создания нового соединения создается канал, а в процессе создания канала создается пайплайн, соответствующий каналу.После создания пайплайна в пайплайн автоматически добавляются два узла, а именно ChannelHandlerContext , Есть полезные пайплайны и Вся контекстная информация о канале.

2. Конвейер представляет собой двустороннюю структуру связанного списка, добавление и удаление узлов необходимо только для настройки структуры связанного списка.

3. Каждый узел в конвейере оборачивает определенный процессорChannelHandler, узел согласноChannelHandlerТипChannelInboundHandlerвсе ещеChannelOutboundHandlerчтобы определить, принадлежит ли узел входу или выходу или обоим

В следующей статье продолжим разбор пайплайна, так что следите за обновлениями!

Если вы хотите систематически изучать Нетти, мой буклет«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Это может вам помочь.Если вы хотите систематически изучать принципы Netty, то вы не должны пропустить мою серию видеороликов по анализу исходного кода Netty:coding.IMO OC.com/class/230, Также…