Анализ исходного кода Netty — epollWait и пробуждение EPOLL

Java задняя часть исходный код Netty
Анализ исходного кода Netty — epollWait и пробуждение EPOLL

Анализ исходного кода Netty — epollWait и пробуждение EPOLL

предисловие

Предыдущая статьяСмотрим вместеeventfdиtimerfdГлавное - проложить путь к этому, и эта глава должна объяснитьEpollEventLoopизrunв методеselectпроцесс, этоselectОтносится к тому, что мы сказали в самой ранней статьеReactorПоток делает одну из трех вещейselect.

Часть нашегоeventfdВ основном как пробуждениеepollWaitзначит, покаtimerfdиз-за его阻塞直到超时Эта функция в основном используется для контроля времени ожидания. Может кто спросит, не ругайте меня, я не понимаюEPOLL,EPOLLПоставляется с контролем тайм-аута, который можно указать в параметрахtimeoutДа зачем вам помощьtimerfdЧтобы сделать контроль тайм-аута?

Ответь на этот вопрос заранее,timerfdподдерживает наносекундный уровень, в то время какepoll_waitПараметр представляет собой миллисекундный уровень. Так что здесь используйтеtimerfdдля контроля тайм-аута, в то время какepoll_waitПараметр равен либо 0 (немедленный возврат, если события нет), либо -1 (что означает ожидание вечно).

исходный код

Первый взглядEpollEventLoopМетод инициализации:

FileDescriptor epollFd = null;
FileDescriptor eventFd = null;
FileDescriptor timerFd = null;
try {
    // 初始化epoll
    this.epollFd = epollFd = Native.newEpollCreate();
    // 初始化eventfd
    this.eventFd = eventFd = Native.newEventFd();
    try {
        // 把eventfd交给epoll来监听IO
        Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN);
    } catch (IOException e) {
    }
    // 初始化timerfd
    this.timerFd = timerFd = Native.newTimerFd();
    try {
        // 把timerfd也交给epoll来监听IO
        Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
    } catch (IOException e) {
    }
    success = true;
} finally {
}

Здесь мы видим, что epoll будет слушатьtimerfdиeventfdСобытие IO здесь очень важно.Позже мы увидим эти два файловых дескриптора в базовом системном вызове (код языка C).

Продолжай читатьrunМетод, или тот, который остается неизменным на протяжении тысячелетий, представляет собой бесконечный цикл, пока мыshutdown. Первоначально рассчитывает стратегию, она представляет собой начало реализации того, что может бытьSELECTилиBUSY_SELECT. Правило блокировать без задачselect(Обратите внимание, что мы говорим здесь оselect, но нижний слойepoll_wait,selectУказывает на ожидание события), если есть, то оно неблокирующееselect,selectОб обработке события,selectПросто продолжайте идти вниз.

давайте посмотрим на дваselectСпособ:

case SelectStrategy.BUSY_WAIT:
    strategy = epollBusyWait();
    break;

case SelectStrategy.SELECT:
    strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);

Первый называетсяepollBusyWait, второе называетсяepollWaitобратите внимание, что нижний слой - два разных вызова, давайте сначала посмотрим на этоepollWait,так какepollBusyWaitПросто вращение оптимизировано низкоуровневыми инструкциями.

epollWaitСпособ преследовать этоNative.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos):

int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
                       timeoutSec, timeoutNs);
// 如果返回值小于0就直接抛异常
if (ready < 0) {
    throw newIOException("epoll_wait", ready);
}
return ready;

позвонить напрямуюnativeметод, который в конечном итоге будет выполнен в файле языка Cnetty_epoll_native_epollWait0функция. Глобальный поиск можно найти, взгляните на реализацию:

struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
int result, err;

if (tvSec == 0 && tvNsec == 0) {
    // 这里是一个非阻塞的epoll分支,表示select一下然后立即返回
    do {
        result = epoll_wait(efd, ev, len, 0);
        if (result >= 0) {
            return result;
        }
    // 如果失败,会返回-1, errno将会被设置
    // 这里EINTR表示我们的调用被信号打断
    } while((err = errno) == EINTR);
} else {
    if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
        struct itimerspec ts;
        memset(&ts.it_interval, 0, sizeof(struct timespec));
        ts.it_value.tv_sec = tvSec;
        ts.it_value.tv_nsec = tvNsec;
        // 设置我们的超时,这里第二个参数0表示从当前开始计算,ts.sec秒后超时
        if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
            netty_unix_errors_throwChannelExceptionErrorNo(...);
            return -1;
        }
    }
    do {
        // 一直等待,直到timerfd超时
        result = epoll_wait(efd, ev, len, -1);
        if (result > 0) {
            if (result == 1 && ev[0].data.fd == timerFd) {
                uint64_t timerFireCount;
                // 我们是ET模式,所以要把里面的值读取走,这样新的数据进来的时候我们才可以得到通知。具体ET的工作方式请查看我之前的文章
                result = read(timerFd, &timerFireCount, sizeof(uint64_t));
                return 0;
            }
            return result;
        }
    } while((err = errno) == EINTR);
}
return -err;

