Урожай этой статьи
Прочитайте эту статью, и вы узнаете
- Как netty принимает новые запросы
- Как netty назначает потоки реактора новым запросам
- Как netty добавляет ChannelHandler к каждому новому соединению
На самом деле, это намного больше, чем это
фон преамбулы
Прежде чем читать эту статью, лучше иметь некоторые знания о предварительном заказе, включая netty внить реактора,так же какПроцесс запуска сервераПозвольте мне дать вам краткий обзор
1. Резьба реактора в сетке
Основная вещь в netty — это два типа потоков-реакторов, которые можно рассматривать как два типа движков в netty, управляющих работой всего фреймворка netty.
Один тип потока-реактора — это группа потоков Boost, которая специально используется для приема новых соединений, а затем инкапсулирует их в объект канала и передает в группу рабочих потоков; другой тип потока-реактор — это группа рабочих потоков, которая специально используется для обработки соединения для чтения и записи.
Будь то поток Boos или рабочий поток, то, что он делает, делится на следующие три шага.
- Опрос событий ввода-вывода, зарегистрированных в селекторе
- Обработка событий ввода-вывода
- Выполнение асинхронных задач
Для потока boos первый опрашиваемый шаг — это в основном события принятия, указывающие на наличие нового соединения, в то время как опрашиваемые рабочие потоки в основном представляют собой события чтения/записи, указывающие на события чтения и записи в сети.
2. Запуск сервера
Процесс запуска сервера запускается в пользовательском потоке, первый разЗапустить поток ускорения при добавлении асинхронной задачиПри запуске netty инкапсулирует процесс обработки новых соединений в канал, а соответствующий пайплайн обрабатывает вновь установленные соединения по порядку (подробный разбор пайплайнов я начну позже)
Разобравшись с двумя предысториями, мы начинаем переходить к сути.
установление нового соединения
Проще говоря, установление нового соединения можно разделить на три шага.
- Обнаружено новое соединение
- Зарегистрируйте новое подключение к группе рабочих потоков.
- Зарегистрируйтесь для чтения событий для новых подключений
Давайте отвезем вас в Паодинцзе Ню и шаг за шагом проанализируем весь процесс.
Обнаружено новое соединение
Мы уже знаем, что, когда сервер привязан и запущен, канал сервера был зарегистрирован в потоке boost реактора, и реактор продолжает обнаруживать новые события, пока не обнаружит, что происходит событие accept.
NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
Приведенный выше кодВторая в трилогии реакторных нитей, указывая на то, что поток реактора boos был опрошен дляSelectionKey.OP_ACCEPT
Событие, указывающее на установление нового соединения, вызоветunsafe
выполнить реальную операцию
оunsafe
, я не буду вдаваться в подробности в этой статье, ниже приводится объяснение автора netty о небезопасных
Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport.
Нужно только понимать общую концепцию, то есть все каналы будут иметь привязку с unsafe внизу, а реальную работу каждого типа канала реализует unsafe
А из предыдущей статьиПроцесс запуска сервера, мы уже знаем, что небезопасность канала, соответствующего серверу,NioMessageUnsafe
, то входим в егоread
метод, введите второй шаг обработки нового соединения
Зарегистрируйтесь в потоке реактора
NioMessageUnsafe.java
private final List<Object> readBuf = new ArrayList<Object>();
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
}
Я опустил код некритичной части, как видите, как только я придумал, я использовал утверждение, чтобы определить, что метод чтения должен быть вызван потоком-реактором, а затем я получил конвейер и соответствующий канал.RecvByteBufAllocator.Handle
(не объяснять сначала)
Далее звонитеdoReadMessages
метод непрерывно читает сообщение, используяreadBuf
В качестве контейнера здесь вы можете догадаться, что соединения считываются одно за другим, а затем вызыватьpipeline.fireChannelRead()
, пропускать каждое новое соединение через слой серверного канала крещения
После очистки контейнера включитеpipeline.fireChannelReadComplete()
, весь процесс четкий и понятный, без следов примесей, давайте подробно рассмотрим эти два способа
1.doReadMessages(List) 2.pipeline.fireChannelRead(NioSocketChannel)
1.doReadMessages()
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
Наконец-то мы наблюдаем за границей, где netty вызывает лежащий в основе nio jdk.javaChannel().accept();
, потому что поток-реактор в netty сканирует событие accept на первом шаге, так что вотaccept
Метод немедленно возвращается, возвращая канал, созданный базовым nio jdk.
нетти будет jdkSocketChannel
упаковано по индивидуальному заказуNioSocketChannel
, добавленный в список, чтобы внешний слой мог пройти по списку и выполнить последующую обработку
отПредыдущая статья, мы уже знаем, что ряд основных компонентов в netty будет создан во время создания сервера, в том числе конвейер, небезопасный и т. д. Итак, будет ли создаваться этот ряд компонентов при принятии нового соединения?
Имея в виду этот вопрос, мы продолжаем
NioSocketChannel.java
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
Наш ключевой анализsuper(parent, socket)
, анализ, связанный с конфигурацией, мы помещаем в следующую статью
NioSocketChannel
Родительский классAbstractNioByteChannel
AbstractNioByteChannel.java
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
Здесь мы видим знакомую тень в jdk nio——SelectionKey.OP_READ
, вообще в нативном программировании jdk nio такое событие тоже будет зарегистрировано, указывая на то, что вы заинтересованы в чтении канала
Мы продолжаем идти вверх и прослеживаем доAbstractNioByteChannel
родительский классAbstractNioChannel
, вот, я думаю прочиталПредыдущая статьяУ вас должно быть представление об этой части кода
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
Когда вы создаете канал на стороне сервера, вы в конечном итоге войдете в этот метод.super(parent)
, вAbstractChannel
Создайте серию компонентов, привязанных к каналу, как показано ниже.
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
А вотreadInterestOp
Указывает, что событие, о котором заботится канал,SelectionKey.OP_READ
, событие будет зарегистрировано в селекторе позже, и тогда канал будет установлен в неблокирующий режим
На этом этапе я наконец-то могу показать вам структурную диаграмму наиболее часто используемых каналов в netty.
Отношения наследования здесь упрощены, в настоящее время нам нужно только понять.
первый
- канал наследует Comparable означает, что канал является сопоставимым объектом
- Канал наследует AttributeMap, указывая, что канал является объектом, который может связывать атрибуты.В пользовательском коде мы часто используем из него метод channel.attr(...).
- ChannelOutboundInvoker — это новая абстракция, добавленная в версии 4.1.x, представляющая операции, которые может выполнять канал.
- DefaultAttributeMap используется для метода по умолчанию абстракции AttributeMap, и последний канал наследует прямое использование
- AbstractChannel используется для реализации большинства методов канала, из которых нам наиболее знакомы базовые компоненты, создающие канал в его конструкторе.
- AbstractNioChannel выполняет некоторые операции, связанные с nio, на основе AbstractChannel и сохраняет базовый jdk.
SelectableChannel
, и установите неблокирующий канал в конструкторе - Наконец, есть два канала, NioServerSocketChannel и NioSocketChannel, которые соответствуют процессу принятия новых соединений сервером и процессу чтения и записи новых соединений.
Прочитав это, вы в основном поняли более половины общей структуры канала.
Хорошо, давайте выйдем из стека, продолжим предыдущий анализ исходного кода и создадимNioSocketChannel
После этого, поместив его в контейнер List, приступайте к следующему шагу
2.pipeline.fireChannelRead(NioSocketChannel)
AbstractNioMessageChannel.java
pipeline.fireChannelRead(NioSocketChannel);
Прежде чем я официально представлю конвейер, позвольте мне кратко представить компонент конвейера.
В различных типах каналов netty будет конвейер, что буквально означает конвейер.Мы можем понимать его как процесс конвейера.Процесс конвейера имеет начальную точку, конец и различные уровни конвейера в середине.Элемент , начать обработку в начале конвейера, пройти обработку каждого уровня конвейера и, наконец, завершить конвейер
Соответственно netty начало конвейераHeadContxt
, конец конвейераTailConext
,HeadContxt
вызыватьUnsafe
выполнять определенные операции,TailConext
Он используется для создания необработанных исключений в конвейере и предупреждения пользователя о необработанных сообщениях.Мы подробно обсудим конкретный анализ конвейера позже.
пройти черезпредыдущая статья, мы уже знаем, что в пайплайне, где сервер обрабатывает новые соединения, автоматически добавился обработчик пайплайнаServerBootstrapAcceptor
, а в конструктор был передан ряд параметров, заданных в пользовательском коде.ServerBootstrapAcceptor
ServerBootstrapAcceptor.java
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
ServerBootstrapAcceptor(
EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
фронтpipeline.fireChannelRead(NioSocketChannel);
Наконец, через цепочку вызовов head->unsafe->ServerBootstrapAcceptor вызовите здесьServerBootstrapAcceptor
изchannelRead
метод
а такжеchannelRead
Как только вы подойдете, принудительно преобразуйте сообщение здесь вChannel
, почему здесь возможно принуждение? Читатели могут подумать о
Затем получите канал, который мы использовали раньше.NioSocketChannel
Соответствующий конвейер, пользовательский код вchildHandler
, добавлено в конвейер, здесьchildHandler
Это отражается в пользовательском коде как
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
На самом деле он соответствуетChannelInitializer
, иди туда,NioSocketChannel
Процессор, соответствующий конвейеру, это head->ChannelInitializer->tail, имейте в виду, что он будет упомянут позже!
Далее установитеNioSocketChannel
Соответствующий атрибут и параметр, а затем введитеchildGroup.register(child)
, здесь childGroup — это то, что мы добавили в стартовом кодеNioEventLoopGroup
, вы можете обратиться кэта статья
мы вступаем вNioEventLoopGroup
изregister
метод, проксируемый его родительскому классуMultithreadEventLoopGroup
MultithreadEventLoopGroup.java
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
Вот еще один метод next(), давайте продолжим
MultithreadEventLoopGroup.java
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
вернуться к своему родительскому классу
MultithreadEventExecutorGroup.java
@Override
public EventExecutor next() {
return chooser.next();
}
Класс, соответствующий селектору, здесьEventExecutorChooser
, буквально означает селектор исполнителя событий, роль его размещения в нашем контексте заключается в выборе потока реактора из группы потоков рабочего реактора.
public interface EventExecutorChooserFactory {
/**
* Returns a new {@link EventExecutorChooser}.
*/
EventExecutorChooser newChooser(EventExecutor[] executors);
/**
* Chooses the next {@link EventExecutor} to use.
*/
@UnstableApi
interface EventExecutorChooser {
/**
* Returns the new {@link EventExecutor} to use.
*/
EventExecutor next();
}
}
Я не планирую расширять конкретное создание селектора, полагаю, что навыки чтения исходников в предыдущих статьях могут помочь вам узнать начало и конец селектора, здесь скажу прямо (но советую проанализировать его самостоятельно, это очень просто), Chooser Есть две реализации
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
По умолчанию селектор проходитDefaultEventExecutorChooserFactory
Создано. При создании селектора потока реактора будет оцениваться количество потоков реактора. Если это степень 2, он будет создан.PowerOfTowEventExecutorChooser
, иначе создатьGenericEventExecutorChooser
При выборе потока реактора два типа селекторов выбирают поток реактора с помощью метода циклического перебора. Единственное отличие состоит в том, чтоPowerOfTowEventExecutorChooser
выполняется операцией AND, иGenericEventExecutorChooser
Именно через операцию остатка, и эффективность операции выше, чем у операции остатка.Видно, что netty просто без ума от оптимизации эффективности!
После выбора потока реактора, т.е.NioEventLoop
После этого возвращаемся туда, где мы зарегистрировались
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
прокси дляNioEventLoop
родительского классаregister
метод
SingleThreadEventLoop.java
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
На самом деле этот процесс уже запущен как сервер, вы можете обратиться к подробным шагамПодробное объяснение запуска сервераВ этой статье мы сразу переходим к ключевой ссылке
AbstractNioChannel.java
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
}
Как и процесс запуска сервера, первый вызовdoRegister();
Выполните реальный процесс регистрации следующим образом.
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
привязать канал кselector
Up, селектор используется потоком-реактором, и последующий опрос событий канала, а также обработка событий, асинхронное выполнение задачи — все это отвечает за этот поток-реактор.
После привязки потока реактора вызовитеpipeline.invokeHandlerAddedIfNeeded()
Как мы уже говорили, покаNioSocketChannel
В конвейере есть три процессора, head->ChannelInitializer->tail, которые в конечном итоге будут вызываться дляChannelInitializer
изhandlerAdded
метод
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
handlerAdded
вызов методаinitChannel
метод, вызовremove(ctx);
удалить себя
AbstractNioChannel.java
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
А вотinitChannel
Метод еще один фокус? Вернемся к пользовательскому методу, такому как следующий пользовательский код
Код пользователя
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
О, это оказалось в нашем собственном коде! Я не буду объяснять, что делает этот код, знаете ли~
После завершения,NioSocketChannel
Обработчики связанного конвейера включают head->LoggingHandler->EchoServerHandler->tail
Зарегистрируйтесь для чтения событий
Далее у нас остаются эти коды, которые не были проанализированы
AbstractNioChannel.java
private void register0(ChannelPromise promise) {
// ..
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
}
pipeline.fireChannelRegistered();
, на самом деле ничего толкового в этом нет.В конце концов, это не что иное, как повторный вызов каждого процессора в бизнес-конвейере.ChannelHandlerAdded
метод обработки обратного вызова
isActive()
Возвращает true, если соединение было установлено, поэтому введите блок метода, введитеpipeline.fireChannelActive();
, анализ здесь иПолный анализ запуска сервера анализа исходного кода nettyТак же, как и в анализе, здесь я опускаю подробные шаги и перехожу сразу к ключевой ссылке
AbstractNioChannel.java
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
ты должен помнитьregister0()
метод, код события, зарегистрированный в селекторе, равен 0, иreadInterestOp
Соответствующий код событияSelectionKey.OP_READ
, обратитесь к предыдущему творениюNioSocketChannel
Немного поразмыслив, умный поймет, что это на самом делеSelectionKey.OP_READ
Событие регистрируется в селекторе, что указывает на то, что канал уже может начать обработку события чтения.
Суммировать
Пока вам была показана обработка новых подключений в netty, подведем итоги
- Поток реактора boos опрашивает новое соединение для входа
- Создано путем инкапсуляции базового канала jdk.
NioSocketChannel
И ряд основных компонентов netty - Пропустите соединение через селектор и выберите поток рабочего реактора, чтобы связать его.
- Зарегистрируйте события чтения, чтобы начать чтение и запись для новых подключений.
В следующей статье мы углубимся в основные компоненты netty.pipeline
, Следите за обновлениями
Если вы хотите систематически изучать Нетти, мой буклет«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Это может вам помочь.Если вы хотите систематически изучать принципы Netty, то вы не должны пропустить мою серию видеороликов по анализу исходного кода Netty:coding.IMO OC.com/class/230, Также…