Вы можете понять модель конвейера за пять минут — анализ исходного кода Netty

Java

Серия анализов исходного кода Netty

1. Введение в конвейер

1. Что такое трубопровод

    pipelineОн имеет значение конвейера и сборочной линии и впервые был использован вUnixВ операционной системе программы с различными функциями могут взаимодействовать друг с другом, что делает программное обеспечение более "высокой связностью, низкой связанностью". В ней используется "цепная модель" для объединения различных программ или компонентов, чтобы они образовывали прямую линию. работа. поток.

2. ChannelPipeline Нетти

    ChannelPipelineобрабатывается или перехватываетсяchannelДвусвязный список входящих и исходящих событий с событиями вChannelPipelineпоток и пропуск, могут быть добавлены или удаленыChannelHandlerРеализовать обработку различной бизнес-логики. С точки зрения непрофессионала,ChannelPipelineЭто сборочная линия на заводе.ChannelHandlerРабочие на сборочной линии.
    ChannelPipelineпри созданииChannelсоздается автоматически, когда каждыйChannelиметь свои собственныеChannelPipeline.

3. Обработка событий Netty I/O

     Как показано, входящие события генерируютсяI/OПоток пассивно запускается и обрабатывается входящим процессором в восходящем направлении, может быть перехвачен и отброшен в середине, а исходящее событие обрабатывается пользователем.handlerАктивно инициируется, обрабатывается исходящим процессором в направлении сверху вниз

2. Контекст обработчика канала

1. Что такое ChannelHandlerContext

    ChannelHandlerContextбудетChannelHandlerиChannelPipelineСвязанные контексты, каждый добавил одинhandlerбудет создаватьChannelHandlerContextэкземпляр, управлениеChannelHandlerсуществуетChannelPipelineпоток распространения в .

2. Связь между ChannelHandlerContext и ChannelPipeline и ChannelHandler

    ChannelPipelineзависит отChannelсоздается автоматически, сохраняетchannel, положить всеhandlerОрганизовано, оно эквивалентно сборочному конвейеру завода.
    ChannelHandlerИмеет независимую логику функций и может быть зарегистрирован в несколькихChannelPipeline, не сохраняетсяchannel, эквивалент фабричного рабочего.
    ChannelHandlerContextсвязаноChannelHandlerиChannelPipelineконтекст, сохраняетChannelPipeline,контрольChannelHandlerсуществуетChannelPipelineПоток распространения в , эквивалентен руководителю бригады на конвейере.

3. Распространяйте входящие события

1. Что такое входящие события?

    (1) channelRegisteredвопрос регистрации,channelзарегистрироваться наEventLoopПосле совершения звонка, например, при запуске служебного поста,pipeline.fireChannelRegistered();
    (2) channelUnregisteredсобытие выхода из системы,channelотEventLoopВызывается после выхода из системы, например, после успешного закрытия соединения,pipeline.fireChannelUnregistered();    (3) channelActiveСобытие активации вызывается после успешной привязки порта,pipeline.fireChannelActive();
    (4) channelInactiveНеактивное событие, вызываемое после закрытия соединения,pipeline.fireChannelInactive();    (5) channelReadчитать события,channelВызывается, когда данные доступны,pipeline.fireChannelRead();
    (6) channelReadCompleteПрочитав событие,channelПозвонил после прочтенияpipeline.fireChannelReadComplete();
    (7) channelWritabilityChangedДоступные для записи события изменения состояния, когдаChannelВыполняется, когда доступное для записи состояниеOOM,pipeline.fireChannelWritabilityChanged();
    (8) userEventTriggeredТриггеры пользовательских событий, такие как обнаружение сердцебиения,ctx.fireUserEventTriggered(evt);
    (9) exceptionCaughtненормальное событие Пояснение: Мы видим, чтоInboundсобытия делаютсяI/OТриггеры потока, пользователь реализует пассивный вызов частично связанных событий
    инструкция: Мы видим, чтоInboundсобытия делаютсяI/OТриггеры потока, пользователь реализует пассивный вызов частично связанных событий

2. Добавить событие чтения

     спереди«Анализ исходного кода Netty — анализ процесса запуска сервера»и«Анализ исходного кода Netty — доступ к клиентскому соединению и чтение анализа ввода-вывода»Мы знаем, что при доступе к новому соединению мы выполняем процесс регистрации.После успешной регистрации мы вызовемchannelRegistered, мы начинаем с этого метода

   public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
          initChannel((C) ctx.channel());
          ctx.pipeline().remove(this);
          ctx.fireChannelRegistered();
}

    initChannelпараметр, настраиваемый при запуске службыchildHandlerПереопределить метод родительского класса

private class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("initChannel");
        ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
        ch.pipeline().addLast(new IOHandler());
    }
}

     Давайте вспомним,pipelineгде он был создан

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

     при созданииchannelсоздается автоматически, когдаpipeline

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

     Здесь будут созданы два значения по умолчаниюhandler,ОдинInboundHandler --> TailContext,ОдинOutboundHandler --> HeadContext
     посмотри сноваaddLastметод

