Одна либувь - этого мало

Node.js внешний интерфейс
Одна либувь - этого мало

Источник изображения:libuv

Автор этой статьи:Сяо Сиюань

Изучение Node.js все равно не обойдет Libuv. В этой статье мы решили изучить внутреннюю часть Libuv в соответствии с ее реализацией для Linux.

Почему Linux

As an asynchronous event-driven JavaScript runtime, Node.js is designed to build scalable network applications

About Node.js

Node.js — мощный инструмент, с помощью которого студенты, изучающие передовые технологии, изучают бизнес на стороне сервера, предназначен для создания масштабируемого сетевого приложения. Текущая серверная среда в основном Linux.Что касается другой основной серверной среды, Unix, она имеет большое сходство с Linux в API.Поэтому выбор Linux в качестве отправной точки может удвоить урожай и удвоить счастье.

Либув и линукс

Ниже представлена ​​схема архитектуры официального сайта libuv:

С точки зрения одной только платформы Linux основную работу libuv можно просто разделить на две части:

  • Вокруг epoll обрабатывать операции ввода-вывода, поддерживаемые epoll
  • Пул потоков, который обрабатывает операции ввода-вывода, не поддерживаемые epoll.

Введение в эполл

Чтобы вернуться к источнику, мы начнем с epoll

Проще говоря, epoll — это системный вызов, предоставляемый ядром Linux, и наше приложение может его передать:

  • Скажите системе, чтобы она помогала нам отслеживать несколько файловых дескрипторов одновременно.
  • Когда рабочее состояние ввода-вывода одного или нескольких из этих файловых дескрипторов изменится, наше приложение получит уведомление о событии от системы.

цикл событий

Мы демонстрируем основные шаги при использовании epoll с небольшим фрагментом псевдокода:

// 创建 epoll 实例
int epfd = epoll_create(MAX_EVENTS);
// 向 epoll 实例中添加需要监听的文件描述符,这里是 `listen_sock`
epoll_ctl_add(epfd, listen_sock, EPOLLIN | EPOLLOUT | EPOLLET);

while(1) {
  // 等待来自 epoll 的通知,通知会在其中的文件描述符状态改变时
  // 由系统通知应用。通知的形式如下:
  //
  // epoll_wait 调用不会立即返回,系统会在其中的文件描述符状态发生
  // 变化时返回
  //
  // epoll_wait 调用返回后:
  // nfds 表示发生变化的文件描述符数量
  // events 会保存当前的事件,它的数量就是 nfds
  int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);

  // 遍历 events,对事件作出符合应用预期的响应
  for (int i = 0; i < nfds; i++) {
    // consume events[i]
  }
}

Полный пример см.epoll-echo-server

Вышеприведенный код содержит комментарии, которые можно примерно обобщить следующим образом:

Таким образом, epoll в нижней части libuv также имеет понятие «цикл событий» Видно, что цикл событий не уникален для libuv.

Когда дело доходит до epoll, мы должны упомянуть два его режима запуска: запуск по уровню и запуск по краю. Это следует упомянуть, потому что они связаны с механизмом запуска событий epoll, а получение имени немного неясно.

Горизонтальный триггер

Оба термина происходят из области электроники, и мы начнем с их первоначального значения.

Первый — горизонтальный триггер:

Electrical Concepts

На приведенном выше рисунке показана временная диаграмма, показывающая изменение напряжения, где VH представляет пиковое значение напряжения, а VL представляет минимальное значение напряжения телефона. Смысл горизонтального срабатывания заключается в том, что пока напряжение находится на пиковом значении, система с течением времени активирует соответствующую цепь (триггер).

крайний триггер

Electrical Concepts

Вышеприведенный рисунок по-прежнему представляет собой временную диаграмму, показывающую изменение напряжения, но условием активации схемы (срабатывания) является напряжениеИзменять, то есть напряжение изменяется от состояния ВН -> ВН, ВН -> ВН, на рисунке черезсторонаЧтобы представить это изменение, то есть восходящий фронт и спадающий фронт, поэтому он называется Edge-triggered, то есть запуск по фронту.

Мы можем примерно понять их формы и различия и продолжить понимать в сочетании с производительностью в следующем эполле.

В электронном опросе

Вернувшись в epoll, триггер по горизонтали и триггер по фронту произошли от исходного значения, конечно, они по-прежнему имеют значения, аналогичные значениям в области электроники.

Давайте разберемся на примере.Например, у нас есть fd (файловый дескриптор), который представляет только что установленное клиентское соединение, а затем клиент отправляет нам 5 байт содержимого.

Если это горизонтальный триггер:

  • Наше приложение будет разбужено системой, потому что состояние fd теперь доступно для чтения.
  • Читаем 1 байт из системного буфера и делаем какие-то бизнес-операции
  • Войдите в новый цикл событий и дождитесь следующего пробуждения системы.
  • Система продолжает пробуждать наше приложение, потому что в буфере осталось непрочитанным 4 байта.

Если срабатывает по фронту:

  • Наше приложение будет разбужено системой, потому что состояние fd теперь доступно для чтения.
  • Читаем 1 байт из системного буфера и делаем какие-то бизнес-операции
  • Войдите в новый цикл событий и дождитесь следующего пробуждения системы.
  • В это время система не будет пробуждать наше приложение до тех пор, пока клиент в следующий раз не отправит какой-либо контент, например 2 байта (поскольку состояние fd не изменится до тех пор, пока клиент в следующий раз не отправит запрос, поэтому при срабатывании по краю система не будет будить приложение)
  • Система будит наше приложение, а в буфере 6 байт = (4+2) байта