Здесь снова с двумя ветвями первая ветвь относительно проста, немедленно возвращается (epoll_waitПоследний параметр равен 0). Мы в основном смотрим на способ блокировки, здесь мы используемtimerfdкак контроль тайм-аута.

я здесьEpollEventLoopКогда он был инициализирован, мы сказали это, в дополнение к инициализацииepollВ дополнение к дескриптору файла также инициализируются два дескриптора файла, а именноeventfdиtimerfd, и оба файловых дескриптора передаютсяepollмониторировать IO. Давайте посмотрим первымtimerfdкак контролировать время ожидания.

Установите время ожидания, мы сделали это один разresult = epoll_wait(efd, ev, len, -1);. Здесь мы разделим его на две основные главы, чтобы рассмотреть логику этой части.

Часть исходного кода языка C

Отсюда во всем контролеepollядро.

selectСобытие разделено на несколько ситуаций:

  1. selectВ случае, всеsocketИО.result > 0, и когдаresult = 1когдаresult == 1 && ev[0].data.fd == timerFdто, что возвращаетсяfalse. потому что всеsocketИО, такevне существует вtimerfd.
  2. selectВ случае, толькоtimerfdIO, мы только что сказали,timerfdПерейти к тайм-ауту, напишите номер времени ожиданияtimerfdфайл, поэтому этот сценарий на самом деле является тайм-аутом.result == 1 && ev[0].data.fd == timerFdвернуться сейчасtrue, потому что толькоtimerfdДанные записываются в него с течением времени, и этоepollЕсли он контролируется, то возвращаемый fd, конечно,timerfd.
  3. selectСреди происшествий естьsocketИО, также естьtimerfdсобытия ИО.result == 1 && ev[0].data.fd == timerFdвернуться сейчасfalse. так какresultПо крайней мере 2.

Первый это всеsocketсобытие, даже еслиresult1 (только одинsocketсобытие),ev[0].data.fd == timerFdтакже вернулсяfalse, поэтому, согласно приведенному выше коду, непосредственноreturn.

Во-вторых, толькоtimerfdСобытие, то есть тайм-аут, в это время входит в следующий блок кода:

uint64_t timerFireCount;
result = read(timerFd, &timerFireCount, sizeof(uint64_t));
return 0;

Поскольку у нас истекло время ожидания,readвернется немедленно. Обратите внимание, что здесь мы должныtimerfdПрочитайте содержание в , потому что мы находимся в режиме ET, чтобы мы могли получить следующее уведомление.Для студентов, которые забыли метод работы ET, пожалуйста, прочитайте мои предыдущие статьи о методе работы ET и LT.

Здесь мы действительно знаемtimerfdкак контролироватьepollистекло время ожидания. мы используемepollмониторtimerfdА потом кtimerfdУстановите тайм-аут, этот тайм-аут на самом деле то, что мы хотимepollблокироватьselectвремя блокировки. Когда придет время, даже еслиepollнетselectк другомуsocketсобытия IO, по крайней мере, такжеselectприбытьtimerfdСобытие IO, то есть сказать:result = epoll_wait(efd, ev, len, -1);будет блокироваться не более чем на время тайм-аута, а затем просыпаться (и возвращатьсяtimerfdдескриптор)!

Здесь Нетти ловко используетtimerfdФункция тайм-аута при записи сepollмониторtimerfd来时间超时控制。 зачем использоватьtimerfdКак упоминалось выше, посколькуtimerfdможно контролировать вплоть до наносекундного уровня, в то время какepoll_waitЗвонки можно контролировать только до миллисекундного уровня.

В третьем случае обаsocketсобытие, смешанное сtimerfdсобытие. Здесь то же самое, что и первое, оно вернется напрямую.

Подожди, мы не просто говорили, что нам нужно поставитьtimerfdСобытие в чтении и иди, если не прочитаешь, новых уведомлений в дальнейшем не будешь получать, то возвращайся сразу сюда, в следующий разtimerfdДаже если это ИО,epollЧто делать, если я не могу контролировать? Если вы можете подумать над этим вопросом, значит, вы хорошо подумали, и ответ на этот вопрос будет найден в исходном коде java-части.

часть исходного кода java