@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

     создать один здесьhandlerимя, сгенерированное правиломhandlerимя класса плюс"#0"

  @Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    …
    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, generateName(h), h);
    }
    return this;
}
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);
        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }
    return this;
}

     так какpipelineОн небезопасен для потоков, и для обеспечения безопасности одновременного доступа используются блокировки.handlerПроверка на дублирование имени, будетhandlerупаковано вDefaultChannelHandlerContext, и, наконец, добавлено вpipeline

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}

     Вот три шага
    (1)правильноDefaultChannelHandlerContextПроверка повторяемости, еслиDefaultChannelHandlerContextневозможно в несколькихpipelineподелился и был добавлен вpipeline, выбрасывается исключение
    (2)Исправлятьpipelineуказатель в
        Добавить кIdleStateHandlerДо
         HeadContext --> IOChannelInitialize --> TailContext

        Добавить кIdleStateHandlerпосле
         HeadContext --> IOChannelInitialize --> IdleStateHandler --> TailContext

    (3)будетhandlerНаваDefaultChannelHandlerContextСоздание отношения сопоставления
    (4)ПерезвониhandlerДобавить событие слушателя завершения
     удалить последнийIOChannelInitialize

     Последовательность в цепочке событий в конце такова:
        HeadContext --> IdleStateHandler --> IOHandler --> TailContext

3. анализ событий pipe.fireChannelRead()

     Здесь мы выбираем типичный анализ событий чтения, другие процессы событий в основном аналогичны

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {	
	…
	if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    		unsafe.read();
	}
	…
}

     когдаbossТема мониторы прочитанные события, Call ** Unsafe.read () ** метод

@Override
public final void read() {
	…
	pipeline.fireChannelRead(byteBuf);
	…
}

     Входящие события отheadНачинать,tailЗаканчивать

@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}

     найтиpipelineследующийInboundсобытие

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

    HeadContextследующийInboundсобытиеIdleStateHandler

private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}

     положи этоchannelСобытие чтения идентифицируется какtrue, и перейти к следующемуhandler

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    super.channelRead(ctx, msg);
    System.out.println(msg.toString());
}

     выполнить здесьIOHandlerпереписанныйchannelRead()метод и вызвать родительский классchannelReadметод

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

     Продолжить вызов следующего в цепочке событийhandler

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

     позвоню сюдаTailContextизReadметод, выпускmsgтайник
    Суммировать:распространятьInboundсобытие отHeadContextУзлы распространяются вверх до тех пор, покаTailContextУзел заканчивается

4. Распространяйте исходящие события

1. Что такое исходящие события?

    (1) bindсобытие, привязать порт
    (2) closeсобытие, закройте канал
    (3) connectсобытие, для подключения клиента к удаленному компьютеру
    (4) disconnectсобытие, для клиента, закрыть удаленное соединение
    (5) deregisterсобытие для клиента при выполнении отключенияdisconnectПозвонили после операции, будетchannelотEventLoopвыйти
    (6) readСобытие, используемое для нового подключения доступа, после успешной регистрации на мультиплексоре модифицирует монитор наOP_READБит операции
    (7) writeсобытие, записать данные в канал
    (8) flushсобытие для сброса данных из очереди канала на удаленную машину

2. Разобрать событие записи

	ByteBuf resp = Unpooled.copiedBuffer("hello".getBytes());
	ctx.channel().write(resp);

     Мы вызываем его прямо в проекте, как указано выше.writeЗаписывайте данные, а не напрямую вchannel, но писать в буфер, но и вызыватьflushметод сброса данных вchannelили позвоните напрямуюwriteAndFlush.
     Здесь мы выбираем более типичныйwriteсобытие для разбораOutboundПроцесс, другие процессы событий аналогичны

@Override
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

     связанный контекстомchannelпозвонить напрямуюwriteметод, вызовchannelв соответствующей цепочке событийhandler

@Override
public ChannelFuture write(Object msg) {
    return tail.write(msg);
}

     Событие записи отtailВ направленииheadВызов, прямо противоположный событию чтения

@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
	...
	 write(msg, false, promise);
	...
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
	...
}
...
}

     После нескольких прыжков получить предыдущийOunboundцепь событийhandler

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

    IdleStateHandlerобаInboundсобытие, опять жеOutboundсобытие
     Продолжить переход к предыдущемуhandler

     предыдущий былHeadContextиметь дело с

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
@Override
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    ...
    outboundBuffer.addMessage(msg, size, promise);
...
}

     Отсюда мы видим, что данные наконец забрасываются в буфер, и с тех порnettyизpipelineМы закончили анализ модели
     Связанныйinboundсобытия иoutboundПередачу событий можно представить следующей схемой:

Если вы найдете это полезным, пожалуйста, нажмите"отличный"