Анализ исходного кода Netty Channel

Java задняя часть Netty API

Оригинальная ссылка:Вэй Ван.one/posts/Netty…

Фронт, у нас есть общее представление о нескольких основных компонентах Netty. Сегодня давайте сначала представим компоненты сетевой связи Netty, которые используются для выполнения сетевых операций ввода-вывода.Channel.

Нетти версия: 4.1.30

Обзор

Данные всегда передаются в виде байтов в сети. То, какую закодированную передачу (OIO, NIO и т. д.) мы используем в сети при программировании, определяет передача этих байтов.

До Netty для улучшения параллелизма системы при переходе с OIO на NIO требовалось много рефакторинга кода, потому что соответствующий Java NIO сильно отличался от IO API. Netty создала уровень инкапсуляции на основе этих нативных API-интерфейсов Java, предоставляя пользователям высоко абстрактный и унифицированный API, так что переключение методов передачи больше не составляет труда.Весь код рефакторинг.

Netty Channel UML

Семейство каналов netty выглядит следующим образом:

NettyChannel

Во всей группе,AbstractChannelЭто наиболее важный абстрактный класс, от которого наследуются классы AbstractNioChannel, AbstractOioChannel, AbstractEpollChannel, LocalChannel, EmbeddedChannel и др. Каждый класс представляет разные протоколы и соответствующие модели ввода-вывода. Помимо протокола TCP, Netty также поддерживает множество других протоколов подключения, и каждый протокол имеет различие между версиями NIO (асинхронный ввод-вывод) и OIO (Old-IO, то есть традиционный блокирующий ввод-вывод). соединения Существуют различные типы каналов, соответствующие ему. Вот некоторые часто используемые типы каналов:

  • NioSocketChannel: представляет собой асинхронное клиентское соединение TCP Socket.
  • NioServerSocketChannel: асинхронное соединение TCP Socket на стороне сервера.
  • Niodagramchannel: асинхронное UDP-соединение
  • NioSctpChannel: асинхронное клиентское Sctp-соединение
  • NioSctpServerChannel: асинхронное соединение Sctp на стороне сервера.
  • OioSocketChannel: синхронное клиентское подключение через сокет TCP
  • OioServerSocketChannel: синхронное соединение TCP Socket на стороне сервера.
  • OioDatagramChannel: синхронизированное соединение UDP
  • OioSctpChannel: сервер синхронизации Sctp подключен
  • OusctpserverChannel: Синхронизация связей Client TCP Соединение

Channel API

Давайте сначала посмотрим на интерфейс верхнего уровняchannelОсновные API, которые обычно используются, следующие:

имя интерфейса описывать
eventLoop() Канал должен быть зарегистрирован в мультиплексоре EventLoop для обработки событий ввода-вывода.Цикл событий, зарегистрированный каналом, можно получить с помощью метода eventLoop(). EventLoop — это, по сути, поток Reactor, который обрабатывает сетевые события чтения и записи. В Netty он используется не только для обработки сетевых событий, но и для выполнения таких задач, как временные задачи и пользовательские NioTasks.
pipeline() Возвращает ChannelPipeline, выделенный каналом
isActive() Определите, активирован ли канал. Активация основного значения зависит от типа передачи. Например, транспортный сокет После подключения к удаленному узлу активен, после открытия передачи активна дейтаграмма.
localAddress() Возвращает адрес локального сокета
remoteAddress() Возвращает адрес удаленного сокета
flush() Сбросить ранее записанные данные в базовый канал
write(Object msg) Запрос на запись текущего сообщения в целевой канал через ChannelPipeline. Обратите внимание, что операция записи только сохраняет сообщение в кольцевом массиве отправки сообщений и фактически не отправляется.Только при вызове операции сброса оно будет записано в канал и отправлено другой стороне.
writeAndFlush() Эквивалентно вызову write() с последующим flush()
metadate() Читатели, знакомые с протоколом TCP, возможно, знают, что при создании сокета необходимо указать параметры TCP, такие как размер буфера TCP для приема и отправки и время ожидания TCP. Следует ли повторно использовать адреса и т. д. В Netty каждый канал соответствует физическому каналу, и каждое соединение имеет свою собственную конфигурацию параметров TCP. Следовательно, Channel будет агрегировать ChannelMetadata, чтобы предоставить информацию описания метаданных для параметров TCP, а конфигурацию параметров TCP текущего канала можно получить с помощью метода metadata().
read() Он считывает данные из текущего канала в первый входящий буфер, если данные читаются успешно, вызывая событие ChannelHandler.channelRead (ChannelHandlerContext, Object). После завершения вызова API операции чтения немедленно инициируется событие ChannelHander.channelReadComplete (ChannelHandlerContext), ChannelHandler этого бизнеса может решить, следует ли продолжать чтение данных. Если запрос операции уже находится на рассмотрении, последующее чтение будет проигнорировано.
close(ChannelPromise promise) Активно закрыть текущее соединение, установить результат операции через ChannelPromise и уведомить о результате, независимо от того, успешна операция или нет, вы можете получить результат операции через ChannelPromise. Эта операция каскадирует события ChannelHandler.close(ChannelHandlerContext, ChannelPromise) всех ChannelHandler в ChannelPipeline.
parent() Для канала на стороне сервера его родительский канал пуст; для канала на стороне клиента его родительским каналом является канал ServerSocketChannel, создавший его.
id() Возвращает объект ChannelId, который является уникальным идентификатором канала.