Давайте взглянемprocessReadyметод, мы успешно возвращаем результат и он не 0, мы войдем сюда, как мы сказали выше, третий случай может бытьsocketIO события иtimerfdСобытие смешивается вместе, посмотрите, как иметь дело с:

for (int i = 0; i < ready; i ++) {
    final int fd = events.fd(i);
    if (fd == eventFd.intValue()) {
        // 这里我们后面要说道eventfd的作用时再解释
        Native.eventFdRead(fd);
    } else if (fd == timerFd.intValue()) {
        // 如果socket和timerfd的IO混在一起,我们通过Reactor线程调用一次read
        // 这样我们以后还可以在ET下收到timerfd的同志
        Native.timerFdRead(fd);
    } else {
        final long ev = events.events(i);
        AbstractEpollChannel ch = channels.get(fd);
        if (ch != null) {
            AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
            if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
                // 写准备就绪
                unsafe.epollOutReady();
            }
            if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
                // 读准备就绪
                unsafe.epollInReady();
            }
            if ((ev & Native.EPOLLRDHUP) != 0) {
                // 对端关闭
                unsafe.epollRdHupReady();
            }
        } else {
            try {
                // 如果channel是null,我们就不再需要关心这个channel的事件了
                // 这时候我们把这个channel对应的fd从epoll中移除
                Native.epollCtlDel(epollFd.intValue(), fd);
            } catch (IOException ignore) {
            }
        }
    }
}

Чтобы разгадать загадку, в третьем случае выше, нам также нужноtimerfdДанные в файле считываются, чтобы мы могли получить их в следующий раз.timerfdИО. Но из-за инцидентов, которые мы получили, смешанныеtimerfdиsocketIO событий, так что мы вReactorв темеtimerfdОперация чтения. Позвольте мне говорить об этом, это тоже слово: как быtimerfdДанные, записанные во время тайм-аута, считываются!

Мы также видим это в коде:

if (fd == eventFd.intValue()) {
    Native.eventFdRead(fd);
}

Это включает в себя процесс пробуждения, о котором мы поговорим в следующем разделе.

Следующим шагом будет обработкаsocketсобытия ввода-вывода, здесь мы видим, что мы обрабатываем события записи и чтения, в дополнение кEPOLLRDHUPэтот инцидент. Это событие будет запущено, когда одноранговый конец нормально отключится (оно такжеEPOLLIN). этоEPOLLRDHUPЯ так понимаю, что это засчитывается как половина события чтения.В Netty об этом мы тоже поговорим позже.При обработке этого события, еслиchannelдаactive, будет обрабатывать чтение.

Мы поговорим об обработке событий позже, давайте посмотрим здесьeventfdэффект.

С EventFD контролировать след EPOLL

eventfdОн также передается в момент инициализацииepollконтролировать. Мы еще говорили об использовании, то есть одна сторона пишет, а другая сторона может долго читать. Многие студенты здесь, возможно, догадались, прочитав вышеизложенное: когда мы говорим о чтении и письме, мы немедленно реагируем, а чтение и письмо игнорируются.epollПодслушано, и потому что нашиeventfdнаходитсяepollмонитор, то если мы отправимeventfdЗапишите данные, иначе это может быть прерваноepoll_waitТем не менее, потому чтоepoll_waitПо крайней мере, возвратeventfd!

Мы угадали этот шаг, давайте посмотрим непосредственноwakeupметод, так как мы былиReactorМеханизм говорит, что если приходит задача, нам нужно проснуться и заблокироватьselect, цель состоит в том, чтобы предотвратить блокировку нашей новой задачи без возможности выполнения,wakeup:

if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
    Native.eventFdWrite(eventFd.intValue(), 1L);
}

ДействительноeventfdЗапишите данные, чтобы проснутьсяepoll_wait. Тогда возникает проблема после написания, мы всегда должны помнить, что НеттиepollПо умолчанию ET.После того, как мы записываем данные, чтобы получить их в следующий разeventfdУведомление IO, старые данные должны быть считаны.На данный момент мы объединяем проблемы, которые необходимо решить в предыдущем разделе:

if (fd == eventFd.intValue()) {
    Native.eventFdRead(fd);
}

Излишне говорить, давайте поговоримtimerfdТа же процедура данные читаются для получения новых данных.

Суммировать

НеттиepollиnioВсе полагаются на модель Reactor, конечноkqueueТо же самое, навсегдаReactorТри задачи потока, включая логику пробуждения, выполняются толькоepollоснован наeventfdиtimerfdnioчерезselectorизwakeup.

В общем, роль двух базовых fds:

  • eventfd: Чтобы иметь возможность разбудить блокировку напрямуюselect.
  • timerfd: Для возможности периодически будить блокировкуselect.

пока что весьepollизselectЭто конец.