Нам сложно связать буквальное значение level-triggered и edge-triggered с вышеописанным поведением, но мы уже заранее знаем, что они означают в области электроники

Горизонтальное срабатывание, поскольку оно уже находится в состоянии чтения, оно будет продолжать срабатывание до тех пор, пока мы не закончим чтение буфера, и в системном буфере не будет отправлено новое содержимое клиентом; краевое срабатывание соответствуетизменение состояния, каждый раз, когда новый клиент отправляет контент, будет устанавливаться состояние чтения, поэтому он будет запускаться только в это время.

Горизонтальный запуск — это режим запуска epoll по умолчанию, и горизонтальный запуск также используется в libuv. Поняв разницу между запуском по горизонтали и запуском по фронту, мы можем догадаться о соображениях, лежащих в основе использования libuv горизонтального запуска вместо запуска по фронту:

Если он срабатывает по фронту, в объективной способности epoll нам не требуется читать содержимое буфера за один раз (можно подождать, пока клиент в следующий раз отправит содержимое, чтобы продолжить чтение). Но в реальном бизнесе клиент, вероятно, в это время ждет нашего ответа (что можно понять в связке с HTTP-протоколом), а мы все еще ждем следующей записи от клиента, так что это попадет в логику тупик. В результате чтение содержимого буфера за один раз стало почти обязательным методом в режиме с запуском по фронту, что неизбежно приводит к увеличению времени ожидания других обратных вызовов и делает распределение процессорного времени между обратными вызовами кажущимся неравномерным. слишком долго неравномерно

ограничение

epoll не может воздействовать на все операции ввода-вывода, такие как операции чтения и записи файлов, поэтому вы не можете пользоваться удобством epoll.

Таким образом, работу libuv можно грубо охарактеризовать следующим образом:

  • Абстрактные системные вызовы, подобные epoll, в различных операционных системах (например, kqueue в Unix и IOCP в Windows) к унифицированному API (внутреннему API).
  • Для операций ввода-вывода, которые могут использовать системные вызовы, предпочтительным является унифицированный API.
  • Для неподдерживаемых или недостаточно поддерживаемых операций ввода-вывода используйте пулы потоков для имитации асинхронных API.
  • Наконец, инкапсулируйте вышеуказанные детали внутри и предоставьте унифицированный API внешнему миру.

вернуться к либуву

Возвращаясь к libuv, мы будем использовать цикл событий в качестве основного контекста в сочетании с упомянутым выше epoll и пулом потоков, который будет представлен ниже, и продолжим изучение деталей реализации libuv в Linux.

event-loop

Мы рассмотрим основные концепции событийного цикла вместе с исходным кодом.

Следующая картинка, также взятая с официального сайта libuv, описывает внутреннюю работу цикла событий:

Цитата изlibuv - Design overview

Взгляд только на блок-схему может быть слишком абстрактным, ниже приведена внутренняя реализация соответствующей libuv.полное содержание:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r) uv__update_time(loop);

  // 是循环,没错了
  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    // 处理 timer 队列
    uv__run_timers(loop);
    // 处理 pending 队列
    ran_pending = uv__run_pending(loop);
    // 处理 idle 队列
    uv__run_idle(loop);
    // 处理 prepare 队列
    uv__run_prepare(loop);

    // 执行 io_poll
    uv__io_poll(loop, timeout);
    uv__metrics_update_idle_time(loop);

    // 执行 check 队列
    uv__run_check(loop);
    // 执行 closing 队列
    uv__run_closing_handles(loop);

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) break;
  }

  return r;
}

Причина, по которой различные формы обратных вызовов (например,setTimeout) будут отличаться по приоритету, потому что они используют разные очереди, а разные очереди выполняются в разном порядке на каждой итерации цикла обработки событий

Обработка и запрос

Согласно описанию на официальном сайте, это абстракции операций, выполняемых в цикле событий: первые представляют собой операции, которые должны существовать в течение длительного времени, а вторые представляют собой краткосрочные операции. Это может быть непросто понять, просто взглянув на текстовое описание, давайте посмотрим, как они используются по-разному.

Для долгоживущих операций, представленных Handle, их API имеет вид, аналогичный следующему:

// IO 操作
int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, uv_os_sock_t socket);
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb);
int uv_poll_stop(uv_poll_t* poll);

// timer
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle);
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat);
int uv_timer_stop(uv_timer_t* handle);

Есть примерно три шага (не все):初始化 -> 开始 -> 停止. Это легко понять, потому что это долговременная операция, она будет продолжать обрабатываться при запуске, поэтому вам нужно организовать «стоп» API

Для коротких операций, представленных запросом, таких как разрешение доменного имени:

int uv_getaddrinfo(uv_loop_t* loop, uv_getaddrinfo_t* req, uv_getaddrinfo_cb getaddrinfo_cb, /* ... */);

Интерактивная форма операции разрешения доменного имени заключается в том, что мы отправляем адрес для разрешения, и метод возвращает результат разрешения (это немного похоже на запрос HTTP 1.0), поэтому причина присвоения такого имени операция по "запрос-запрос" меняется.Должна быть картинка

