Анализ исходного кода Netty — epollWait и пробуждение EPOLL
предисловие
Предыдущая статьяСмотрим вместеeventfd
иtimerfd
Главное - проложить путь к этому, и эта глава должна объяснитьEpollEventLoop
изrun
в методеselect
процесс, этоselect
Относится к тому, что мы сказали в самой ранней статьеReactor
Поток делает одну из трех вещейselect
.
Часть нашегоeventfd
В основном как пробуждениеepollWait
значит, покаtimerfd
из-за его阻塞直到超时
Эта функция в основном используется для контроля времени ожидания. Может кто спросит, не ругайте меня, я не понимаюEPOLL
,EPOLL
Поставляется с контролем тайм-аута, который можно указать в параметрахtimeout
Да зачем вам помощьtimerfd
Чтобы сделать контроль тайм-аута?
Ответь на этот вопрос заранее,timerfd
поддерживает наносекундный уровень, в то время какepoll_wait
Параметр представляет собой миллисекундный уровень. Так что здесь используйтеtimerfd
для контроля тайм-аута, в то время какepoll_wait
Параметр равен либо 0 (немедленный возврат, если события нет), либо -1 (что означает ожидание вечно).
исходный код
Первый взглядEpollEventLoop
Метод инициализации:
FileDescriptor epollFd = null;
FileDescriptor eventFd = null;
FileDescriptor timerFd = null;
try {
// 初始化epoll
this.epollFd = epollFd = Native.newEpollCreate();
// 初始化eventfd
this.eventFd = eventFd = Native.newEventFd();
try {
// 把eventfd交给epoll来监听IO
Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN);
} catch (IOException e) {
}
// 初始化timerfd
this.timerFd = timerFd = Native.newTimerFd();
try {
// 把timerfd也交给epoll来监听IO
Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
} catch (IOException e) {
}
success = true;
} finally {
}
Здесь мы видим, что epoll будет слушатьtimerfd
иeventfd
Событие IO здесь очень важно.Позже мы увидим эти два файловых дескриптора в базовом системном вызове (код языка C).
Продолжай читатьrun
Метод, или тот, который остается неизменным на протяжении тысячелетий, представляет собой бесконечный цикл, пока мыshutdown
. Первоначально рассчитывает стратегию, она представляет собой начало реализации того, что может бытьSELECT
илиBUSY_SELECT
. Правило блокировать без задачselect
(Обратите внимание, что мы говорим здесь оselect
, но нижний слойepoll_wait
,select
Указывает на ожидание события), если есть, то оно неблокирующееselect
,select
Об обработке события,select
Просто продолжайте идти вниз.
давайте посмотрим на дваselect
Способ:
case SelectStrategy.BUSY_WAIT:
strategy = epollBusyWait();
break;
case SelectStrategy.SELECT:
strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
Первый называетсяepollBusyWait
, второе называетсяepollWait
обратите внимание, что нижний слой - два разных вызова, давайте сначала посмотрим на этоepollWait
,так какepollBusyWait
Просто вращение оптимизировано низкоуровневыми инструкциями.
epollWait
Способ преследовать этоNative.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos)
:
int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
timeoutSec, timeoutNs);
// 如果返回值小于0就直接抛异常
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
return ready;
позвонить напрямуюnative
метод, который в конечном итоге будет выполнен в файле языка Cnetty_epoll_native_epollWait0
функция. Глобальный поиск можно найти, взгляните на реализацию:
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
int result, err;
if (tvSec == 0 && tvNsec == 0) {
// 这里是一个非阻塞的epoll分支,表示select一下然后立即返回
do {
result = epoll_wait(efd, ev, len, 0);
if (result >= 0) {
return result;
}
// 如果失败,会返回-1, errno将会被设置
// 这里EINTR表示我们的调用被信号打断
} while((err = errno) == EINTR);
} else {
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
ts.it_value.tv_nsec = tvNsec;
// 设置我们的超时,这里第二个参数0表示从当前开始计算,ts.sec秒后超时
if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(...);
return -1;
}
}
do {
// 一直等待,直到timerfd超时
result = epoll_wait(efd, ev, len, -1);
if (result > 0) {
if (result == 1 && ev[0].data.fd == timerFd) {
uint64_t timerFireCount;
// 我们是ET模式,所以要把里面的值读取走,这样新的数据进来的时候我们才可以得到通知。具体ET的工作方式请查看我之前的文章
result = read(timerFd, &timerFireCount, sizeof(uint64_t));
return 0;
}
return result;
}
} while((err = errno) == EINTR);
}
return -err;
Здесь снова с двумя ветвями первая ветвь относительно проста, немедленно возвращается (epoll_wait
Последний параметр равен 0). Мы в основном смотрим на способ блокировки, здесь мы используемtimerfd
как контроль тайм-аута.
я здесьEpollEventLoop
Когда он был инициализирован, мы сказали это, в дополнение к инициализацииepoll
В дополнение к дескриптору файла также инициализируются два дескриптора файла, а именноeventfd
иtimerfd
, и оба файловых дескриптора передаютсяepoll
мониторировать IO. Давайте посмотрим первымtimerfd
как контролировать время ожидания.
Установите время ожидания, мы сделали это один разresult = epoll_wait(efd, ev, len, -1);
. Здесь мы разделим его на две основные главы, чтобы рассмотреть логику этой части.
Часть исходного кода языка C
Отсюда во всем контролеepoll
ядро.
select
Событие разделено на несколько ситуаций:
-
select
В случае, всеsocket
ИО.result > 0
, и когдаresult = 1
когдаresult == 1 && ev[0].data.fd == timerFd
то, что возвращаетсяfalse
. потому что всеsocket
ИО, такev
не существует вtimerfd
. -
select
В случае, толькоtimerfd
IO, мы только что сказали,timerfd
Перейти к тайм-ауту, напишите номер времени ожиданияtimerfd
файл, поэтому этот сценарий на самом деле является тайм-аутом.result == 1 && ev[0].data.fd == timerFd
вернуться сейчасtrue
, потому что толькоtimerfd
Данные записываются в него с течением времени, и этоepoll
Если он контролируется, то возвращаемый fd, конечно,timerfd
. -
select
Среди происшествий естьsocket
ИО, также естьtimerfd
события ИО.result == 1 && ev[0].data.fd == timerFd
вернуться сейчасfalse
. так какresult
По крайней мере 2.
Первый это всеsocket
событие, даже еслиresult
1 (только одинsocket
событие),ev[0].data.fd == timerFd
также вернулсяfalse
, поэтому, согласно приведенному выше коду, непосредственноreturn
.
Во-вторых, толькоtimerfd
Событие, то есть тайм-аут, в это время входит в следующий блок кода:
uint64_t timerFireCount;
result = read(timerFd, &timerFireCount, sizeof(uint64_t));
return 0;
Поскольку у нас истекло время ожидания,read
вернется немедленно. Обратите внимание, что здесь мы должныtimerfd
Прочитайте содержание в , потому что мы находимся в режиме ET, чтобы мы могли получить следующее уведомление.Для студентов, которые забыли метод работы ET, пожалуйста, прочитайте мои предыдущие статьи о методе работы ET и LT.
Здесь мы действительно знаемtimerfd
как контролироватьepoll
истекло время ожидания. мы используемepoll
мониторtimerfd
А потом кtimerfd
Установите тайм-аут, этот тайм-аут на самом деле то, что мы хотимepoll
блокироватьselect
время блокировки. Когда придет время, даже еслиepoll
нетselect
к другомуsocket
события IO, по крайней мере, такжеselect
прибытьtimerfd
Событие IO, то есть сказать:result = epoll_wait(efd, ev, len, -1);
будет блокироваться не более чем на время тайм-аута, а затем просыпаться (и возвращатьсяtimerfd
дескриптор)!
Здесь Нетти ловко используетtimerfd
Функция тайм-аута при записи сepoll
мониторtimerfd
来时间超时控制。 зачем использоватьtimerfd
Как упоминалось выше, посколькуtimerfd
можно контролировать вплоть до наносекундного уровня, в то время какepoll_wait
Звонки можно контролировать только до миллисекундного уровня.
В третьем случае обаsocket
событие, смешанное сtimerfd
событие. Здесь то же самое, что и первое, оно вернется напрямую.
Подожди, мы не просто говорили, что нам нужно поставитьtimerfd
Событие в чтении и иди, если не прочитаешь, новых уведомлений в дальнейшем не будешь получать, то возвращайся сразу сюда, в следующий разtimerfd
Даже если это ИО,epoll
Что делать, если я не могу контролировать? Если вы можете подумать над этим вопросом, значит, вы хорошо подумали, и ответ на этот вопрос будет найден в исходном коде java-части.
часть исходного кода java
Давайте взглянемprocessReady
метод, мы успешно возвращаем результат и он не 0, мы войдем сюда, как мы сказали выше, третий случай может бытьsocket
IO события иtimerfd
Событие смешивается вместе, посмотрите, как иметь дело с:
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue()) {
// 这里我们后面要说道eventfd的作用时再解释
Native.eventFdRead(fd);
} else if (fd == timerFd.intValue()) {
// 如果socket和timerfd的IO混在一起,我们通过Reactor线程调用一次read
// 这样我们以后还可以在ET下收到timerfd的同志
Native.timerFdRead(fd);
} else {
final long ev = events.events(i);
AbstractEpollChannel ch = channels.get(fd);
if (ch != null) {
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
// 写准备就绪
unsafe.epollOutReady();
}
if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
// 读准备就绪
unsafe.epollInReady();
}
if ((ev & Native.EPOLLRDHUP) != 0) {
// 对端关闭
unsafe.epollRdHupReady();
}
} else {
try {
// 如果channel是null,我们就不再需要关心这个channel的事件了
// 这时候我们把这个channel对应的fd从epoll中移除
Native.epollCtlDel(epollFd.intValue(), fd);
} catch (IOException ignore) {
}
}
}
}
Чтобы разгадать загадку, в третьем случае выше, нам также нужноtimerfd
Данные в файле считываются, чтобы мы могли получить их в следующий раз.timerfd
ИО. Но из-за инцидентов, которые мы получили, смешанныеtimerfd
иsocket
IO событий, так что мы вReactor
в темеtimerfd
Операция чтения. Позвольте мне говорить об этом, это тоже слово: как быtimerfd
Данные, записанные во время тайм-аута, считываются!
Мы также видим это в коде:
if (fd == eventFd.intValue()) {
Native.eventFdRead(fd);
}
Это включает в себя процесс пробуждения, о котором мы поговорим в следующем разделе.
Следующим шагом будет обработкаsocket
события ввода-вывода, здесь мы видим, что мы обрабатываем события записи и чтения, в дополнение кEPOLLRDHUP
этот инцидент. Это событие будет запущено, когда одноранговый конец нормально отключится (оно такжеEPOLLIN
). этоEPOLLRDHUP
Я так понимаю, что это засчитывается как половина события чтения.В Netty об этом мы тоже поговорим позже.При обработке этого события, еслиchannel
даactive
, будет обрабатывать чтение.
Мы поговорим об обработке событий позже, давайте посмотрим здесьeventfd
эффект.
С EventFD контролировать след EPOLL
eventfd
Он также передается в момент инициализацииepoll
контролировать. Мы еще говорили об использовании, то есть одна сторона пишет, а другая сторона может долго читать. Многие студенты здесь, возможно, догадались, прочитав вышеизложенное: когда мы говорим о чтении и письме, мы немедленно реагируем, а чтение и письмо игнорируются.epoll
Подслушано, и потому что нашиeventfd
находитсяepoll
монитор, то если мы отправимeventfd
Запишите данные, иначе это может быть прерваноepoll_wait
Тем не менее, потому чтоepoll_wait
По крайней мере, возвратeventfd
!
Мы угадали этот шаг, давайте посмотрим непосредственноwakeup
метод, так как мы былиReactor
Механизм говорит, что если приходит задача, нам нужно проснуться и заблокироватьselect
, цель состоит в том, чтобы предотвратить блокировку нашей новой задачи без возможности выполнения,wakeup
:
if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
Native.eventFdWrite(eventFd.intValue(), 1L);
}
Действительноeventfd
Запишите данные, чтобы проснутьсяepoll_wait
. Тогда возникает проблема после написания, мы всегда должны помнить, что Неттиepoll
По умолчанию ET.После того, как мы записываем данные, чтобы получить их в следующий разeventfd
Уведомление IO, старые данные должны быть считаны.На данный момент мы объединяем проблемы, которые необходимо решить в предыдущем разделе:
if (fd == eventFd.intValue()) {
Native.eventFdRead(fd);
}
Излишне говорить, давайте поговоримtimerfd
Та же процедура данные читаются для получения новых данных.
Суммировать
Неттиepoll
иnio
Все полагаются на модель Reactor, конечноkqueue
То же самое, навсегдаReactor
Три задачи потока, включая логику пробуждения, выполняются толькоepoll
основан наeventfd
иtimerfd
,иnio
черезselector
изwakeup
.
В общем, роль двух базовых fds:
-
eventfd
: Чтобы иметь возможность разбудить блокировку напрямуюselect
. -
timerfd
: Для возможности периодически будить блокировкуselect
.
пока что весьepoll
изselect
Это конец.