Создание канала

После предварительного понимания API Netty Channel и связанных с ним классов давайте подробнее рассмотрим, как создается канал во время запуска Netty. Процесс создания канала на стороне сервера в основном делится на четыре шага: 1) создание канала; 2) инициализация канала; 3) регистрация канала; 4) привязка канала.

Netty Channel Process

В качестве примера разберем следующий код:

// 创建两个线程组,专门用于网络事件的处理,Reactor线程组
// 用来接收客户端的连接,
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用来进行SocketChannel的网络读写
EventLoopGroup workGroup = new NioEventLoopGroup();

// 创建辅助启动类ServerBootstrap,并设置相关配置:
ServerBootstrap b = new ServerBootstrap();
// 设置处理Accept事件和读写操作的事件循环组
b.group(bossGroup, workGroup)
         // 配置Channel类型
        .channel(NioServerSocketChannel.class)
         // 配置监听地址
        .localAddress(new InetSocketAddress(port))
         // 设置服务器通道的选项,设置TCP属性
        .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
         // 设置建立连接后的客户端通道的选项
        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
         // channel属性,便于保存用户自定义数据
        .attr(AttributeKey.newInstance("UserId"), "60293")
    	.handler(new LoggingHandler(LogLevel.INFO))
        // 设置子处理器,主要是用户的自定义处理器,用于处理IO网络事件
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(serverHandler);
            }
        });

// 调用bind()方法绑定端口,sync()会阻塞等待处理请求。这是因为bind()方法是一个异步过程,会立即返回一个ChannelFuture对象,调用sync()会等待执行完成
ChannelFuture f = b.bind().sync();
// 获得Channel的closeFuture阻塞等待关闭,服务器Channel关闭时closeFuture会完成
f.channel().closeFuture().sync();

Вызовите интерфейс channel() для установкиAbstractBootstrapпеременная-членchannelFactory, который, как следует из названия, является фабричным классом, используемым для создания канала. Исходный код выглядит следующим образом:


...

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    // 创建 channelFactory
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

...

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }
    this.channelFactory = channelFactory;
    return (B) this;
}

...

channelFactoryУстановить какReflectiveChannelFactory, в нашем случае clazzNioServerSocketChannel, мы видим, что есть интерфейс newChannel(), который вызывается отражением, вызов этого интерфейса мы представим позже. Исходный код выглядит следующим образом:

// Channel工厂类
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }
	
    @Override
    public T newChannel() {
        try {
            // 通过反射来进行常见Channel实例
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}

Далее давайте посмотрим наNioServerSocketChannelКонструкторы, в основном:

  • Создайте объект ServerSocketChannel. Когда NioServerSocketChannel создан, сначала используйте openServerSocketChannel SelectorProvider, чтобы открыть канал сокета сервера. SelectorProvider — это абстрактный класс, предоставляемый Java NIO, который является поставщиком услуг для селекторов и выбираемых каналов. Конкретными классами реализации являются SelectorProviderImpl, EPollSelectorProvide, PollSelectorProvider. Основная задача селектора — выбрать соответствующего провайдера в соответствии с типом и версией операционной системы: если версия ядра LInux >= 2.6, конкретным SelectorProvider является EPollSelectorProvider, в противном случае это PollSelectorProvider по умолчанию.
  • Задайте переменную-член ServerSocketChannelConfig.
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        // 调用JDK底层API生成 ServerSocketChannel 对象实例
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

private final ServerSocketChannelConfig config;

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
    // 调用 AbstractNioChannel 构造器,创建 NioServerSocketChannel,设置SelectionKey为ACCEPT
    super(null, channel, SelectionKey.OP_ACCEPT);
    // 创建ChannleConfig对象,主要是TCP参数配置类
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

Конструктор AbstractNioChannel выглядит следующим образом:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // 调用 AbstractChannel 构造器
    super(parent);
    this.ch = ch;
    // 从上一步过来,这里设置为 SelectionKey.OP_ACCEPT
    this.readInterestOp = readInterestOp;
    try {
        // 设置为非阻塞状态
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

существуетAbstractChannelВ конструкторе будут установлены три основных объекта, связанных с Channel: ChannelId, ChannelPipeline и Unsafe.

  • Инициализировать ChannelId, ChannelId — глобально уникальное значение;
  • СоздайтеNioMessageUnsafeНапример, этот класс предоставляет каналу базовые операции, связанные с сетевой связью, такие как connect(), read(), register(), bind(), close() и т. д.;
  • Создайте DefaultChannelPipeline для канала, начальный конвейер распространения событий. Для анализа Pipeline см.эпилоганализ.
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // 设置ChannelId
    id = newId();
    // 设置Unsafe
    unsafe = newUnsafe();
    // 设置Pipeline
    pipeline = newChannelPipeline();
}

Отслеживая конструктор NioServerSocketChannelConfig, переменная-член канала устанавливается в DefaultChannelConfig.

public DefaultChannelConfig(Channel channel) {
    this(channel, new AdaptiveRecvByteBufAllocator());
}

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
    setRecvByteBufAllocator(allocator, channel.metadata());
    // 绑定channel
    this.channel = channel;
}