Однако Handle и Request не являются взаимоисключающими понятиями, и внутренняя реализация Handle также может использовать Request. Из-за некоторых макроскопических долговременных операций он может рассматриваться как запрос в каждом временном интервале, например, когда мы обрабатываем запрос, он может рассматриваться как дескриптор, и в текущем запросе мы, вероятно, делаем некоторые Операции чтения и записи, эти операции можно рассматривать как запрос

timer

Мы используем API, открытый таймером, как подсказку для анализа его внутренней реализации:

int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle);
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat);
int uv_timer_stop(uv_timer_t* handle);

uv_timer_initНичего особенного, просто инициализируйтеhandleсостояние и добавить его вloop->handle_queueсередина

uv_timer_startЭто делается внутри:

int uv_timer_start(uv_timer_t* handle,
                   uv_timer_cb cb,
                   uint64_t timeout,
                   uint64_t repeat) {
  uint64_t clamped_timeout;

  // loop->time 表示 loop 当前的时间。loop 每次迭代开始时,会用当次时间更新该值
  // clamped_timeout 就是该 timer 未来超时的时间点,这里直接计算好,这样未来就不需要
  // 计算了,直接从 timers 中取符合条件的即可
  if (clamped_timeout < timeout)
    clamped_timeout = (uint64_t) -1;

  handle->timer_cb = cb;
  handle->timeout = clamped_timeout;
  handle->repeat = repeat;

  // 除了预先计算好的 clamped_timeout 以外,未来当 clamped_timeout 相同时,使用这里的
  // 自增 start_id 作为比较条件来觉得 handle 的执行先后顺序
  handle->start_id = handle->loop->timer_counter++;

  // 将 handle 插入到 timer_heap 中,这里的 heap 是 binary min heap,所以根节点就是
  // clamped_timeout 值(或者 start_id)最小的 handle
  heap_insert(timer_heap(handle->loop),
              (struct heap_node*) &handle->heap_node,
              timer_less_than);
  // 设置 handle 的开始状态
  uv__handle_start(handle);

  return 0;
}

uv_timer_stopЭто делается внутри:

int uv_timer_stop(uv_timer_t* handle) {
  if (!uv__is_active(handle))
    return 0;

  // 将 handle 移出 timer_heap,和 heap_insert 操作一样,除了移出之外
  // 还会维护 timer_heap 以保障其始终是 binary min heap
  heap_remove(timer_heap(handle->loop),
              (struct heap_node*) &handle->heap_node,
              timer_less_than);
  // 设置 handle 的状态为停止
  uv__handle_stop(handle);

  return 0;
}

До сих пор нам были известны так называемыеstartа такжеstopНа самом деле, это можно приблизительно резюмировать так: атрибутироватьloop->timer_heapвставляет или удаляет дескриптор из , и этот атрибут использует структуру данных, называемую двоичной минимальной кучей

Затем просматриваем вышеизложенноеuv_run:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  // ...
  while (r != 0 && loop->stop_flag == 0) {
    // ...
    uv__update_time(loop);
    uv__run_timers(loop);
    // ...
  }
  // ...
}

uv__update_timeМы видели это раньше, эффект заключается в установке свойства в начале цикла с текущим временем.loop->time

Нам просто нужен последний взглядuv__run_timersсодержание, весь процесс можно соединить последовательно:

void uv__run_timers(uv_loop_t* loop) {
  struct heap_node* heap_node;
  uv_timer_t* handle;

  for (;;) {
    // 取根节点,该值保证始终是所有待执行的 handle
    // 中,最先超时的那一个
    heap_node = heap_min(timer_heap(loop));
    if (heap_node == NULL)
      break;

    handle = container_of(heap_node, uv_timer_t, heap_node);
    if (handle->timeout > loop->time)
      break;

    // 停止、移出 handle、顺便维护 timer_heap
    uv_timer_stop(handle);
    // 如果是需要 repeat 的 handle,则重新加入到 timer_heap 中
    // 会在下一次事件循环中、由本方法继续执行
    uv_timer_again(handle);
    // 执行超时 handle 其对应的回调
    handle->timer_cb(handle);
  }
}

Выше приведена общая реализация таймера в Libuv.

min heap

Как мы увидим позже, дескрипторы, отличные от таймера, хранятся в структуре данных с именем очередь, а структура данных, в которой хранится дескриптор таймера, представляет собой минимальную кучу. Итак, давайте посмотрим на значение этой разницы

На самом деле так называемая минимальная куча (для более подробного ознакомления вы можете обратиться кBinary Tree):

  • complete binary tree
  • Корневой узел — это самый маленький узел в дереве

Сначала взгляните на бинарное дерево (определение бинарного дерева таково):

  • Все узлы имеют не более двух дочерних элементов

Глядя дальше, определение полного бинарного дерева:

  • Каждый узел во всех слоях, кроме последнего слояесть двадочерний узел
  • Логика расположения последнего слоя заключается в том, чтобы расположить их по порядку слева направо (старайтесь максимально заполнить левую часть)

Вот несколько примеров:

complete binary tree 的例子:

               18
            /      \
         15         30
        /  \        /  \
      40    50    100   40
     /  \   /
    8   7  9

