Анализ исходного кода NioEventLoopGroup
1. Были сделаны некоторые комментарии при чтении исходного кода, а также были проведены некоторые тесты для анализа процесса выполнения в исходном коде из-за ограниченного места в блоге. Чтобы облегчить IDE просмотр, трассировку и отладку кода, поэтому вgithubИсходный код, подробные комментарии и тестовые примеры netty представлены на сайте. Добро пожаловать в звезду и форк!
2. Из-за ограниченного личного уровня анализ и понимание исходного кода могут быть предвзятыми или неполными, пожалуйста, укажите в области комментариев, спасибо!
С сегодняшнего дня я готов войти в netty.Основная идея – посмотреть на некоторые наиболее важные реализации в netty4, то есть на вещи, которые часто могут появляться перед нами. В основном: пулы потоков, каналы, конвейеры, кодеки и часто используемые классы инструментов.
Тогда просмотр исходного кода теперь не должен быть таким подробным, как предыдущий jdk, в основном потому, что после просмотра класса я обнаружил, что netty слишком силен для инкапсуляции кода, а базовая функция может быть инкапсулирована семью или восемью классами для реализации, много абстрактных классов, но в этих абстрактных классах много функций. Следовательно, это в основном зависит от этого процесса, и лучший код или новые идеи, написанные в нем, будут внимательно рассмотрены. Для конкретных подполей каждый метод не может быть таким подробным.
Итак, война исходного кода netty официально началась!
1. Основная идея
Давайте сначала поговорим о заключении, то есть поговорим об идее разобраться в исходном коде этого класса, в основном потому, что эти классы слишком сложны, а функция полностью реализована только в нескольких классах.
Когда мы создаем новый рабочий/боссовый поток, мы обычно используем непосредственно используемый метод построения без параметров, но метод построения без параметров создает пул потоков, в два раза превышающий размер ядра нашего ЦП. Затем вам нужно поместить так много новых потоков в пул потоков. Структура данных, используемая в пуле потоков, здесь хранится в массиве. Каждому потоку необходимо установить очередь задач. Очевидно, что очередь задач использует блокирующую очередь.LinkedBlockQueue
, а потом вспомнить, есть ли более важный параметр в пуле потоков в jdk — это фабрика потоков, да! Здесь также есть эта вещь, она требует от нас передачи вручную, но если она не будет передана, будет использоваться фабрика потоков по умолчанию, которая имеетnewThread
метод, реализация этого метода в основном такая же, как реализация в jdk, которая заключается в создании потока без демона с уровнем 5. Да, это все, что мы делаем, когда создаем пул потоков!
Ну, давайте конкретнее, то, что мы создаем каждый раз,NioEventLoopGroup
Но он унаследовал n классов для реализации пула потоков, то есть предком пула потоков являетсяScheduledExecutorService
Это интерфейс пула потоков в jdk, а самой важной структурой данных в нем является дочерний массив, который используется для установки потоков.
Затем конкретный поток также инкапсулируется, что мы часто видимNioEventLoop
. В этом классе есть две важные структуры: taskQueue и thread. Очевидно, это очень похоже на пул потоков в jdk.
2. Анализ пула потоков NioEventLoopGroup
Сначала создайте пул потоков, количество входящих потоков равно 0, он всегда вызываетthis()
вернуться к последнемуsuper(nThreads,threadFactory,selectorProvider)
то есть использоватьMultithreadEventLoopGroup
Конструктор на этом шаге определяет, что количество потоков должно быть установлено равным удвоенному количеству ядер ЦП, когда количество переданных потоков равно 0. Затем снова звонюMultithreadEventExecutorGroup
Метод построения, вот реальный старт инициализации пула потоков.
Сначала настройте фабрику пула потоков, затем инициализируйте Chooser , затем создайте n потоков и поместите их в дочерний массив и, наконец, установите событие прослушивателя для прерывания потока.
/**
* 这个方法流程:
* 1、设置了默认的线程工厂
* 2、初始化 chooser
* 3、创建nTreads个NioEventLoop对象保存在children数组中
* 4、添加中断的监听事件
* @param nThreads
* @param threadFactory
* @param args
*/
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 默认使用线程工厂是 DefaultThreadFactory
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
// 二的平方的实现是看 n&-n==n
//根据线程个数是否为2的幂次方,采用不同策略初始化chooser
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
//产生nTreads个NioEventLoop对象保存在children数组中
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 没成功,把已有的线程优雅关闭
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 没有完全关闭的线程让它一直等待
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 对每一个 children 添加中断线程时候的监听事件,就是将 terminatedChildren 自增
// 判断是否到达线程总数,是则更新 terminationFuture
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
}
Существует ветвь if, используемая для инициализации селектора, этот селектор используется для выбора того, какой поток использовать для выполнения каких операций. Вот метод определения того, является ли число степенью двойки.isPowerOfTwo()
Реализация интереснее, и она выложена.
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
Далее наши взоры обратятся кnewChild(threadFactory, args)
, потому что этот метод является абстрактным в этом классе, вNioEventLoopGroup
достигнуто. На самом деле, это очень просто и грубо, когда я это вижу, и я прямо новый.NioEventLoop
, следующим шагом будет анализ класса-оболочки этого потока.
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
// 这里才是重点 也就是真正的线程 被放在自己的 children 数组中
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
3. Анализ потока NioEventLoop
Как видно выше,newChild
Метод заключается в том, чтобы создать новыйNioEventLoop
. Поэтому необходимо хорошенько взглянуть на этот класс-оболочку потока.
Конструктор этого класса должен вызывать родительский классSingleThreadEventLoop
структуры, а затем продолжают увеличиватьсяSingleThreadEventExecutor
Построение, в этом классе реально реализовано построение нити. Там есть две вещи:
-
Создается новый поток, и новому потоку также назначается задача, содержание которой состоит в том, чтобы вызвать метод запуска в этом классе.
NioEventLoop
реализовано в. -
Установите очередь задач на
LinkedBlockQueue
/**
* 构造方法主要完成了:
* 1、new 一个新的线程执行一个 run 方法
* 2、用 LinkedBlockQueue 初始化 taskQueue
* @param parent
* @param threadFactory
* @param addTaskWakesUp
*/
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
// new 了一个新的线程
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
// 调用一个 run 方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 让线程关闭
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn("An event executor terminated with non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
// 使用 LinkedBlockQueue 初始化 taskQueue
taskQueue = newTaskQueue();
}
Затем посмотрите на метод запуска, который он хочет выполнить вNioEventLoop
было реализовано.
/**
*'wakenUp.compareAndSet(false, true)' 一般都会在 select.wakeUp() 之前执行
* 因为这样可以减少 select.wakeUp() 调用的次数,select.wakeUp() 调用是一个代价
* 很高的操作
* 注意:如果说我们过早的把 wakenUp 设置为 true,可能导致线程的竞争问题,过早设置的情形如下:
1) Selector is waken up between 'wakenUp.set(false)' and
'selector.select(...)'. (BAD)
2) Selector is waken up between 'selector.select(...)' and
'if (wakenUp.get()) { ... }'. (OK)
在第一种情况中 wakenUp 被设置为 true 则 select 会立刻被唤醒直到 wakenUp 再次被设置为 false
但是wakenUp.compareAndSet(false, true)会失败,并且导致所有希望唤醒他的线程都会失败导致
select 进行不必要的休眠
为了解决这个问题我们是在 wakenUp 为 true 的时候再次对 select 进行唤醒。
*/
@Override
protected void run() {
for (;;) {
// 获取之前的线程状态,并让 select 阻塞
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
// 有任务在线程创建之后直接开始 select
if (hasTasks()) {
selectNow(); //直接调用了 select 的 selectNow 然后再次唤醒同下面的代码
// 没有任务
} else {
// 自旋进行等待可进行 select 操作
select(oldWakenUp);
// 再次唤醒,解决并发问题
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
// 都是处理 selected 的通道的数据,并执行所有的任务,只是在 runAllTasks 传的参数不同
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
Далее следует проанализировать метод запуска, то есть ряд операций, выполняемых потоком после его создания. В основном он делает три вещи:
- сделать выбор
- обрабатывать selectedKeys
- разбудить все задачи в очереди
Описанные выше операции всегда выполняются в цикле, поэтомуNioEventLoop
Роль этого потока только одна: выполнять обработку задачи. Когда поток новый, мы назначаем ему задачу потока, которая заключается в непрерывном выполнении вышеуказанных операций.
Приведенный выше процесс говорит о том, что существует проблема безопасности потоков, то есть, если мы установим wakeUp в true слишком рано, наш выбор проснется, а другие потоки потерпят неудачу, когда они не знают этого состояния и захотят установить wakeUp. заставляя select спать. Основное ощущение, что эта штука не видна между потоками, если использовать volatile, то эта проблема может быть решена, но wakenUp является окончательным и не может быть модифицирован ключевым словом volatile. Таким образом, решение, которое принимает автор, состоит в том, чтобы снова вручную проснуться, чтобы предотвратить ненужный сон, вызванный тем, что другие потоки одновременно устанавливают значение wakenUp.
Затем я хочу поговорить о методе select.Вызов этого метода в основном связан с тем, что в очереди нет задач, поэтому в select пока нет необходимости.Что делает этот метод, так это крутится, чтобы выбрать, и ждет некоторое время, прежде чем перейти к выбору, если нет задачи.
/**
* 这个方法主要干的事情:
* 1、如果不需要等待就直接 select
* 2、需要等待则等一个超时时间再去 select
* 这个过程是不停进行的也就是死循环直达有任务可进行 select 时 select 完毕退出循环
* @param oldWakenUp
* @throws IOException
*/
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 不用等待进行一次 select 操作
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 等一个超时再去选择
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
selectCnt);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
}
// Harmless exception - log anyway
}
}
а потомprocessSelectedKeys();
а такжеrunAllTasks();
Из этих двух методов первый подобен шагам, когда мы писали Nio, проходя обработку selectedKeys, а затемrunAllTasks()
Метод run, выполняющий все задачи.
protected boolean runAllTasks() {
fetchFromDelayedQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}
// 这个循环就是用来循环任务队列中的所有任务
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
task = pollTask(); // 循环条件
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
return true;
}
}
}
4. Резюме
Ну, на самом деле, анализ пула нитей здесь почти такой же. Мы не рассмотрели внимательно многие детали. Мы знаем, что процесс и структура внутри мононити в основном одинаковы.
существуетNioEventLoopGroup
упаковано вNioEventLoop
задачи треда. Специально упаковывается в дочерний массив, а затем использует фабрику newThread для создания потоков, а затем назначает задачи потокам.Задачей является выполнение операции выбора.