Полный анализ доступа к новому соединению анализа исходного кода netty

Java задняя часть исходный код Netty

Урожай этой статьи

Прочитайте эту статью, и вы узнаете

  1. Как netty принимает новые запросы
  2. Как netty назначает потоки реактора новым запросам
  3. Как netty добавляет ChannelHandler к каждому новому соединению

На самом деле, это намного больше, чем это

фон преамбулы

Прежде чем читать эту статью, лучше иметь некоторые знания о предварительном заказе, включая netty внить реактора,так же какПроцесс запуска сервераПозвольте мне дать вам краткий обзор

1. Резьба реактора в сетке

Основная вещь в netty — это два типа потоков-реакторов, которые можно рассматривать как два типа движков в netty, управляющих работой всего фреймворка netty.

Один тип потока-реактора — это группа потоков Boost, которая специально используется для приема новых соединений, а затем инкапсулирует их в объект канала и передает в группу рабочих потоков; другой тип потока-реактор — это группа рабочих потоков, которая специально используется для обработки соединения для чтения и записи.

Будь то поток Boos или рабочий поток, то, что он делает, делится на следующие три шага.

  1. Опрос событий ввода-вывода, зарегистрированных в селекторе
  2. Обработка событий ввода-вывода
  3. Выполнение асинхронных задач

Для потока boos первый опрашиваемый шаг — это в основном события принятия, указывающие на наличие нового соединения, в то время как опрашиваемые рабочие потоки в основном представляют собой события чтения/записи, указывающие на события чтения и записи в сети.

2. Запуск сервера

Процесс запуска сервера запускается в пользовательском потоке, первый разЗапустить поток ускорения при добавлении асинхронной задачиПри запуске netty инкапсулирует процесс обработки новых соединений в канал, а соответствующий пайплайн обрабатывает вновь установленные соединения по порядку (подробный разбор пайплайнов я начну позже)

Разобравшись с двумя предысториями, мы начинаем переходить к сути.

установление нового соединения

Проще говоря, установление нового соединения можно разделить на три шага.

  1. Обнаружено новое соединение
  2. Зарегистрируйте новое подключение к группе рабочих потоков.
  3. Зарегистрируйтесь для чтения событий для новых подключений

Давайте отвезем вас в Паодинцзе Ню и шаг за шагом проанализируем весь процесс.

Обнаружено новое соединение

Мы уже знаем, что, когда сервер привязан и запущен, канал сервера был зарегистрирован в потоке boost реактора, и реактор продолжает обнаруживать новые события, пока не обнаружит, что происходит событие accept.

NioEventLoop.java

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}

Приведенный выше кодВторая в трилогии реакторных нитей, указывая на то, что поток реактора boos был опрошен дляSelectionKey.OP_ACCEPTСобытие, указывающее на установление нового соединения, вызоветunsafeвыполнить реальную операцию

оunsafe, я не буду вдаваться в подробности в этой статье, ниже приводится объяснение автора netty о небезопасных

Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport.

Нужно только понимать общую концепцию, то есть все каналы будут иметь привязку с unsafe внизу, а реальную работу каждого типа канала реализует unsafe

А из предыдущей статьиПроцесс запуска сервера, мы уже знаем, что небезопасность канала, соответствующего серверу,NioMessageUnsafe, то входим в егоreadметод, введите второй шаг обработки нового соединения

Зарегистрируйтесь в потоке реактора

NioMessageUnsafe.java

private final List<Object> readBuf = new ArrayList<Object>();

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }
    } while (allocHandle.continueReading());
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();
}

Я опустил код некритичной части, как видите, как только я придумал, я использовал утверждение, чтобы определить, что метод чтения должен быть вызван потоком-реактором, а затем я получил конвейер и соответствующий канал.RecvByteBufAllocator.Handle(не объяснять сначала)

Далее звонитеdoReadMessagesметод непрерывно читает сообщение, используяreadBufВ качестве контейнера здесь вы можете догадаться, что соединения считываются одно за другим, а затем вызыватьpipeline.fireChannelRead(), пропускать каждое новое соединение через слой серверного канала крещения

После очистки контейнера включитеpipeline.fireChannelReadComplete(), весь процесс четкий и понятный, без следов примесей, давайте подробно рассмотрим эти два способа

1.doReadMessages(List) 2.pipeline.fireChannelRead(NioSocketChannel)

1.doReadMessages()

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

Наконец-то мы наблюдаем за границей, где netty вызывает лежащий в основе nio jdk.javaChannel().accept();, потому что поток-реактор в netty сканирует событие accept на первом шаге, так что вотacceptМетод немедленно возвращается, возвращая канал, созданный базовым nio jdk.

нетти будет jdkSocketChannelупаковано по индивидуальному заказуNioSocketChannel, добавленный в список, чтобы внешний слой мог пройти по списку и выполнить последующую обработку

отПредыдущая статья, мы уже знаем, что ряд основных компонентов в netty будет создан во время создания сервера, в том числе конвейер, небезопасный и т. д. Итак, будет ли создаваться этот ряд компонентов при принятии нового соединения?