下面不是 complete binary tree,因为最后一层没有优先放满左边

               18
             /    \
          40       30
                   /  \
                 100   40

min heap 的例子,根节点是最小值、父节点始终小于其子节点:

               18
             /    \
           40       30
         /  \
      100   40

Операции, необходимые для дескриптора таймера в libuv:

  • Добавляйте и удаляйте ручки таймера
  • получить это быстроclamped_timeoutминимальная ручка таймера

И min heap учитывает вышеуказанные требования:

  • Относительный массив с более высокой эффективностью вставки и удаления
  • По сравнению со связанными списками эффективнее поддерживать крайние значения (здесь минимальное значение)

Реализация кучи в файлеheap-inl.h, я добавил некоторые примечания, заинтересованные студенты могут продолжить узнавать

pending

Выше мы узнали об обработке таймера в первом порядке в каждой итерации цикла событий, далее рассмотрим обработку отложенной очереди во втором порядке:

static int uv__run_pending(uv_loop_t* loop) {
  QUEUE* q;
  QUEUE pq;
  uv__io_t* w;

  if (QUEUE_EMPTY(&loop->pending_queue))
    return 0;

  QUEUE_MOVE(&loop->pending_queue, &pq);

  // 不断从队列中弹出元素进行操作
  while (!QUEUE_EMPTY(&pq)) {
    q = QUEUE_HEAD(&pq);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);
    w = QUEUE_DATA(q, uv__io_t, pending_queue);
    w->cb(loop, w, POLLOUT);
  }

  return 1;
}

Из исходников это только из очередиloop->pending_queueЭлемент постоянно извлекается, а затем выполняется, а извлеченный элементuv__io_tСвойства структуры, судя по названию, примерно должны быть операциями, связанными с вводом-выводом.

Кроме того, даloop->pending_queueТолько функции выполняют операции вставкиuv__io_feed, вызываемая точка этой функции в основном предназначена для выполнения некоторых отделочных работ, связанных с вводом-выводом.

queue

Как и в случае с минимальной кучей выше, очередь также является основной используемой структурой данных, поэтому, когда мы впервые увидим ее, давайте, между прочим, представим ее.

Реализация min heap относительно глубже, поэтому предоставляются комментарии на основе исходного кода.heap-inl.hПусть заинтересованные читатели получат более глубокое понимание, в то время как очередь относительно проста, и повсюду в исходном коде есть макросы, которые управляют очередью. Зная, что эти макросы делают, вы будете чувствовать себя более непринужденно при чтении исходного кода.

Далее давайте взглянем на очередь и некоторые часто используемые макросы, работающие с ней, в первую очередь, начальное состояние:

Очередь спроектирована как кольцевая структура в libuv, поэтому начальное состояниеnextа такжеprevоба указывают на себя

Далее рассмотрим форму вставки нового элемента в очередь:

Приведенный выше рисунок разделен на две части: верхняя часть — это существующая очередь, h — ее текущий заголовок, а q — элемент, который нужно вставить. Нижняя часть - результат после вставки, красный цвет на рисунке представляетprevпуть, показанный фиолетовым цветомnext, по пути мы можем обнаружить, что они всегда представляют собой кольцевую структуру

показано вышеQUEUE_INSERT_TAILКак следует из названия, он вставляется в конец очереди, и, поскольку это кольцевая структура, нам нужно изменить ссылочное отношение между началом, хвостом и вставляемыми элементами.

Взгляните еще раз на форму, которая удаляет элемент:

Проще удалить элемент, т.е.prevа такжеnextОн может быть подключен, чтобы после подключения элемент пропускался, чтобы элемент выглядел удаленным (не может быть доступен в пути)

Продолжаем видеть операцию соединения двух очередей:

Выглядит сложно, но на самом деле нужно всего лишь развязать два кольца, а затем соединить их встык, чтобы образовалось новое кольцо. Здесь, используя метод отображения потока сознания, используйте1а также2Соответствующая связь между кодом и действием подключения отмечена

Наконец, посмотрите на операцию, которая разбивает очередь на две части:

На приведенном выше рисунке также используется метод отображения потока сознания с использованием1а также2Отмечена соответствующая связь между кодом и действием подключения;hОчередь в начале, вqотрезать,hа такжеqПредыдущие элементы соединяются, образуя новую очередь;nВ качестве начала другой очереди подключитеqи конец очереди перед отключением для формирования другой очереди

Некоторые репрезентативные операции с очередью показаны выше, и заинтересованные учащиеся могут продолжить просмотр.queue.hПриходите узнать

бездействовать, проверить, подготовить

Вам может быть интересно, почему они представлены не в том порядке, в котором они представлены в цикле событий, а три из них размещены вместе

Если искать в исходном кодеuv__run_idleилиuv__run_checkбыло бы еще более странным, потому что мы можем найти только их объявления, даже не их определения

На самом деле они всеloop-watcher.cгенерируются макросами, потому что их операции одинаковы - берут дескрипторы из соответствующих очередей и выполняют их

Следует отметить, что всех не должно смущать название idle.Это не очередь, которая будет выполняться при простое событийного цикла, а будет выполняться на каждой итерации цикла по времени.Смысла idle нет вообще.