Выше описан процесс создания канала.

  • Канал создается путем отражения через фабричный класс ReflectiveChannelFactory;
  • В процессе создания канала создаются четыре важных объекта: ChannelId, ChannelConfig, ChannelPipeline и Unsafe.

Инициализация канала

В основном он делится на следующие два этапа:

  • Установите параметры и свойства, установленные программой запуска (Bootstrap), на NettyChannel
  • Добавить обработчик инициализации в Pipeline для использования после регистрации

мы начинаем сAbstractBootstrapЗапускается интерфейс bind() и цепочка вызовов: bind() —> doBind(localAddress) —> initAndRegister() —> init(канал канала), давайте посмотримServerBootstrapРеализация интерфейса init() в:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 调用Channel工程类的newChannel()接口,创建channel,就是前面我们讲的部分内容
        channel = channelFactory.newChannel();
        // 初始化channel
        init(channel);
    } catch (Throwable t) {
        ....
}

Чтобы инициализировать канал, давайте сосредоточимся на интерфейсе init(channel):

void init(Channel channel) throws Exception {
    // 获取启动器 启动时配置的option参数,主要是TCP的一些属性
    final Map<ChannelOption<?>, Object> options = options0();
    // 将获得到 options 配置到 ChannelConfig 中去
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // 获取 ServerBootstrap 启动时配置的 attr 参数
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    // 配置 Channel attr,主要是设置用户自定义的一些参数
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
	
    // 获取channel中的 pipeline,这个pipeline使我们前面在channel创建过程中设置的 pipeline
    ChannelPipeline p = channel.pipeline();

    // 将启动器中配置的 childGroup 保存到局部变量 currentChildGroup
    final EventLoopGroup currentChildGroup = childGroup;
    // 将启动器中配置的 childHandler 保存到局部变量 currentChildHandler
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    // 保存用户设置的 childOptions 到局部变量 currentChildOptions
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    // 保存用户设置的 childAttrs 到局部变量 currentChildAttrs
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // 获取启动器上配置的handler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                // 添加 handler 到 pipeline 中
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 用child相关的参数创建出一个新连接接入器ServerBootstrapAcceptor
                    // 通过 ServerBootstrapAcceptor 可以将一个新连接绑定到一个线程上去
                    // 每次有新的连接进来 ServerBootstrapAcceptor 都会用child相关的属性对它们进行配置,并注册到ChaildGroup上去
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

Анализ нового адаптера подключения ServerBootstrapAcceptor см.эпилог

Регистрация канала

После того, как канал создан и инициализирован, его необходимо зарегистрировать в селекторе петлителя событий. Возвращаемся к интерфейсу initAndRegister:

final ChannelFuture initAndRegister() {

    ...

    // 获取 EventLoopGroup ,并调用它的 register 方法来注册 channel
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

В конечном итоге он вызовет интерфейс регистра в SingleThreadEventLoop:

Как позвонить здесь, подробности нужно подождать, пока в следующей статье не будет рассказано о MultithreadEventExecutorGroup, а затем подробно объяснить

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // 调用unsafe的register接口
    promise.channel().unsafe().register(this, promise);
    return promise;
}

Код отслеживается до регистрации интерфейса в классе AbstractUnsafe в AbstractChannel.

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    // 将该Channel与eventLoop 进行绑定,后续与该channel相关的IO操作都由eventLoop来处理
    AbstractChannel.this.eventLoop = eventLoop;
	// 初次注册时 eventLoop.inEventLoop() 返回false
    if (eventLoop.inEventLoop()) {
        // 调用实际的注册接口register0
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // 调用实际的注册接口register0
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

Интерфейс register0 в основном разделен на следующие три раздела логики:

  • doRegister();

  • pipeline.invokeHandlerAddedIfNeeded();

  • pipeline.fireChannelRegistered();

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // 调用 doRegister() 接口
        doRegister();
        neverRegistered = false;
        registered = true;
        
       	// 通过pipeline的传播机制,触发handlerAdded事件
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        // 通过pipeline的传播机制,触发channelRegistered事件
        pipeline.fireChannelRegistered();
        // 还没有绑定,所以这里的 isActive() 返回false.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

Давайте посмотрим на интерфейс doRegister() в AbstractNioChannel и, наконец, вызовем базовый NIO API Java JDK для регистрации.

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // eventLoop().unwrappedSelector():获取selector,将在后面介绍 EventLoop 创建时会讲到
            // 将selector注册到Java NIO Channel上
            // ops 设置为 0,表示不关心任何事件
            // att 设置为 channel自身,表示后面还会将channel取出来用作它用(后面文章会讲到)
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

Привязка канала

После завершения создания, инициализации и регистрации следующим шагом является операция привязки канала.

Механизм распространения событий конвейера, задействованный в этом разделе, будет объяснен в следующей статье.

Начиная с интерфейса bind() активатора, вызовите метод doBind() вниз:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化及注册
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 调用 doBind0
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ....
    }
}