Имея в виду этот вопрос, мы продолжаем

NioSocketChannel.java

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

Наш ключевой анализsuper(parent, socket), анализ, связанный с конфигурацией, мы помещаем в следующую статью

NioSocketChannelРодительский классAbstractNioByteChannel

AbstractNioByteChannel.java

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

Здесь мы видим знакомую тень в jdk nio——SelectionKey.OP_READ, вообще в нативном программировании jdk nio такое событие тоже будет зарегистрировано, указывая на то, что вы заинтересованы в чтении канала

Мы продолжаем идти вверх и прослеживаем доAbstractNioByteChannelродительский классAbstractNioChannel, вот, я думаю прочиталПредыдущая статьяУ вас должно быть представление об этой части кода

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

Когда вы создаете канал на стороне сервера, вы в конечном итоге войдете в этот метод.super(parent), вAbstractChannelСоздайте серию компонентов, привязанных к каналу, как показано ниже.

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

А вотreadInterestOpУказывает, что событие, о котором заботится канал,SelectionKey.OP_READ, событие будет зарегистрировано в селекторе позже, и тогда канал будет установлен в неблокирующий режим

На этом этапе я наконец-то могу показать вам структурную диаграмму наиболее часто используемых каналов в netty.

简化版channel继承关系

Отношения наследования здесь упрощены, в настоящее время нам нужно только понять.

первый

  1. канал наследует Comparable означает, что канал является сопоставимым объектом
  2. Канал наследует AttributeMap, указывая, что канал является объектом, который может связывать атрибуты.В пользовательском коде мы часто используем из него метод channel.attr(...).
  3. ChannelOutboundInvoker — это новая абстракция, добавленная в версии 4.1.x, представляющая операции, которые может выполнять канал.
  4. DefaultAttributeMap используется для метода по умолчанию абстракции AttributeMap, и последний канал наследует прямое использование
  5. AbstractChannel используется для реализации большинства методов канала, из которых нам наиболее знакомы базовые компоненты, создающие канал в его конструкторе.
  6. AbstractNioChannel выполняет некоторые операции, связанные с nio, на основе AbstractChannel и сохраняет базовый jdk.SelectableChannel, и установите неблокирующий канал в конструкторе
  7. Наконец, есть два канала, NioServerSocketChannel и NioSocketChannel, которые соответствуют процессу принятия новых соединений сервером и процессу чтения и записи новых соединений.

Прочитав это, вы в основном поняли более половины общей структуры канала.

Хорошо, давайте выйдем из стека, продолжим предыдущий анализ исходного кода и создадимNioSocketChannelПосле этого, поместив его в контейнер List, приступайте к следующему шагу

2.pipeline.fireChannelRead(NioSocketChannel)

AbstractNioMessageChannel.java

pipeline.fireChannelRead(NioSocketChannel);

Прежде чем я официально представлю конвейер, позвольте мне кратко представить компонент конвейера.

В различных типах каналов netty будет конвейер, что буквально означает конвейер.Мы можем понимать его как процесс конвейера.Процесс конвейера имеет начальную точку, конец и различные уровни конвейера в середине.Элемент , начать обработку в начале конвейера, пройти обработку каждого уровня конвейера и, наконец, завершить конвейер

Соответственно netty начало конвейераHeadContxt, конец конвейераTailConext,HeadContxtвызыватьUnsafeвыполнять определенные операции,TailConextОн используется для создания необработанных исключений в конвейере и предупреждения пользователя о необработанных сообщениях.Мы подробно обсудим конкретный анализ конвейера позже.

пройти черезпредыдущая статья, мы уже знаем, что в пайплайне, где сервер обрабатывает новые соединения, автоматически добавился обработчик пайплайнаServerBootstrapAcceptor, а в конструктор был передан ряд параметров, заданных в пользовательском коде.ServerBootstrapAcceptor

ServerBootstrapAcceptor.java

 private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;

    ServerBootstrapAcceptor(
            EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

фронтpipeline.fireChannelRead(NioSocketChannel);Наконец, через цепочку вызовов head->unsafe->ServerBootstrapAcceptor вызовите здесьServerBootstrapAcceptorизchannelReadметод

а такжеchannelReadКак только вы подойдете, принудительно преобразуйте сообщение здесь вChannel, почему здесь возможно принуждение? Читатели могут подумать о

Затем получите канал, который мы использовали раньше.NioSocketChannelСоответствующий конвейер, пользовательский код вchildHandler, добавлено в конвейер, здесьchildHandlerЭто отражается в пользовательском коде как

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoServerHandler());
     }
 });

На самом деле он соответствуетChannelInitializer, иди туда,NioSocketChannelПроцессор, соответствующий конвейеру, это head->ChannelInitializer->tail, имейте в виду, что он будет упомянут позже!

Далее установитеNioSocketChannelСоответствующий атрибут и параметр, а затем введитеchildGroup.register(child), здесь childGroup — это то, что мы добавили в стартовом кодеNioEventLoopGroup, вы можете обратиться кэта статья