Впрочем, говорить о том, что смысла простоя вообще нет, кажется не совсем уместным, например, внутренняя реализация очередей idle и prepare есть не что иное, как очередь, которая выполняется последовательно:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  // ...
  while (r != 0 && loop->stop_flag == 0) {
    // ...
    uv__run_idle(loop);
    uv__run_prepare(loop);
    uv__io_poll(loop, timeout);
    // ...
  }
  // ...
}

Итак, теперь есть дескриптор, мы хотим его вuv__io_pollПеред выполнением он добавляется в очередь ожидания или подготовки?

Я думаю, что подготовиться к следующемуuv__io_poll"Prepare" означает, что если это дескриптор, подготовленный для io_poll, то его можно добавить в очередь на подготовку, а остальные добавить в простои. Я думаю, что тот же параметр также применяется к проверке, которая запускается после io_poll, позволяя пользователям выполнять некоторую работу по проверке результатов выполнения ввода-вывода, делая очередь задач более семантической.

io poll

Для io_poll мы по-прежнему начинаем анализ с цикла событий

Начните с цикла событий

Вот фрагмент уже представленного выше цикла обработки событий:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  // ...
  while (r != 0 && loop->stop_flag == 0) {
    // ...
    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop);

    uv__io_poll(loop, timeout);
    // ...
  }
  // ...
}

Приведенный выше код вычисляетtimeoutдля звонкаuv__io_poll(loop, timeout)

действительно эполл

uv__io_pollопределено вlinux-core.c, хотя это функция почти из 300 строк, включая комментарии, вы, должно быть, обнаружили, что основной логикой является использование epoll, продемонстрированное в начале:

void uv__io_poll(uv_loop_t* loop, int timeout) {
  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    // ...
    // `loop->backend_fd` 是使用 `epoll_create` 创建的 epoll 实例
    epoll_ctl(loop->backend_fd, op, w->fd, &e)
    // ...
  }

  // ...
  for (;;) {
  // ...
    if (/* ... */) {
      // ...
    } else {
      // ...
      // `epoll_wait` 和 `epoll_pwait` 只有细微的差别,所以这里只考虑前者
      nfds = epoll_wait(loop->backend_fd,
                        events,
                        ARRAY_SIZE(events),
                        timeout);
      // ...
    }
  }
  // ...

  for (i = 0; i < nfds; i++) {
    // ...
    w = loop->watchers[fd];
    // ...
    w->cb(loop, w, pe->events);
  }
}

timeout

epoll_waitизtimeoutСмысл параметров следующий:

  • если-1значит ждать, пока не произойдет событие
  • если0возвращается немедленно, включая событие, сгенерированное при вызове
  • Если это остальные целые числа, тоmillisecondsЕдиница сводится к определенному кванту системного времени в будущем

Объединив вышеизложенное, рассмотримuv_backend_timeoutкак это рассчитываетсяtimeoutиз:

int uv_backend_timeout(const uv_loop_t* loop) {
  // 时间循环被外部停止了,所以让 `uv__io_poll` 理解返回
  // 以便尽快结束事件循环
  if (loop->stop_flag != 0)
    return 0;

  // 没有待处理的 handle 和 request,则也不需要等待了,同样让 `uv__io_poll`
  // 尽快返回
  if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
    return 0;

  // idle 队列不为空,也要求 `uv__io_poll` 尽快返回,这样尽快进入下一个时间循环
  // 否则会导致 idle 产生过高的延迟
  if (!QUEUE_EMPTY(&loop->idle_handles))
    return 0;

  // 和上一步目的一样,不过这里是换成了 pending 队列
  if (!QUEUE_EMPTY(&loop->pending_queue))
    return 0;

  // 和上一步目的一样,不过这里换成,待关闭的 handles,都是为了避免目标队列产生
  // 过高的延迟
  if (loop->closing_handles)
    return 0;

  return uv__next_timeout(loop);
}

int uv__next_timeout(const uv_loop_t* loop) {
  const struct heap_node* heap_node;
  const uv_timer_t* handle;
  uint64_t diff;

  heap_node = heap_min(timer_heap(loop));
  // 如果没有 timer 待处理,则可以放心的 block 住,等待事件到达
  if (heap_node == NULL)
    return -1; /* block indefinitely */

  handle = container_of(heap_node, uv_timer_t, heap_node);
  // 有 timer,且 timer 已经到了要被执行的时间内,则需让 `uv__io_poll`
  // 尽快返回,以在下一个事件循环迭代内处理超时的 timer
  if (handle->timeout <= loop->time)
    return 0;

  // 没有 timer 超时,用最小超时间减去、当前的循环时间的差值,作为超时时间
  // 因为在为了这个差值时间内是没有 timer 超时的,所以可以放心 block 以等待
  // epoll 事件
  diff = handle->timeout - loop->time;
  if (diff > INT_MAX)
    diff = INT_MAX;

  return (int) diff;
}

надuv__next_timeoutРеализация в основном делится на три части:

  • Только если нет отложенного таймера, он будет-1, в сочетании с парой в начале этого разделаepoll_waitизtimeoutПояснение параметров,-1сделает последующиеuv__io_pollВойдите в состояние блокировки и полностью дождитесь прихода события
  • Когда есть таймер, а есть таймер с таймаутомhandle->timeout <= loop->time, затем вернуться0,такuv__io_pollНе блокирует цикл событий, цель состоит в том, чтобы быстро войти в следующий цикл событий для выполнения таймера тайм-аута
  • Когда таймер есть, но нет таймаута, рассчитывается минимальное время таймаутаdiffприйти какuv__io_pollвремя блокировки