Метод doBind вызовет метод doBind0(). В методе doBind0() задача bind() канала будет выполняться через EventLoop. Анализ интерфейса выполнения EventLoop см. в следующемстатья.

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 调用channel.bind接口
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

Метод doBind0() будет использоваться в следующем разделе.pipeline.bind(localAddress, promise);Метод через механизм распространения конвейера в конечном итоге вызовет метод AbstractChannel.AbstractUnsafe.bind(), который в основном делает две вещи:

  • Вызовите doBind(): вызов базового API JDK для привязки порта канала.
  • Вызовите pipe.fireChannelActive():

Механизм связи конвейера см.эпилог

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
    ....
    
    // wasActive 在绑定成功前为 false
    boolean wasActive = isActive();
    try {
        // 调用doBind()调用JDK底层API进行端口绑定
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
	// 完成绑定之后,isActive() 返回true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 触发channelActive事件
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

Здесь мы рассмотрим метод doBind, реализованный NioServerSocketChannel на стороне сервера, который в конечном итоге вызовет метод bind базового канала NIO JDK:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

Вызовите pipe.fireChannelActive(), чтобы начать распространение активных событий.Конвейер сначала вызовет узел HeadContext для распространения событий, а затем вызовет метод DefaultChannelPipeline.HeadContext.channelActive():

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 触发heanlder 的 ChannelActive 方法
    ctx.fireChannelActive();
    // 调用接口readIfIsAutoRead
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        // 调用channel.read()
        channel.read();
    }
}

Метод channel.read() вызовет метод AbstractChannelHandlerContext.read() вниз:

@Override
public ChannelHandlerContext read() {
    // 获取下一个ChannelHandlerContext节点
    final AbstractChannelHandlerContext next = findContextOutbound();
    // 获取EventExecutor
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 调用下一个节点的invokeRead接口
        next.invokeRead();
    } else {
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }
    return this;
}

Через механизм распространения событий конвейера в конечном итоге будет вызван метод AbstractChannel.AbstractUnsafe.beginRead():

@Override
public final void beginRead() {
    assertEventLoop();
    if (!isActive()) {
        return;
    }
    try {
        // 调用 doBeginRead();
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

Давайте посмотрим на логику реализации интерфейса AbstractNioChannel для doBeginRead:

// 注册一个OP_ACCEPT
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    // 获取channel注册是的设置的 selectionKey
    final SelectionKey selectionKey = this.selectionKey;
    // selectionKey无效则返回
    if (!selectionKey.isValid()) {
        return;
    }
	
    readPending = true;
	// 前面讲到channel在注册的时候,这是 interestOps 设置的是 0
    final int interestOps = selectionKey.interestOps();
    // readInterestOp 在前面讲到channel创建的时候,设置值为 SelectionKey.OP_ACCEPT
    if ((interestOps & readInterestOp) == 0) {
        // 最终 selectionKey 的兴趣集就会设置为 SelectionKey.OP_ACCEPT
        // 表示随时可以接收新连接的接入
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

Суммировать

До сих пор мы анализировали процесс создания канала, инициализации, регистрации и привязки. Знания об задействованных механизмах распространения событий EventLoopGroup и Pipeline будут объяснены в следующих статьях.

использованная литература