Есть чувства, есть галантерейные товары, поиск в WeChat【Третий принц Ао Бин] Обратите внимание на этого другого программиста.
эта статьяGitHub github.com/JavaFamilyВключено, и есть полные тестовые площадки, материалы и мой цикл статей для интервью с производителями первой линии.
Предупреждение о высокой энергии, я начал писать эту статью месяц назад, поэтому содержание будет очень длинным, и, конечно, очень хардкорным.После того, как серия исходных кодов dubbo закончится, я хочу написать о серии netty, но концепции исходного кода netty очень много, поэтому я написал это сейчас.
Я полагаю, что 90% читателей не прочитают его за один раз, потому что он слишком длинный, и все мои нынешние наиболее подходящие поля ввода и редактирования mbp застряли, но я надеюсь, что все хотят смотреть netty в будущем или нуждаются это перед интервью.Достаточно знакомым друзьям оглянуться назад,тогда смысл написания мною этой статьи будет там.
Там не так много ББ, просто открывай все сразу.
Основные понятия NIO
Блокирующие (Block) и неблокирующие (Non-Block)
Блокировка и неблокировка — это способ обработки того, готовы ли данные, когда процесс обращается к данным, когда данные не готовы.
блокировать: Часто необходимо дождаться готовности данных в буфере перед обработкой других вещей, иначе они всегда будут ждать там.
неблокирующий: Когда наш процесс обращается к нашему буферу данных, если данные не готовы, он вернется напрямую, без ожидания. Если данные готовы, также вернитесь напрямую.
Блокировка ввода-вывода:
Неблокирующий ввод-вывод:
Синхронный и асинхронный
И синхронные, и асинхронные основаны на том, как приложение и операционная система обрабатывают события ввода-вывода. Например
**Синхронизация:** — это операция, в которой прикладная программа хочет напрямую участвовать в операциях чтения и записи ввода-вывода.
**Асинхронный:** все операции чтения и записи передаются операционной системе для обработки, и приложению нужно только дождаться уведомления.
При обработке событий ввода-вывода в синхронном режиме вы должны заблокировать метод, чтобы дождаться завершения наших событий ввода-вывода (блокировка событий ввода-вывода или опрос событий ввода-вывода).Для асинхронного все операции чтения и записи ввода-вывода передаются операционной системе. В это время мы можем делать другие вещи, не завершая реальную операцию ввода-вывода.Когда операция завершит ввод-вывод, оно уведомит наше приложение.
Следовательно, прямое преимущество асинхронности по сравнению с синхронизацией заключается в том, что при обработке данных ввода-вывода мы можем использовать эту часть ожидающих ресурсов для обработки других транзакций и повышения производительности самого нашего сервиса.
Синхронный ввод-вывод:
Асинхронный ввод-вывод:
Java BIO против NIO
БИО (традиционный ИО):
BIO — это синхронный и блокирующий режим ввода-вывода,Устаревшие пакеты java.io, который реализован на основе потоковой модели и предоставляет некоторые наиболее знакомые нам функции ввода-вывода, такие какАбстракция файлов, входные и выходные потокиЖдать.Интерактивный режим синхронный и блокирующий, то есть при чтении входного потока или записи выходного потока поток будет блокироваться там до тех пор, пока действия чтения и записи не будут завершены, а вызовы между ними будут в надежном линейном порядке.
NIO (неблокирующий/новый ввод-вывод)
NIO — это синхронная неблокирующая модель ввода-вывода, представленная в Java 1.4, соответствующая пакету java.nio, предоставляющая такие абстракции, как Channel, Selector и Buffer. N в NIO можно понимать как неблокирующий, а не просто новый. Он поддерживает методы манипулирования вводом-выводом, ориентированные на буфер и канал. NIO обеспечивает то же самоеSocket
иServerSocket
СоответствующийSocketChannel
иServerSocketChannel
Две разные реализации канала сокета, поддерживающие как блокирующий, так и неблокирующий режимы. Для высоконагруженных приложений с высокой степенью параллелизма (сетевых) для разработки следует использовать неблокирующий режим NIO.
Сравнение БИО и НИО
Модель ввода-вывода | BIO | NIO |
---|---|---|
коммуникация | ориентированный на поток | ориентированный на буфер |
иметь дело с | блокировка ввода-вывода | неблокирующий ввод-вывод |
курок | никто | Селектор |
Простая модель связи сервера NIO:
Простая модель связи BIO с сервером:
Особенности НИО:
- Поток может обрабатывать несколько каналов, уменьшая количество создаваемых потоков;
- Неблокирующее чтение и запись, экономия ресурсов: когда нет данных, доступных для чтения/записи, не будет блокировки и траты ресурсов потока.
Модель реактора
Однопоточная модель Reactor
Модель многопоточного реактора
Многопоточная модель master-slave Reactor
Основные понятия Netty
Введение в Нетти
Netty — это клиент-серверная среда NIO для быстрой и простой разработки сетевых приложений, таких как протокольные серверы и клиенты. Это значительно упрощает и упрощает сетевое программирование, такое как серверы сокетов TCP и UDP.
«Быстро и просто» не означает, что конечное приложение будет страдать от проблем с ремонтопригодностью или производительностью. Netty был тщательно разработан, чтобы объединить опыт реализации со многими протоколами, такими как FTP, SMTP, HTTP, а также с различными двоичными и текстовыми устаревшими протоколами. В результате Netty удалось найти способ легко и без компромиссов добиться разработки, производительности, стабильности и гибкости.
Процесс выполнения Netty
Основные компоненты Netty
Channel
Канал — это базовая конструкция Java NIO. Может рассматриваться как носитель входящих или исходящих данных. Поэтому его можно включать и выключать, подключать или отключать.
EventLoop и EventLoopGroup
EventLoop определяет основную абстракцию Netty, которая используется для обработки событий, происходящих в жизненном цикле соединения.Внутренне EventLoop будет выделен каждому каналу.
EventLoopGroup — это пул EventLoop, содержащий множество циклов EventLoop.
Netty назначает EventLoop каждому каналу для обработки всех событий, таких как запросы пользователей на подключение и обработка запросов пользователей. EventLoop сам по себе является просто драйвером потока, и только один поток связан в его жизненном цикле, что позволяет этому потоку обрабатывать все события ввода-вывода канала.
После привязки канала к EventLoop его нельзя изменить в течение всего жизненного цикла канала. EventLoop может быть привязан к нескольким каналам. То есть отношение между Channel и EventLoop равно n:1, а отношение между EventLoop и потоком равно 1:1.
Сервер Bootstrap и Bootstrap
Bootstrap и ServerBootstrap называются классами начальной загрузки, которые относятся к процессу настройки приложения и его запуска. Способ, которым Netty обрабатывает начальную загрузку, состоит в том, чтобы изолировать ваше приложение от сетевого уровня.
Bootstrap — это класс начальной загрузки клиента.Когда Bootstrap вызывает методы bind() (подключение UDP) и connect() (подключение TCP), он создает новый канал и создает только отдельный канал без родительского канала для достижения вся сетевая биржа.
ServerBootstrap — это класс начальной загрузки сервера.Когда ServerBootstarp вызывает метод bind(), он создает ServerChannel для приема соединений от клиентов, а ServerChannel управляет несколькими подканалами для связи с клиентами.
ChannelHandler и ChannelPipeline
ChannelHandler — это процессор для данных в канале.Эти процессоры могут быть кодеками, определенными самой системой или определенными пользователем. Эти процессоры будут равномерно добавлены к объекту ChannelPipeline, а затем данные в канале будут обрабатываться в порядке их добавления.
ChannelFuture
Все операции ввода-вывода в Netty являются асинхронными, то есть операция не получит возвращаемый результат немедленно, поэтому Netty определяет объект ChannelFuture как «представителя» этой асинхронной операции, представляющего саму асинхронную операцию. Если вы хотите получить возвращаемое значение асинхронной операции, вы можете использовать метод addListener() объекта асинхронной операции, чтобы добавить прослушиватель Netty, который отслеживает структуру сетевого программирования NIO, для асинхронной операции и зарегистрировать обратный вызов для это: когда результат выходит, вызывайте и немедленно выполняйте его.
Модель асинхронного программирования Netty основана на концепциях Future и обратного вызова.
Чтение исходного кода Netty
Чтение исходного кода лучше всего выполнять в случае Debug, который легче понять, поэтому я готовлю клиентский и серверный код перед анализом Netty.
Нетти — код сервера
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new SomeSocketServerHandler());
}
});
ChannelFuture future = bootstrap.bind(8888).sync();
System.out.println("服务器已启动。。。");
future.channel().closeFuture().sync();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
Серверный обработчик:
public class DemoSocketServerHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("Client Address ====== " + ctx.channel().remoteAddress());
ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
ctx.fireChannelActive();
TimeUnit.MILLISECONDS.sleep(500);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Нетти — код клиента
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new DemoSocketClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
future.channel().closeFuture().sync();
} finally {
if(eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
}
}
}
Обработчик клиента:
public class DemoSocketClientHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println(msg);
ctx.channel().writeAndFlush("from client: " + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(5000);
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
ctx.channel().writeAndFlush("from client:begin talking");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Анализ инициализации NioEventLoopGroup
Сначала проанализируйте процесс инициализации NioEventLoopGroup в соответствии с кодом сервера Server. Прежде чем анализировать NioEventLoopGroup, необходимо кратко рассказать о NioEventLoopGroup и NioEventLoop, чтобы облегчить понимание последующего исходного кода.
Разберитесь, прежде чем анализировать исходный код NioEventLoop
Система наследования NioEventLoop
Как видно из системы наследования NioEventLoop, NioEventLoop сам по себе является Исполнителем, и он также является однопоточным Исполнителем. Исполнитель должен иметьexecute(Runnable command)
метод реализации, а метод NioEventLoopexecute()
Метод реализации находит конкретный код в своем родительском классе SingleThreadEventExecutor:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
Я не буду здесь вдаваться в подробности, но я разместил этот код главным образом для того, чтобы выявитьstartThread();
Этот код, следуя этому коду, вы обнаружите, что он, наконец, вызывает Executor члена NioEventLoop для выполнения текущего члена.execute()
метод. член-корреспондентio.netty.util.concurrent.SingleThreadEventExecutor#executor
Инициализация члена исполнителя также является анонимным Исполнителем, созданным при выполнении текущего кода, то есть он создается и выполняется при выполнении текущего кода.executr()
метод.
Суммировать:
- Сам NioEventLoop является Исполнителем.
- NioEventLoop внутренне инкапсулирует этот новый член Executor потока.
- NioEventLoop имеет два
execute
метод, помимо собственногоexecute()
Метод соответствует атрибуту члена Executor, соответствующемуexecute()
метод.
Примечание:Поскольку здесь четыре Исполнителя, мы даем им новые имена, чтобы различать их:
Сам NioEventLoop Исполнитель:NioEventLoop
Участник-исполнитель NioEventLoop:Детский исполнитель
Сама NioEventLoopGroup является Исполнителем:NioEventLoopGroup
Параметр построения Executor группы NioEventLoopGroup:Всего Исполнитель
Система наследования NioEventLoopGroup
Увидев систему наследования, можно напрямую узнать, что NioEventLoopGroup также является Исполнителем и Исполнителем пула потоков, поэтому у него тоже естьexecute()
метод. Соответствующая реализация находится в родительском классе:io.netty.util.concurrent.AbstractEventExecutorGroup#execute
Еще один момент, который необходимо здесь упомянуть, заключается в том, что в конструкции NioEventLoopGroup новый Executor снова вводится в конструкцию его родительского класса MultithreadEventExecutorGroup.
Причина, по которой этот Executor упоминается здесь, заключается в том, что этот Executor соответствуетexecute()
Это член Executor в NioEventLoop.execute()
Вызывается во время выполнения. То есть соответствующий код вызова ниже.io.netty.util.internal.ThreadExecutorMap#apply(java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutor)
Если вы этого не понимаете, это не имеет значения, потому что это просто введение двух Исполнителей, соответствующих NioEventLoopGroup и NioEventLoop, и двух соответствующих Исполнителей.execute()
метод. Подробный разбор будет позже.
Суммировать:
- NioEventLoopGroup — это исполнитель потока из пула потоков.
- NioEventLoopGroup также инкапсулирует исполнителя потока.
- NioEventLoopGroup также имеет два
execute()
метод.
Анализ кода инициализации NioEventLoopGroup
Основное понимание содержимого описано выше, а конкретный анализ приведен ниже, от инициализации NioEventLoopGroup до анализа исходного кода.
На входе прямо ищем безпараметрическую конструкцию NioEventLoopGroup.
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
// 第二个参数是这个group所包含的executor
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
// 第三个参数是provider,其用于提供selector及selectable的channel,
// 这个provider是当前JVM中唯一的一个单例的provider
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// 第四个参数是一个选择策略工厂实例
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// 第三个参数是选择器工厂实例
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
После этого можно обнаружить, что базовые параметры безпараметрической конструкции инициализированы,nThreads :DEFAULT_EVENT_LOOP_THREADS//默认当前CPU逻辑核心数的两倍
,selectorProvide:SelectorProvider.provider()//当前JVM中唯一的一个单例的provider
,SelectStrategyFactory:DefaultSelectStrategyFactory.INSTANCE//默认选择策略工厂实例
,chooserFactory:DefaultEventExecutorChooserFactory.INSTANCE//选择器工厂实例
. Вот только основные параметры инициализации, ключевой методMultithreadEventExecutorGroup
метод построения. При анализе основное внимание уделяется следующему:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// 这个executor是group所包含的executor,其将来会为其所包含的每个eventLoop创建一个线程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop
if (!success) {
// 关闭之前所有已经创建好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 终止所有eventLoop上所执行的任务
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 创建一个选择器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
В соответствии с конструкцией без параметров, следуйте ей напрямую, и вы увидите, что основная часть находится в конструкции последнего родительского класса. этоio.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
.
Затем завершите инициализацию экземпляра всей NioEventLoopGroup здесь, проанализируйте ее здесь, а затем нарисуйте картинку для просмотра.
Инициализируйте параметр Executor в параметрах построения, а когда он пуст, инициализируйте его
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
во-первыхnewDefaultThreadFactory())
Создайте фабрику потоков по умолчанию, вы можете следить за ней, если вам это интересно. затем создайтеThreadPerTaskExecutor
Объект исполнителя потока. (PS: Созданный здесь Executor является объектом Executor в NioEventLoopGroup, а не самой текущей NioEventLoopGroup, которую можно вызватьВсего Исполнитель).
Затем вы можете увидеть, что здесь создается дочерний массив, и соответствующее количество массивов создается в соответствии с количеством потоков, которые необходимо создать.
children = new EventExecutor[nThreads];
Поскольку каждая NioEventLoopGroup представляет собой набор NioEventLoop, дочерний массив здесь представляет собой NioEventLoop текущей NioEventLoopGroup. Таким образом, NioEventLoop создается при инициализации NioEventLoopGroup. Давайте посмотрим на инициализацию NioEventLoop:
// 逐个创建nioEventLoop实例
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop
if (!success) {
// 闭之前所有已经创建好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 终止所有eventLoop上所执行的任务
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
Сначала посмотрите на код создания этого NioEventLoop в целом, вы можете увидеть, что во всем процессе есть признак успеха, поймайте завершение каждого процесса создания NioEventLoop, и если произойдет исключение, все созданные NioEventLoop будут быть закрытым. Ключевой код также находится в создании NioEventLoop. Итак, мы продолжаем:children[i] = newChild(executor, args);
спустись и найдиio.netty.channel.nio.NioEventLoopGroup#newChild
, поскольку в настоящее время он является созданием NioEventLoopGroup, он знает, как найти подклассnewChild
выполнить.
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
Затем принудительно верните ранее объединенный параметр args и продолжайте следить за построением NioEventLoop:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 创建一个selector的二元组
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
Здесь давайте посмотрим на это в целом и инициализируем предыдущие параметры по умолчанию в свойстве NioEventLoop. Их два:openSelector()
иsuper(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler)
. Вот посмотрите на структуру родительского класса:
Спускаясь вниз, это непосредственно инициализация SingleThreadEventLoop -> SingleThreadEventExecutor, что также можно увидеть в системе наследования NioEventLoop:
// io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 创建一个收尾队列
tailTasks = newTaskQueue(maxPendingTasks);
}
// io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 这是当前NioEventLoop所包含的executor
this.executor = ThreadExecutorMap.apply(executor, this);
// 创建一个任务队列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
Первое, что здесь создано, это SingleThreadEventExecutor, здесь следует сосредоточиться на коде:
this.executor = ThreadExecutorMap.apply(executor, this);
здесьthis
это NioEventLoop , поэтомуthis.executor
Это Executor в NioEventLoop, упомянутый ранее, здесь мы вызываем его первымДетский исполнитель(Sub: соответствует NioEventLoop, как упоминалось ранее: соответствует NioEventLoopGroup).
и тутДетский исполнительинициализируетсяexecutor
параметр, это то, что всегда вносил предыдущий конструктор NioEventLoopGroupВсего Исполнитель. Тогда давайте продолжим следить и увидим этоДетский исполнительКак происходит инициализация.
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 这里创建的executor是子executor
return new Executor() {
// 这个execute()是子executor的execute()
@Override
public void execute(final Runnable command) {
// 这里调用了NioEventLoopGroup所包含的executor的execute()
// 即调用了“总的executor”的execute()
executor.execute(apply(command, eventExecutor));
}
};
}
Если вы внимательно посмотрите на этот код, то поймете, что он создан здесь.Детский исполнительСоздание потока также является созданием потока, но основное внимание уделяется Исполнителю потока.execute()
Метод реализован и делает только одно: вызывает входящийВсего Исполнительизexecute()
метод. так вотДетский исполнительВсе, что он делает, это звонитВсего Исполнительизexecute()
. Не думайте, что здесь это сделано в обход, потому что это просто инициализация, а выполнение здесь будет позже в обход. [ручно закрывает лицо и плачет]
На самом деле здесьapply(command, eventExecutor)
, выполнить снова здесьВсего Исполнительизexecute()
Текущий выполняемый поток по-прежнему будет записан, а текущее записанное значение будет удалено, когда выполнение будет завершено.
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
Здесь, когда создается свойство Executor NioEventLoop, снова создается общая очередь задачtaskQueue = newTaskQueue(this.maxPendingTasks);
а также создает очередь завершающих задачtailTasks = newTaskQueue(maxPendingTasks);
. Эти очереди будут рассмотрены позже. Продолжайте инициализировать основной процесс NioEventLoop здесь.
Здесь мы возвращаемся и видимopenSelector()
, здесь нам нужно сначала знать SelectorTuple:
private static final class SelectorTuple {
final Selector unwrappedSelector; // NIO原生selector
final Selector selector; // 优化过的selector
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
SelectorTuple — это просто внутренний класс, который содержит два селектора для инкапсуляции селектора до и после оптимизации. иopenSelector()
Метод заключается в возврате селектора и определении необходимости оптимизации текущего селектора в соответствии с конфигурацией. См. конкретный код ниже:
Если вас интересует конкретный процесс оптимизации, вы можете пойти и увидеть его самостоятельно.Просто знайте, что если оптимизация отключена, оптимизированный селектор SelectorTuple и оптимизированный селектор являются родными селекторами Nio.
Этотio.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
Позже, после создания массива NioEventLoop, есть также создание селектора и привязка слушателя закрытия и т. д. Если вам интересно, вы можете посмотреть сами, и я не буду вводить это здесь.
Также был прочитан код процесса создания этого NioEventLoop. Я думаю, если вы просто посмотрите на это, вы, должно быть, немного запутались.Чтобы увидеть его, вам нужно следить за исходным кодом, немного отлаживать и следить за работающим кодом, чтобы понять, почему он реализован, но здесь я также нарисуйте картинку, чтобы сделать ее более понятной для всех.Узнайте о процессе создания и основных операциях NioEventLoopGroup.
Я думаю, что каждый может совместить эту картинку и описанный выше процесс анализа, лучше всего найти исходный код самостоятельно, пройтись по нему еще раз, вы должны понять создание NioEvnetLoopGroup.
Анализ конфигурации свойств ServerBootstrap и ServerBootstrap
Система наследования:
Код входа:
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.给引导类配置两大线程组,确定了线程模型
b.group(bossGroup, workerGroup)
// (非必备)打印日志
.handler(new LoggingHandler(LogLevel.INFO))
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//5.可以自定义客户端消息的业务处理逻辑
p.addLast(new HelloServerHandler());
}
});
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new SomeSocketClientHandler());
}
});
И ServerBootstrap, и Bootstrap являются классами конфигурации запуска.Единственная разница в том, что ServerBootstrap — это класс конфигурации запуска сервера, а Bootstrap — класс конфигурации запуска клиента.В основном он используется для привязки созданной нами EventLoopGroup, указываем тип Channel, а привязка Channel processing.Главное — назначить свойства ServerBootstrap и Bootstrap, поэтому он называется конфигурационным классом. может войтиgroup()
Взгляните на метод:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
То же самое и с другими методами, если интересно, можете зайти и посмотреть сами. Это просто инициализация, подготовка к последующим операциям.
Метод привязки на стороне сервера ServerBootstrap.bind() Анализ исходного кода
Здесь мы входим отсюда:
b.bind(port).sync();
Непосредственно изbind()
Далее следует метод:
// io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// 继续跟进
public ChannelFuture bind(SocketAddress localAddress) {
// 验证group与channelFactory是否为null
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// 这里是一处重点逻辑
return doBind(localAddress);
}
Это показывает подтверждение того, что группа Bootstrap и ChannelFactory успешно связаны. затем следитьdoBind()
метод:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建、初始化channel,并将其注册到selector,返回一个异步结果
final ChannelFuture regFuture = initAndRegister();
// 从异步结果中获取channel
final Channel channel = regFuture.channel();
// 若异步操作执行过程中出现了异常,则直接返回异步对象(直接结束)
if (regFuture.cause() != null) {
return regFuture;
}
// 处理异步操作完成的情况(可能是正常结束,或发生异常,或任务取消,这些情况都属于有结果的情况)
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 绑定指定的端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else { // 处理异步操作尚未有结果的情况
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 为异步操作添加监听
regFuture.addListener(new ChannelFutureListener() {
// 若异步操作具有了结果(即完成),则触发该方法的执行
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) { // 异步操作执行过程中出现了问题
promise.setFailure(cause);
} else { // 异步操作正常结果
promise.registered();
// 绑定指定的端口
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
Прежде всего, давайте сначала разберемся с общей логикой этого метода, а затем изучим конкретную работу каждого шага, нарисуем картинку и сначала поймем, что делает этот метод:
Вы можете комбинировать код на рисунке, чтобы найти весьdobind()
Затем у нас еще есть много ключевых деталей, которые нужно проработать здесь, то есть тег 1 и тег 2, отмеченные на рисунке. Чтобы облегчить отслеживание кода и вернуться позже, вот тег, а затем исходный код тега Tag подробно проанализирован ниже:
Добавить тег 0:
ChannelPromise и ChannelFuture понимают.
Тег 1:
Асинхронно создайте, инициализируйте канал и зарегистрируйте его с помощью селектора.
final ChannelFuture regFuture = initAndRegister();
Тег 2:
Привязать указанный номер порта:
doBind0(regFuture, channel, localAddress, promise);
Добавлен тег 0: ChannelPromise и ChannelFuture.
ChannelPromise — это особый ChannelFuture, который является модифицируемым ChannelFuture. Внутренне предоставляет методы для изменения текущего состояния Future. Метод модификации установки конечного состояния реализован на базе ChannelFuture.
ChannelFuture может запрашивать только результат текущей асинхронной операции и не может изменять Future текущего асинхронного результата. Что вам нужно знать здесь, так это то, что ChannelPromise может изменять состояние текущего асинхронного результата, и прослушиватель будет запущен при изменении состояния. существуетdoBind
Метод в основном используется для обработки операции, при которой асинхронное выполнение не завершилось, и когда в асинхронном результате возникает исключение, присваивать исключение ChannelPromise и возвращать его.
Тег 1: initAndRegister() инициализирует и регистрирует канал
Сначала найдите код:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建channel
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 将channel注册到selector
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
Хорошо? ! На первый взгляд код делает три вещи, но каждую из этих трех вещей нельзя сделать с помощью одной строки кода. Здесь мы анализируем один за другим.Помимо этих трех вещей, остальное — это логика обработки после исключения, поэтому основной процесс — это следующие три строки кода, и продолжайте помечать его для продолжения:
Тег 1.1 Создать канал channel = channelFactory.newChannel();
Тег 1.2 Инициализировать канал init(channel);
Тег 1.3 Зарегистрируйте канал в селекторе ChannelFuture regFuture = config().group().register(channel);
По этим трем пунктам еще необходимо разобрать их один за другим.
Тег 1.1 channelFactory.newChannel() Создать канал
Найдите соответствующий код:io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
try {
// 调用无参构造器创建channel
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
Почему я нахожу ReflectiveChannelFactory именно здесь?Необходимо упомянуть, что при анализе классов начальной конфигурации ServerBootstrap и Bootstrap, как установить канал, и следить за тем, чтобы найти код назначения для атрибута channelFactory:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
Вы можете видеть, что здесь новым является класс фабрики ReflectiveChannelFactory, а затем посмотрите на конструкцию ReflectiveChannelFactory:
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
// 将NioServerSocketChannel的无参构造器初始化到constructor
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
Вы можете видеть, что ReflectiveChannelFactory инициализирует свойство конструктора при его создании и присваивает структуру, полученную из класса входящего канала clazz, свойству конструктора фабрики отражения ReflectiveChannelFactory.
И класс канала, который мы передаем на стороне сервера,NioServerSocketChannel.class
, так что вышеconstructor.newInstance();
Соответствующая конструкция NioServerSocketChannel без параметров. Итак, мы продолжаем следить за NioServerSocketChannel:
// NIO中的provider,其用于创建selector与channel。并且是单例的
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
// DEFAULT_SELECTOR_PROVIDER 静态变量
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
следовать заnewSocket()
:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 创建NIO原生的channel => ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
Он возвращает собственный канал Java NIO и, наконец, оборачивает собственный канал NIO в NioServerSocketChannel и продолжает отслеживатьthis(newSocket(DEFAULT_SELECTOR_PROVIDER))
Найдите конкретный код параметризованной конструкции:
public NioServerSocketChannel(ServerSocketChannel channel) {
// 参数1:父channel
// 参数2:NIO原生channel
// 参数3:指定当前channel所关注的事件为 接受连接
super(null, channel, SelectionKey.OP_ACCEPT);
// 用于对channel进行配置的属性集合
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
Здесь нужно сделать две основные вещи: 1. Вызвать структуру родительского класса, 2. Настроить коллекцию атрибутов канала.
Давайте сначала поговорим о новой NioServerSocketChannelConfig() Эта операция предназначена для присвоения значения конфигурации текущего канала для сохранения набора конфигураций атрибутов текущего канала. Что ж, сказав это, давайте продолжим основную линию:super(null, channel, SelectionKey.OP_ACCEPT)
// io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
// 这里的this.ch为NIO原生channel
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// NIO,非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
Непосредственно найдите структуру родительского класса AbstractNioChannel, что также является первым шагом для вызова структуры родительского класса.super(parent);
Помните, сначала посмотрите, что еще делается помимо вызова конструктора суперкласса:
- Вызовите родительский класс для создания super(parent);
- Скопируйте созданный ранее нативный канал в свойство и сохраните this.ch = ch;
- Назначение свойства события фокуса текущего канала this.readInterestOp = readInterestOp;// SelectionKey.OP_ACCEPT принимает события
- Установите собственный канал NIO на неблокирующий ch.configureBlocking(false);
В конструкции AbstractNioChannel сделано четыре вещи, главное рассказать о том, что она сделала, когда вызвала конструкцию родительского класса и нашла код:
// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 为channel生成id,由五部分构成
id = newId();
// 生成一个底层操作对象unsafe
unsafe = newUnsafe();
// 创建与这个channel相绑定的channelPipeline
pipeline = newChannelPipeline();
}
В конструкции AbstractChannel выполняются три основных действия:
- Сгенерировать идентификатор для текущего канала
newId()
, вы можете следить, если вы заинтересованы.- Создает базовый объект операции unsafe, который используется, когда поток ввода-вывода вызывает транспорт и не может быть вызван кодом пользователя.
newUnsafe()
- Создание канала ChannelPipeline, привязанного к этому каналу, также является ключевой операцией, но я не буду здесь вдаваться в подробности, а позже я буду следить за кодом канала ChannelPipeline отдельно.
Таким образом, **Tag 1.1 newChannel() ** в **Tag 1 : initAndRegister() ** завершает создание канала. противTag 1.1 newChannel()Мы также рисуем диаграмму, чтобы систематизировать следующие идеи:
Судя по рисунку, совмещенному с анализом вышеприведенного кода, лучше всего еще раз пройтись по коду, думаю проблем с пониманием этой части не возникнет. До этого момента Канал только создавался.Создание канала тега 1.1конец, продолжениеTag 1.2 init(channel).
Тег 1.2 init(channel) Инициализировать канал
Здесь мы входим из doBind в ServerBootstrap, поэтому находим его прямо здесьio.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {
// 获取serverBootstrap中的options属性
final Map<ChannelOption<?>, Object> options = options0();
// 将options属性设置到channel
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 获取serverBootstrap中的attrs属性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
// 遍历attrs属性
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
// 将当前遍历的attr初始化到channel
channel.attr(key).set(e.getValue());
}
}
// 获取channel的pipeline
ChannelPipeline p = channel.pipeline();
// 将serverBootstrap中所有以child开头的属性写入到局部变量,
// 然后将它们初始化到childChannel中
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 将ServerBootstrapAcceptor处理器添加到pipeline
// ServerBootstrapAcceptor处理器用于接收ServerBootstrap中的属性值,
// 我们通常称其为连接处理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
Здесь еще много чего нужно сделать.Я также отметил основные операции в комментариях выше.Есть еще некоторые основные операции, которые необходимо выполнить, поэтому я отмечу их в первую очередь.TagТогда продолжайте. Давайте поговорим о назначении свойств options и attrs здесь, по сути, это означает, что наши ServerBootstrap и Bootstrap вызываютdoBind()
прошло раньшеoption()
иattr()
Установленное значение параметра, где атрибут options установлен на атрибут конфигурации канала, а атрибуты установлены непосредственно на канале.
После установки атрибута options и атрибута attrs, а затем получить конвейер текущего канала, а затем получить то, что мы имеем вdoBind()
Ранее установленное значение свойства, методы, начинающиеся с дочернегоchildOption()
иchildAttr()
Значение свойства, которое необходимо установить.
Здесь все значения, связанные с Child, записываются с использованием локальных переменныхcurrentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs
В основном используется для инициализации свойств childChannel,new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs))
Главное — создать обработчик соединения.
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 将ServerBootstrapAcceptor处理器添加到pipeline
// ServerBootstrapAcceptor处理器用于接收ServerBootstrap中的属性值,
// 我们通常称其为连接处理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
Первое, что я хочу сделать здесь, это: привязать обработчик инициализации ChannelInitializer к конвейеру текущего канала.Поскольку это абстрактный класс, он должен реализовать метод initChannel анонимно. И эти основные операции связаны с операцией инициализации канала в дочерней группе. Здесь я просто хочу рассказать о том, что в основном делает обработчик соединения ServerBootstrapAcceptor, а остальные детали будут подробно описаны в обработчике и пайплайне позже.
**Дополнительно:** здесь две группы EventLoopGroups, поскольку используется сервер ServerBootstrap. На стороне сервера родительская группа используется для получения соединения от клиента. childGroup для обработки последующих операций, а childGroup используется для конкретной обработки операции после подключения и не заботится о задаче подключения канала.На самом деле это логика обработки модели пула потоков Reactor в Netty-Server.
Здесь мы в основном говорим об этом обработчике соединений: ServerBootstrapAcceptor.
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
Конструкция ServerBootstrapAcceptor просто сохраняет параметры свойства Child, настроенные в ServerBootstrap. И здесь всегда говорится, что это обработчик соединения, потому что при отправке клиентского соединения на сервер этот обработчик будет получать соединение от клиента и обрабатывать его. Основным методом обработки является реализация в channelRead:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg为客户端发送来的数据,其为NioSocketChannel,即子channel,childChannel
final Channel child = (Channel) msg;
// 将来自于ServerBootstrap的child开头属性初始化到childChannel中(childHandler、childOptions、childAttrs)
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 将childChannel注册到selector 需要注意的是,这里的selector与父channel所注册的selector不是同一个
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
Здесь в основном делается две вещи:
- инициализировать дочерний канал
- Зарегистрируйте успешно подключенный канал от клиента к селектору.
Подканал всегда упоминается здесь, потому что здесь зарегистрированы две группы EventLoopGroups.При обработке на стороне сервера модель сетевого потока использует метод разделения «поток прослушивания на стороне сервера» и «поток ввода-вывода». так вотchannelRead
Метод заключается в том, чтобы привязать текущий подключенный поток ввода-вывода к дочернему каналу и зарегистрировать его с помощью селектора в дочерней группе, когда клиентская сторона запрашивает подключение к серверной стороне. Резьба, модель может относиться к следующему рисунку:
Ну, код **Tag 1.2 initChannel** также был проанализирован здесь.Некоторые части о конвейере, обработчике и селекторе не детализированы, потому что они будут обсуждаться отдельно позже, и здесь нет прямого расширения.
Вот еще картина: когда эти картинки объединяются вместе, текущий процесс анализа подобен делению целого на части и, наконец, объединению их воедино и превращению частей в целое.
В дополнение к методу init(channel) мы также в основном говорим об обработчике соединения ServerBootstrapAcceptor. На самом деле, это в основном комбинированное понимание потоковой модели netty-server и кода.
Эта статья настолько длинная, что превысит ограничение на количество слов для большинства блог-сайтов, поэтому я разделю ее на части.
Я Ао Бин,Чем больше вы знаете, тем больше вы не знаете, спасибо за ваши таланты:как,собиратьиКомментарий, увидимся в следующий раз!
Статья постоянно обновляется, вы можете искать в WeChat "Третий принц Ао Бин"Прочтите это в первый раз, ответьте [материал] Подготовленные мной материалы интервью и шаблоны резюме крупных заводов первой линии, эта статьяGitHub github.com/JavaFamilyОн был включен, и есть полные тестовые сайты для интервью с крупными заводами.Добро пожаловать в Star.