Я не знаю, нашли ли вы его.Основная направляющая идея расчета тайм-аута состоит в том, чтобы сделать процессорное время как можно более равномерно распределенным в нескольких итерациях цикла событий, выполнении нескольких разных очередей задач и избежать определенные типы задач, которые генерируют высокую задержку

маленький каштан

После понимания того, как выполняется очередь io_poll, у нас есть общее понимание io_poll через небольшой каштан эхо-сервера:

uv_loop_t *loop;

void echo_write(uv_write_t *req, int status) {
  // ...
  // 一些无所谓有,但有所谓无的收尾工作
}

void echo_read(uv_stream_t *client, ssize_t nread, uv_buf_t buf) {
  // ...
  // 创建一个写入请求(上文已经介绍过 Request 和 Handle 的区别),
  // 将读取的客户端内容写回给客户端,写入完成后进入回调 `echo_write`
  uv_write_t *write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
  uv_write(write_req, client, &buf, 1, echo_write);
}

void on_new_connection(uv_stream_t *server, int status) {
  // ...
  // 创建 client 实例并关联到事件循环
  uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
  uv_tcp_init(loop, client);
  // 与建立客户端连接,并读取客户端输入,读取完成后进入 `echo_read` 回调
  if (uv_accept(server, (uv_stream_t*) client) == 0) {
    uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
  }
  // ...
}

int main() {
  // 创建事件循环
  loop = uv_default_loop();

  // 创建 server 实例并关联事件循环
  uv_tcp_t server;
  uv_tcp_init(loop, &server);
  // ...
  // 绑定 server 到某个端口,并接受请求
  uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 7000));
  // 新的客户端请求到达后,会进去到 `on_new_connection` 回调
  uv_listen((uv_stream_t*) &server, 128, on_new_connection);
  // ...

  // 启动事件循环
  return uv_run(loop, UV_RUN_DEFAULT);
}

Thead pool

На данный момент мы подтвердили, что внутренняя реализация io_poll действительно использует epoll. В начале этой статьи мы также упомянули, что в настоящее время epoll не может обрабатывать все операции ввода-вывода.Для тех операций ввода-вывода, которые epoll не поддерживает, libuv использует свой внутренний пул потоков для имитации асинхронного ввода-вывода. Далее давайте взглянем на общую рабочую форму пула потоков.

Создайте

Поскольку мы уже знаем, что операция чтения и записи файлов не может использовать epoll, то следуйте этой подсказке и пройдитеuv_fs_readВнутренняя реализация , найтиuv__work_submitнайдите пул потоков, в котором он инициализирован:

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  // ...
  post(&w->wq, kind);
}

Так что создайте пул резьбы, это один случай создан задержки.init_onceбудет вызываться внутреннеinit_threadsЧтобы завершить работу по инициализации пула потоков:

static uv_thread_t default_threads[4];

static void init_threads(void) {
  // ...
  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  // ...
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();
  // ...
}

Благодаря приведенной выше реализации мы знаем, что количество потоков в пуле потоков по умолчанию равно4, и можно пройти черезUV_THREADPOOL_SIZEпеременная среды для переназначения значения

Помимо одноэлементного ленивого создания пулов потоков,uv__work_submitКонечно, он все равно поставит задачу, эта часть работыpost(&w->wq, kind)Готово, давайте посмотримpostДетали реализации метода:

static void post(QUEUE* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  // ...
  // 将任务插入到 `wq` 这个线程共享的队列中
  QUEUE_INSERT_TAIL(&wq, q);
  // 如果有空闲线程,则通知它们开始工作
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

Можно обнаружить, что для отправки задач он фактически вставляет задачи в общую очередь потока.wq, и уведомлять их о работе только тогда, когда есть незанятые потоки. Итак, если в это время нет незанятых потоков, игнорируется ли задача? Ответ — нет, потому что рабочий поток будет активно проверять после завершения текущей работы.wqЕсть ли в очереди еще работа, которую нужно выполнить, если есть, она будет продолжать выполняться, если нет, она перейдет в спящий режим и будет ждать следующего пробуждения (подробности этой части будут представлены позже)

Как запланированы задачи

При создании темы вышеuv_thread_create(threads + i, worker, &sem)серединаworkerЭто содержимое выполнения потока, давайте посмотримworkerПримерное содержание:

// 线程池的 wq,提交的任务都先链到其中
static QUEUE wq;

static void worker(void* arg) {
  // ...
  // `idle_threads` 和 `run_slow_work_message` 这些是线程共享的,所以要加个锁
  uv_mutex_lock(&mutex);
  for (;;) {
    // 这里的条件判断,可以大致看成是「没有任务」为 true
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      // 轮转到当前进程时因为没有任务,则无事可做
      // 空闲线程数 +1
      idle_threads += 1;
      
      // `uv_cond_wait` 内部是使用 `pthread_cond_wait` 调用后会:
      // - 让线程进入等待状态,等待条件变量 `cond` 发生变更
      // - 对 `mutex` 解锁
      //
      // 此后,其他线程中均可使用 `uv_cond_signal` 内部是 `pthread_cond_signal` 
      // 来广播一个条件变量 `cond` 变更的事件,操作系统内部会随机唤醒一个等待 `cond` 
      // 变更的线程,并在被唤醒线程的 uv_cond_wait 调用返回之前,对之前传入的 `mutex` 
      // 参数上锁
      //
      // 因此循环跳出(有任务)后,`mutex` 一定是上锁的
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }
    // ...
    // 因为上锁了,所以放心进行队列的弹出操作
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);
    // ...
    // 因为已经完成了弹出,可以解锁,让其他线程可以继续操作队列
    uv_mutex_unlock(&mutex);

    // 利用 c 结构体的小特性,做字段偏移,拿到 `q` 所属的 `uv__work` 实例
    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    // 下面要操作 `w->loop->wq` 所以要上锁
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL; 

    // 需要看仔细,和开头部分线程池中的 wq 区别开
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);

    // 唤醒主线程的事件循环
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    // 这一步上锁是必须的,因为下次迭代的开头又需要
    // 操作共享内存,不过不必担心死锁,因为它和下一次迭代
    // 中的 `uv_cond_wait` 解锁操作是对应的
    uv_mutex_lock(&mutex);
    // ...
  }
}