мы вступаем вNioEventLoopGroupизregisterметод, проксируемый его родительскому классуMultithreadEventLoopGroup

MultithreadEventLoopGroup.java

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

Вот еще один метод next(), давайте продолжим

MultithreadEventLoopGroup.java

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

вернуться к своему родительскому классу

MultithreadEventExecutorGroup.java

@Override
public EventExecutor next() {
    return chooser.next();
}

Класс, соответствующий селектору, здесьEventExecutorChooser, буквально означает селектор исполнителя событий, роль его размещения в нашем контексте заключается в выборе потока реактора из группы потоков рабочего реактора.

public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         */
        EventExecutor next();
    }
}

Я не планирую расширять конкретное создание селектора, полагаю, что навыки чтения исходников в предыдущих статьях могут помочь вам узнать начало и конец селектора, здесь скажу прямо (но советую проанализировать его самостоятельно, это очень просто), Chooser Есть две реализации

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

По умолчанию селектор проходитDefaultEventExecutorChooserFactoryСоздано. При создании селектора потока реактора будет оцениваться количество потоков реактора. Если это степень 2, он будет создан.PowerOfTowEventExecutorChooser, иначе создатьGenericEventExecutorChooser

При выборе потока реактора два типа селекторов выбирают поток реактора с помощью метода циклического перебора. Единственное отличие состоит в том, чтоPowerOfTowEventExecutorChooserвыполняется операцией AND, иGenericEventExecutorChooserИменно через операцию остатка, и эффективность операции выше, чем у операции остатка.Видно, что netty просто без ума от оптимизации эффективности!

После выбора потока реактора, т.е.NioEventLoopПосле этого возвращаемся туда, где мы зарегистрировались

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

прокси дляNioEventLoopродительского классаregisterметод

SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

На самом деле этот процесс уже запущен как сервер, вы можете обратиться к подробным шагамПодробное объяснение запуска сервераВ этой статье мы сразу переходим к ключевой ссылке

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

Как и процесс запуска сервера, первый вызовdoRegister();Выполните реальный процесс регистрации следующим образом.

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

привязать канал кselectorUp, селектор используется потоком-реактором, и последующий опрос событий канала, а также обработка событий, асинхронное выполнение задачи — все это отвечает за этот поток-реактор.

После привязки потока реактора вызовитеpipeline.invokeHandlerAddedIfNeeded()

Как мы уже говорили, покаNioSocketChannelВ конвейере есть три процессора, head->ChannelInitializer->tail, которые в конечном итоге будут вызываться дляChannelInitializerизhandlerAddedметод

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

handlerAddedвызов методаinitChannelметод, вызовremove(ctx);удалить себя

AbstractNioChannel.java

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { 
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

А вотinitChannelМетод еще один фокус? Вернемся к пользовательскому методу, такому как следующий пользовательский код

Код пользователя

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoServerHandler());
     }
 });

О, это оказалось в нашем собственном коде! Я не буду объяснять, что делает этот код, знаете ли~

После завершения,NioSocketChannelОбработчики связанного конвейера включают head->LoggingHandler->EchoServerHandler->tail

Зарегистрируйтесь для чтения событий

Далее у нас остаются эти коды, которые не были проанализированы

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    // ..
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

pipeline.fireChannelRegistered();, на самом деле ничего толкового в этом нет.В конце концов, это не что иное, как повторный вызов каждого процессора в бизнес-конвейере.ChannelHandlerAddedметод обработки обратного вызова

isActive()Возвращает true, если соединение было установлено, поэтому введите блок метода, введитеpipeline.fireChannelActive();, анализ здесь иПолный анализ запуска сервера анализа исходного кода nettyТак же, как и в анализе, здесь я опускаю подробные шаги и перехожу сразу к ключевой ссылке

AbstractNioChannel.java

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

ты должен помнитьregister0()метод, код события, зарегистрированный в селекторе, равен 0, иreadInterestOpСоответствующий код событияSelectionKey.OP_READ, обратитесь к предыдущему творениюNioSocketChannelНемного поразмыслив, умный поймет, что это на самом делеSelectionKey.OP_READСобытие регистрируется в селекторе, что указывает на то, что канал уже может начать обработку события чтения.

Суммировать

Пока вам была показана обработка новых подключений в netty, подведем итоги

  1. Поток реактора boos опрашивает новое соединение для входа
  2. Создано путем инкапсуляции базового канала jdk.NioSocketChannelИ ряд основных компонентов netty
  3. Пропустите соединение через селектор и выберите поток рабочего реактора, чтобы связать его.
  4. Зарегистрируйте события чтения, чтобы начать чтение и запись для новых подключений.

В следующей статье мы углубимся в основные компоненты netty.pipeline, Следите за обновлениями

Если вы хотите систематически изучать Нетти, мой буклет«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Это может вам помочь.Если вы хотите систематически изучать принципы Netty, то вы не должны пропустить мою серию видеороликов по анализу исходного кода Netty:coding.IMO OC.com/class/230, Также…