Выше мы сохранили относительно важный контент и закомментировали его. Его можно примерно резюмировать так:

  • Для потоков в пуле потоков он будет передан черезuv_cond_waitдождаться пробуждения
  • После пробуждения потока отwqАктивно находите задачу для выполнения в середине и пробуждайте основной поток, когда задача завершена, потому что обратный вызов должен выполняться в основном потоке.
  • Затем введите следующую итерацию, если есть задачи, продолжайте выполнять, пока не останется задач, пройдитеuv_cond_waitиди спать снова
  • Пробуждение выполняется с использованием в другом потокеuv_cond_signalуведомить операционную систему о необходимости планирования
  • Пул потоков представляет собой масштабируемую конструкцию. При отсутствии задачи все потоки переходят в состояние сна. Когда количество задач постепенно увеличивается, активный поток попытается разбудить спящий поток.

разбудить основной поток

Когда пул потоков завершает задачу, необходимо уведомить основной поток о выполнении соответствующего обратного вызова. Способ уведомления очень интересный, давайте сначала посмотрим на операцию инициализации цикла событийuv_loop_init:

int uv_loop_init(uv_loop_t* loop) {
  // ...
  // 初始化 min heap 和各种队列,用于存放各式的 handles
  heap_init((struct heap*) &loop->timer_heap);
  QUEUE_INIT(&loop->wq);
  QUEUE_INIT(&loop->idle_handles);
  QUEUE_INIT(&loop->async_handles);
  QUEUE_INIT(&loop->check_handles);
  QUEUE_INIT(&loop->prepare_handles);
  QUEUE_INIT(&loop->handle_queue);

  // ...
  // 调用 `epoll_create` 创建 epoll 实例
  err = uv__platform_loop_init(loop);
  if (err)
    goto fail_platform_init;

  // ...
  // 用于线程池通知的初始化
  err = uv_async_init(loop, &loop->wq_async, uv__work_done);
  // ...
}

в коде вышеuv_async_initОн используется для инициализации работы, связанной с уведомлением пула потоков, ниже приведена сигнатура его функции:

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb);

Итак, третий аргументuv__work_doneПо сути, это callback-функция, можем посмотреть на ее содержимое:

void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  // 将目前的 `loop->wq` 全部移动到局部变量 `wq` 中,
  //
  // `loop->wq` 中的内容是在上文 worker 中任务完成后使用
  // `QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq)` 添加的
  //
  // 这样尽快释放锁,让其他任务可尽快接入
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  // 遍历 `wq` 执行其中每个任务的完成回调
  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}

понялuv__work_doneОн отвечает за выполнение задачи и завершение работы обратного вызова, продолжайте смотреть на него.uv_async_initконтент, чтобы увидеть, как он используется внутриuv__work_doneиз:

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  // ...
  // 待调查
  err = uv__async_start(loop);
  // ...

  // 创建了一个 async handle
  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  // 在目前的脉络中 `async_cb` 就是 `uv__work_done` 了
  handle->async_cb = async_cb;
  handle->pending = 0;

  // 把 async handle 加入到队列 `loop->async_handles` 中
  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  // ...
}

Давайте перейдем к тому, что мы исследовали ранееuv__async_startСодержание:

static int uv__async_start(uv_loop_t* loop) {
  // ...
  // `eventfd` 可以创建一个 epoll 内部维护的 fd,该 fd 可以和其他真实的 fd(比如 socket fd)一样
  // 添加到 epoll 实例中,可以监听它的可读事件,也可以对其进行写入操作,因此就用户代码就可以借助这个
  // 看似虚拟的 fd 来实现的事件订阅了
  err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (err < 0)
    return UV__ERR(errno);

  pipefd[0] = err;
  pipefd[1] = -1;
  // ...

  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}

Мы знаем, что epoll поддерживает сокет fd, и для поддерживаемого fd планирование событий epoll будет очень эффективным. Для неподдерживаемых операций ввода-вывода libuv используетeventfdСоздайте виртуальный fd и продолжайте использовать функцию планирования событий fd

Продолжаем смотреть выше появляютсяuv__io_startдетали, чтобы подтвердить шаги подписки на событие:

void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  // ...

  // 大家可以翻到上面 `uv__io_poll` 的部分,会发现其中有遍历 `loop->watcher_queue`
  // 将其中的 fd 都加入到 epoll 实例中,以订阅它们的事件的动作
  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  // 将 fd 和对应的任务关联的操作,同样可以翻看上面的 `uv__io_poll`,当接收到事件
  // 通知后,会有从 `loop->watchers` 中根据 fd 取出任务并执行其完成回调的动作
  // 另外,根据 fd 确保 watcher 不会被重复添加
  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

После подтверждения шага подписки на событие давайте взглянем на содержимое обратного вызова события. вышеуказанные параметрыwВ нашем текущем контексте соответствующие аргументыloop->async_io_watcher, пока он проходит черезuv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0])Инициализировано, давайте посмотримuv__io_initСигнатура функции:

void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);

такuv__async_ioэто функция обратного вызова, которая получает событие виртуального fd, продолжайте просматривать его содержимое:

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  // ...
  // 确保 `w` 必定是 `loop->async_io_watcher`
  assert(w == &loop->async_io_watcher);

  for (;;) {
    // 从中读一些内容,`w->fd` 就是上面使用 `eventfd` 创建的虚拟 fd
    // 不出意外的话,通知那端的方式、一定是往这个 fd 里面写入一些内容,我们可以后面继续确认
    // 从中读取一些内容的目的是避免缓冲区被通知所用的不含实际意义的字节占满
    r = read(w->fd, buf, sizeof(buf));
    // ...
  }

  // 执行 `loop->async_handles` 队列,任务实际的回调
  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);
    QUEUE_INSERT_TAIL(&loop->async_handles, q);

    // ...
    h->async_cb(h);
  }
}

Мы уже знаем, как подписываться на события и как реагировать на события.

Затем продолжайте и подтвердите, как уведомление о событии запускается в пуле потоков.uv_async_sendЭто открытый API для пробуждения основного потока, на самом деле это внутренний API, называемыйuv__async_send:

static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
 
  // ...
  fd = loop->async_io_watcher.fd; 

  do
    // 果然事件通知这一端就是往 `eventfd` 创建的虚拟 fd 写入数据
    // 剩下的就是交给 epoll 高效的事件调度机制唤醒事件订阅方就可以了
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  // ...
}

Наконец, мы суммируем процесс вышеупомянутого пула потоков через диаграмму потока сознания:

На картинке выше наша задача состоит в том, чтобыuv__run_idle(loop);Передано в выполненном обратном вызовеuv__work_submitВыполнено, но на самом деле для приложений, использующих циклы событий, квант времени всего приложения разбивается на разные callback-и очередей, так что реально можно отправлять задачи из остальных очередей.

closing

Как мы упоминали в начале, только Handle оснащен API отключения, т.к. Request — это кратковременная задача. Необходимо использовать закрытие Handleuv_close:

void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
  assert(!uv__is_closing(handle));

  handle->flags |= UV_HANDLE_CLOSING;
  handle->close_cb = close_cb;

  switch (handle->type) {
  // 根据不同的 handle 类型,执行各自的资源回收工作
  case UV_NAMED_PIPE:
    uv__pipe_close((uv_pipe_t*)handle);
    break;

  case UV_TTY:
    uv__stream_close((uv_stream_t*)handle);
    break;

  case UV_TCP:
    uv__tcp_close((uv_tcp_t*)handle);
    break;
  // ...

  default:
    assert(0);
  }
  
  // 添加到 `loop->closing_handles`
  uv__make_close_pending(handle);
}

void uv__make_close_pending(uv_handle_t* handle) {
  assert(handle->flags & UV_HANDLE_CLOSING);
  assert(!(handle->flags & UV_HANDLE_CLOSED));
  handle->next_closing = handle->loop->closing_handles;
  handle->loop->closing_handles = handle;
}

передачаuv_closeПосле закрытия дескриптора libuv сначала освободит ресурсы, занятые дескриптором (например, закрытие fd), а затем вызоветuv__make_close_pendingподключить ручку кclosing_handlesочередь, которая будет вызываться в цикле событийuv__run_closing_handles(loop)вызов выполнен

После использования цикла событий время выполнения бизнес-кода находится в обратном вызове, потому чтоclosing_handlesявляется последней очередью для выполнения, поэтому в обратных вызовах оставшихся очередей те, которые выполняютсяuv_closeОбратные вызовы, переданные во время, будут выполнены в текущей итерации.

резюме

В этой статье кратко исследуется его внутренняя реализация по аналогии с реализацией Libuv для Linux, пытаясь разгадать тайну libuv. Очевидно, что просто прочитать это недостаточно, я надеюсь, что это может быть использовано в качестве начального чтения для людей, которые хотят узнать больше о libuv. Позже мы объединим Node.js, чтобы изучить, как они связаны внутри.

Эта статья была опубликована сКоманда внешнего интерфейса NetEase Cloud Music, Любое несанкционированное воспроизведение статьи запрещено. Мы набираем front-end, iOS и Android круглый год.Если вы готовы сменить работу и любите облачную музыку, присоединяйтесь к нам на grp.music-fe(at)corp.netease.com!