Сердцебиение и тайм-аут: высокий параллелизм и тайм-аут колеса времени высокой производительности

Java

[TOC]

введение

Во многих бизнес-сценариях мы столкнемся с необходимостью отложенных задач и запланированных задач. В частности, в сценарии сетевого подключения часто возникает некоторый контроль времени ожидания. Из-за большого количества подключений на стороне сервера количество этих задач тайм-аута часто бывает очень большим. Внедрение управления тайм-аутом для большого количества задач — непростая задача.

В этой главе мы познакомимся с несколькими структурами данных, используемыми для реализации задач тайм-аута, и, наконец, проанализируем структуру и код, которые Netty использует для задач тайм-аута.

Добро пожаловать в группу технического обмена 186233599 для обсуждения и обмена, а также приглашаем обратить внимание на публичный аккаунт автора: Feng Huo Shuo.

JDK изначально обеспечивает поддержку задач тайм-аута

java.util.Timer

Представлено в JDK 1.3TimerСтруктуры данных используются для реализации временных задач.TimerИдея реализации относительно проста и имеет два основных свойства:

  • TaskQueue: абстрактный класс запланированной задачи.TimeTaskСписок.
  • TimerThread: Поток, используемый для выполнения задач по времени.

TimerСтруктура также определяет абстрактный классTimerTaskи унаследовалRunnableинтерфейс. Бизнес-система реализует этот абстрактный классrunЭтот метод используется для обеспечения определенной логики задачи задержки.

TaskQueueВнутри используется большая верхняя куча, и задачи сортируются в соответствии со временем запуска задач. иTimerThreadв бесконечном цикле изTaskQueueПолучить заголовок очереди, запустить задачу после ожидания тайм-аута задачи в начале очереди и удалить задачу из очереди.

TimerСтруктуры данных и алгоритмы просты для понимания. Все задачи с истекшим временем ожидания сначала попадают в очередь задержки. Фоновый поток тайм-аута постоянно извлекает задачи из очереди задержки и ожидает истечения периода тайм-аута перед выполнением задачи. Отложенная очередь принимает сортировку большой верхней кучи.В сценарии отложенных задач есть три операции, а именно: добавление задач, извлечение задачи в начале очереди и просмотр задачи в начале очереди.

Сложность события просмотра задачи в начале очереди составляет O(1). Временная сложность добавления задачи и извлечения задачи головы очереди составляет O(Log2н) . Когда количество задач велико, накладные расходы на добавление и удаление также относительно велики. Кроме того, посколькуTimerВнутри всего один поток обработки, если обработка отложенной задачи занимает много времени, то и обработка последующих задач будет соответственно задерживаться.

ScheduledThreadPoolExecutor

так какTimerДля обработки отложенных задач используется только один поток, чего явно недостаточно при большом количестве задач. Интерфейс пула потоков был представлен в JDK1.5.ExecutorServiceПосле этого также предоставляется соответствующий для обработки отложенных задач.ScheduledExecutorServiceИнтерфейс подкласса. Интерфейс также использует очередь задержки, которая использует небольшую верхнюю кучу для сортировки для хранения задач. Потоки в пуле потоков будут ждать в этой очереди, пока не появится задача для выборки.

ScheduledExecutorServiceВ реализации есть некоторые особенности, только один поток может извлекать задачу во главе очереди задержки и ждать в соответствии с тайм-аутом задачи. В течение этого периода ожидания другие потоки не могут получать задачи. Эта реализация предназначена для предотвращения одновременного получения задач несколькими потоками, что приводит к запуску задачи до истечения периода ожидания или невозможности ответа при добавлении новой задачи во время ожидания периода ожидания задачи.

так какScheduledExecutorServiceМожно использовать несколько потоков, что также уменьшает блокировку последующих задач, вызванную длительным временем выполнения отдельных задач. Однако очередь задержки также сортируется по малой верхней куче, поэтому временная сложность добавления и удаления задач составляет O(Log2н) . В случае большого количества задач производительность относительно низкая.

более эффективные структуры данных

Несмотря на то чтоTimerиScheduledThreadPoolExecutorОба обеспечивают поддержку отложенных задач, но, поскольку временная сложность новых задач и задач извлечения равна O(Log2n), когда количество задач велико, например десятки тысяч или сотни тысяч, накладные расходы на производительность становятся огромными.

Тогда есть ли новая задача и задача извлечения, чем O(Log2n) А как насчет менее сложных структур данных? Ответ есть. В статье «Хешированные и иерархические временные колеса» разработана структура данных, называемая временными колесами, которая снижает временную сложность добавления и удаления задач до O(1) .

Фундаментальный

Структура данных колеса времени очень похожа на указатель данных на наших часах, отсюда и название «колесо времени». Его структура данных показана следующим образом

Каждый раз «сетка» называется слотом, а слот хранит очередь отложенных задач. Сам слот представляет собой единицу времени, например 1 секунду. Количество слотов, которыми владеет колесо времени, является задачей максимального диапазона задержки, с которым может справиться колесо времени, а единица времени слота представляет собой точность колеса времени. Это означает, что времена, меньшие единицы времени, неразличимы в этом колесе времени.

Все задачи в очереди отложенных задач в слоте имеют одинаковое время задержки. Каждую единицу времени указатель перемещается на следующий слот. Когда указатель указывает на слот, все задачи в очереди отложенных задач этого слота будут запущены.

Когда в колесо времени нужно вставить отложенную задачу, сначала вычислите остаточное значение ее времени задержки и единичного времени и переместите количество остаточных значений из текущего слота, на который указывает указатель, который является слотом что отложенная задача должна быть помещена в. .

Например, колесо времени имеет 8 слотов, пронумерованных от 0 до 7. Указатель в настоящее время указывает на слот 2 . Добавьте отложенную задачу со временем задержки 4 секунды, 4 % 8 = 4, поэтому задача будет вставлена ​​в 4 + 2 = 6, то есть в очередь отложенных задач в слоте 6.

Слотовая реализация колеса времени может быть достигнута с помощью кругового массива, то есть указатель возвращается к начальному индексу после пересечения границы массива. В общих чертах алгоритм колеса времени можно описать как

Очередь используется для хранения отложенных задач, а задачи в одной очереди имеют одинаковое время задержки. Элементы хранятся в циклическом массиве, каждый элемент массива указывает на очередь отложенных задач.

Существует текущий указатель на определенный слот в массиве, и указатель перемещается на следующий слот каждую единицу времени. Очередь задержки слота, на который указывает указатель, в котором запускаются все отложенные задачи.

Добавьте новую задачу задержки на колесо времени, разделите ее время задержки на остаточное значение, полученное в единицу времени, начните с текущего указателя и переместите слот, соответствующий количеству остаточных значений, который является слотом, в который отложенная задача размещен.

На основе такой структуры данных временная сложность вставки отложенной задачи падает до O(1). Когда указатель указывает на слот, запускаются все отложенные задачи в очереди отложенных задач, подключенной к слоту.

Задержка стрельбы и выполнения задач не должна влиять на точность синхронизации обратного движения указателя. Поэтому в общем случае поток, используемый для перемещения указателя, отвечает только за запуск задачи, а выполнение задачи завершается другими потоками. Например, очередь отложенных задач в слоте может быть помещена в дополнительный пул потоков для выполнения, а затем в слоте может быть создана пустая новая очередь отложенных задач для добавления последующих задач.

Поддержка большего времени задержки вне допустимого диапазона

В «Основах» мы анализируем базовую структуру колеса времени. Однако в то время мы предполагали, что время вставки отложенной задачи не будет превышать длину колеса времени, то есть время задержки задач в очереди отложенных задач на каждом слоте одинаково.

В этом случае для поддержки отложенных задач с большим промежутком времени либо увеличьте количество слотов в колесе времени, либо уменьшите точность колеса времени, то есть единицу времени, представленную каждым слотом. Точность колеса времени, очевидно, является жестким требованием бизнеса, поэтому количество слотов можно только увеличить. Предполагая, что требуемая точность составляет 1 секунду, чтобы иметь возможность поддерживать отложенную задачу с временем задержки 1 день, количество слотов в колесе времени должно быть 60 × 60 × 24 = 86400. Это требует большего потребления памяти. Очевидно, что простое увеличение количества слотов не является хорошим решением.

В статье представлены две схемы расширения для поддержки отложенных задач с большим размахом.

Вариант 1. Различные раунды отложенных задач сосуществуют в одной и той же очереди отложенных задач.

В этой схеме алгоритм вводит понятие «раунд», и раундом является частное, полученное путем деления времени задержки отложенной задачи на продолжительность раунда. Остаток, полученный путем деления времени задержки отложенной задачи на длину колеса времени, представляет собой смещение слота, которое необходимо вставить.

При вставке отложенной задачи сначала вычисляются смещение раунда и слота, а слот, в который вставляется отложенная задача, определяется по смещению слота. Когда указатель указывает на определенный слот, происходит обход очереди отложенных задач, на которую указывает слот, и запускаются все отложенные задачи, раунд которых равен 0, а оставшиеся задачи ждут следующего цикла.

Путем введения раундов можно поддерживать отложенные задачи с бесконечными временными горизонтами на ограниченных слотах. Но хотя временная сложность вставки задачи по-прежнему составляет O(1), когда запускается отложенная задача, необходимо просмотреть очередь отложенных задач, чтобы убедиться, что ее раунд равен 0. Временная сложность при запуске задачи возросла до O(n).

В этом случае можно использовать еще один вариант детали, который заключается в сортировке очереди отложенных задач в соответствии с раундом, например, с помощью небольшой верхней кучи для сортировки очереди отложенных задач. Таким образом, когда указатель указывает на слот для запуска отложенной задачи, необходимо только непрерывно вынимать задачу из головы очереди для проверки цикла, и как только раунд задачи не равен 0, его можно остановить. . Временная сложность запуска задачи снижена до O(1). Соответственно, поскольку очередь сортируется, когда задача вставляется, в дополнение к поиску слота для вставки также необходимо определить место вставки в очереди. Временная сложность вставки изменяется на O(1) и O(Log2n) , где n — длина очереди задач с задержкой в ​​слоте.

Вариант 2: Многоуровневое колесо времени

Посмотрите на дизайн часов, есть секундная стрелка, минутная стрелка, часовая стрелка. Подобно секундной стрелке и минутной стрелке, хотя имеется 60 сеток, соответствующие сетки представляют разные промежутки времени. Ссылаясь на эту идею, мы можем объявить несколько временных колес разных уровней, а временной интервал слота каждого временного колеса будет общим временным диапазоном его вторичного временного колеса.

Когда указатель колеса времени нижнего уровня завершает полный круг, соответствующее ему колесо времени высокого уровня перемещается на один слот соответственно. А задачи в слоте, на который указывает указатель колеса времени высокого уровня, рассчитываются в соответствии со временем задержки и помещаются обратно в разные слоты колеса времени нижнего уровня. Таким образом, гарантируется, что задачи в очереди отложенных задач каждого слота в каждом колесе времени имеют время задержки с одинаковой точностью времени.

Взяв в качестве примера колесо времени с точностью до 1 секунды и диапазоном времени 1 день, можно сконструировать трехуровневое колесо времени: колесо времени второго уровня имеет 60 слотов, а время каждого слота составляет 1 секунду. ; колесо времени на минутном уровне имеет 60 слотов Есть 24 слота, и время каждого слота составляет 60 секунд; колесо времени на часовом уровне имеет 24 слота, и время каждого слота составляет 60 минут. Когда колесо времени второго уровня отрабатывает 60 секунд, указатель колеса времени второго уровня снова указывает на слот с нижним индексом 0, а указатель колеса времени минутного уровня перемещается назад на один слот и задерживает задача на слоте, все вынимается и пересчитывается и помещается в колесо времени второго уровня.

Всего для поддержки необходимо всего 60 + 60 + 24 = 144 слота. По сравнению с упомянутым выше одноступенчатым колесом времени, для которого требуется 86 400 слотов, он экономит значительный объем памяти.

Существует два распространенных подхода к иерархическим колесам времени:

  • Фиксированный временной диапазон: Количество временных колес и количество слотов временных колес на разных уровнях задаются входными параметрами метода построения, что означает, что временной диапазон, который может поддерживать колесо времени в целом, определяется, когда построен метод построения.
  • Нефиксированный временной диапазон: определите количество слотов в колесе времени и время слота наименьшего временного колеса. Когда время вставленной отложенной задачи превышает диапазон колеса времени, динамически генерируется колесо времени более высокого уровня. Поскольку колесо времени генерируется во время выполнения и рассчитывается в соответствии со временем задержки задачи, когда существующее колесо времени не соответствует требованиям его диапазона времени задержки, колесо времени высокого уровня генерируется динамически, поэтому общее время поддерживаемый диапазон не имеет верхнего предела.

Реализация колеса времени Нетти

Основная идея алгоритма колеса времени состоит в том, чтобы уменьшить временную сложность вновь добавленных задач задержки до O(1) посредством зацикливания массивов и перемещения указателя, но с точки зрения конкретной реализации, включая то, как справляться с задачами задержки с помощью большие промежутки времени, различные реализации будут иметь некоторые изменения в деталях. Давайте возьмем реализацию колеса времени в Netty в качестве примера для анализа кода.

Определение интерфейса

Реализация Netty настраивает интерфейс тайм-аутаio.netty.util.Timer, метод следующий

public interface Timer
{
    //新增一个延时任务,入参为定时任务TimerTask,和对应的延迟时间
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
    //停止时间轮的运行,并且返回所有未被触发的延时任务
    Set < Timeout > stop();
}
public interface Timeout
{
    Timer timer();
    TimerTask task();
    boolean isExpired();
    boolean isCancelled();
    boolean cancel();
}

TimeoutИнтерфейс — это инкапсуляция отложенной задачи, и его интерфейсный метод указывает, что его реализация должна поддерживать состояние отложенной задачи. Позже мы сможем увидеть это более легко, когда проанализируем внутренний код его реализации.

TimerИнтерфейс имеет уникальную реализациюHashedWheelTimer. Сначала посмотрите на его метод построения, как показано ниже.

Построить круговой массив

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts)
{
    //省略代码,省略参数非空检查内容。
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;
    //省略代码,省略槽位时间范围检查,避免溢出以及小于 1 毫秒。
    workerThread = threadFactory.newThread(worker);
    //省略代码,省略资源泄漏追踪设置以及时间轮实例个数检查
}

Первый – это методcreateWheel, основная структура данных, используемая для создания колеса времени, массива петель. Давайте посмотрим на его содержание метода

private static HashedWheelBucket[] createWheel(int ticksPerWheel)
{
    //省略代码,确认 ticksPerWheel 处于正确的区间
    //将 ticksPerWheel 规范化为 2 的次方幂大小。
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for(int i = 0; i < wheel.length; i++)
    {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}

Массивы длин в степени двойки облегчают вычисления частного и остатка.

HashedWheelBucketВнутренне хранитсяHashedWheelTimeoutЭто двусвязный список, состоящий из узлов, в котором хранятся головной узел и хвостовой узел связанного списка, что удобно для извлечения и вставки задач.

Добавлены отложенные задачи

методHashedWheelTimer#newTimeoutИспользуется для добавления задач задержки, давайте посмотрим на код ниже

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
{
    //省略代码,用于参数检查
    start();
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    if(delay > 0 && deadline < 0)
    {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

Видно, что при добавлении задачи задача не заносится непосредственно в круговой массив, а сначала ставится в очередь, то есть атрибутtimeouts, очередь представляет собой очередь типа MPSC, этот режим в основном используется для повышения параллельной производительности, поскольку в этой очереди есть только потокиworkerThreadБудет выполнена операция извлечения задачи.

Поток вызывается в конструкторе вызовомworkerThread = threadFactory.newThread(worker)был создан. Но поток не выполняется сразу после созданияstartметод, время его запуска — это когда задача задержки добавляется впервые в этом временном раунде, то есть время в этом методеstartсодержание метода. Ниже его код

public void start()
{
    switch(WORKER_STATE_UPDATER.get(this))
    {
        case WORKER_STATE_INIT:
            if(WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED))
            {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    while(startTime == 0)
    {
        try
        {
            startTimeInitialized.await();
        }
        catch(InterruptedException ignore)
        {
            // Ignore - it will be ready very soon.
        }
    }
}

Метод четко разделен на две части, первая частьSwitchБлок метода, гарантирующий, что только один поток может выполнять операции CAS над переменными состояния.workerThread.start()метод для запуска рабочих потоков и избегания исключений параллелизма. Вторая часть блокирует ожидание, черезCountDownLatchпеременная типаstartTimeInitializedВыполнить блокировку ожидания ожидания рабочих потоковworkerThreadПо-настоящему погрузитесь в работу.

отnewTimeoutС точки зрения метода задача задержки вставки сначала ставится в очередь.При анализе структуры данных ранее было сказано, что запуск задачи выполняется, когда указатель указывает на определенный слот в колесе времени, поэтому должна быть необходимость ставить задачу в очередь.Задержки задач ставятся в работу массива колеса времени. Это действие, очевидно, вызваноworkerThreadРабочий поток для завершения. Давайте взглянем на конкретное содержание кода этого потока.

Рабочий поток workerThread

Рабочие потоки основаны наHashedWheelTimer.Workerэто достигаетсяRunnableКласс интерфейса работает, тогда посмотрим на его сопряжениеrunКод реализации метода выглядит следующим образом

public void run()
{
    {//代码块①
        startTime = System.nanoTime();
        if(startTime == 0)
        {
            //使用startTime==0 作为线程进入工作状态模式标识,因此这里重新赋值为1
            startTime = 1;
        }
        //通知外部初始化工作线程的线程,工作线程已经启动完毕
        startTimeInitialized.countDown();
    }
    {//代码块②
        do {
            final long deadline = waitForNextTick();
            if(deadline > 0)
            {
                int idx = (int)(tick & mask);
                processCancelledTasks();
                HashedWheelBucket bucket = wheel[idx];
                transferTimeoutsToBuckets();
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    }
    {//代码块③
        for(HashedWheelBucket bucket: wheel)
        {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        for(;;)
        {
            HashedWheelTimeout timeout = timeouts.poll();
            if(timeout == null)
            {
                break;
            }
            if(!timeout.isCancelled())
            {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }
}

Запуск и подготовка потока

Для удобства чтения здесьrunСодержание метода разделено на три блока кода. Первый взглядКодовый блок ①. через системный вызовSystem.nanoTimeдля времени началаstartTimeУстановите начальное значение.Эта переменная представляет базовое время колеса времени и используется для последующих расчетов относительного времени. После выполнения задания пройтиstartTimeInitializedПеременные уведомляют внешние ожидающие потоки.

Указатели дисков и триггеры задач

ПосмотримКодовый блок ②. Это основная рабочая часть, все вwhileВ цикле убедитесь, что рабочий поток работает только тогда, когда колесо времени не остановлено. Первый взгляд на методwaitForNextTick, в колесе времени указатель перемещается один раз, называемыйtick, этот метод следует использовать внутренне, чтобы дождаться перехода указателя к следующемуtick, посмотрите на конкретный код, как показано ниже

private long waitForNextTick()
{
    long deadline = tickDuration * (tick + 1);
    for(;;)
    {
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        if(sleepTimeMs <= 0)
        {
            if(currentTime == Long.MIN_VALUE)
            {
                return -Long.MAX_VALUE;
            }
            else
            {
                return currentTime;
            }
        }
        if(PlatformDependent.isWindows())
        {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }
        try
        {
            Thread.sleep(sleepTimeMs);
        }
        catch(InterruptedException ignored)
        {
            if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
            {
                return Long.MIN_VALUE;
            }
        }
    }
}

Идея всего метода очень проста.Как упоминалось ранее, каждый раз, когда колесо времени перемещает указатель, это означаетtick. здесьtickЭто можно увидеть как количество перемещений указателя. Поскольку временной диапазон слота фиксирован, можно просто рассчитать, что указатель перемещается на следующий слот,теориявремя, которое должно было пройти наlong deadline = tickDuration * (tick + 1). После этого начинается отсчет от колеса времени к текущему,действительныйпрошедшее время, т.long currentTime = System.nanoTime() - startTime. Разница между ними заключается во времени, в течение которого поток должен спать.

Если разница меньше 0, это означает, что фактическое прошедшее время превышает теоретическое время.В это время оно превысило диапазон, который должен спать, и метод должен немедленно вернуться. Поскольку колесо времени может быть остановлено во время выполнения этого метода, для выражения этого события используется специальное значение, то естьLong.MIN_VALUE, вот почемуcurrentTimeПричина, по которой следует избегать этого значения.

Еще одно замечание,Thread.sleepРеализация метода основана на проверке прерывания, предоставляемой операционной системой, то есть операционная система проверяет, есть ли поток, которому необходимо проснуться, и предоставлять ресурсы ЦП при каждом прерывании. По умолчанию в Linux интервал прерывания составляет 1 мс, а в Windows — 10 мс или 15 мс, в зависимости от аппаратного распознавания.

Если это на платформе Windows, при вызове методаThread.sleepЕсли входящий параметр не является целым числом, кратным 10, системный метод будет вызываться внутри.timeBeginPeriod()иtimeEndPeriod()чтобы изменить период прерывания на 1 миллисекунду и снова установить его на значение по умолчанию после завершения сна. Целью этого является обеспечение точности времени сна. Однако на платформе Windows частые вызовы для изменения цикла прерывания приведут к неправильной работе часов Windows, и в большинстве случаев производительность приводит к ускорению часов. Это приведет, скажем, к попытке заснуть в течение 10 секунд, но спящий только в течение 9 секунд. Так вот, поsleepTimeMs = sleepTimeMs / 10 * 10обещатьsleepTimeMsЭто целое число, кратное 10, что позволяет избежать этой ошибки в Windows.

когда методwaitForNextTickПосле возврата, а возвращаемое значение является положительным числом, это означает, что текущийtickОжидание сна завершено, и можно выполнить обработку триггера отложенной задачи. пройти черезint idx = (int)(tick & mask)Вызывается для определения индекса слота следующей запущенной отложенной задачи в массиве циклов. Перед обработкой триггерной задачи сначала удалите отмененную отложенную задачу из очереди отложенных задач, на которую указывает слот. каждый звонокHashedWheelTimer#newTimeoutПри добавлении отложенной задачи возвращаетсяTimeoutобъект, который может бытьcancleспособ отменить эту отложенную задачу. При выполнении действия отмены оно не будет удалено непосредственно из очереди задержки, а будет помещено в очередь отмены, т. е.HashedWheelTimer.cancelledTimeoutsАтрибуты. Перед подготовкой к обходу очереди отложенных задач в слоте передайте методprocessCancelledTasksдля обхода очереди отмены и удаления отложенных задач из очередей отложенных задач в соответствующих слотах. Преимущество использования этого метода заключается в том, что только один поток удалит отложенную задачу, что позволяет избежать параллельных помех, вызванных многопоточностью, и снижает сложность разработки.

После обработки отмененной отложенной задачи вызовите методtransferTimeoutsToBucketsдобавит отложенную очередь задачHashedWheelTimer.timeoutsОтложенные задачи добавляются в слоты с соответствующим временем задержки. Код метода очень прост, просто непрерывно зацикливайте отtimeoutsВыньте задачу и рассчитайте частное и остаток от ее времени задержки и диапазона времени раунда, а результаты - это индексы раунда и слота соответственно. Задача добавляется в очередь отложенных задач, соответствующую слоту в соответствии с индексом слота.

Здесь вы можете увидеть параллельный дизайн структуры колеса времени, выполненный автором Netty.Новая задача - добавить элементы в очередь MPSC. Очередь отложенных задач на слоте может добавлять и удалять только поток самого колеса времени, который разработан в режиме SPSC. Первое предназначено для повышения производительности в условиях параллелизма без блокировок, а второе — для снижения сложности проектирования за счет ограничений.

transferTimeoutsToBucketsЭтот метод будет передавать не более 100 000 отложенных задач в соответствующий слот за раз, чтобы избежать голодания, вызванного добавлением задач во внешний цикл. После выполнения метода достигается триггерная обработка отложенной задачи на слоте, то есть методHashedWheelBucket#expireTimeoutsфункции, логика внутри метода также очень проста. Пройдите по очереди, уменьшая 1, если очередь задачи задержки не равна 0. В противном случае срабатывает метод выполнения задачи, т.е.HashedWheelTimeout#expire. Метод по-прежнему использует CAS для внутреннего обновления состояния, чтобы избежать конкуренции между запуском и отменой метода. Как видно из реализации этого метода, Netty использует раунды для поддержки времени задержки, выходящего за рамки временного раунда. Реализация многоуровневого колеса времени более сложна, чем реализация концепции раунда.Учитывая, что в сетевых приложениях ввода-вывода мало сценариев, выходящих за рамки колеса времени, относительно легко использовать метод раунда для поддержки большее время реализованный план.

Когда все отложенные задачи, которые должны быть запущены, запущены, передайтеtickДобавьте 1, чтобы указать, что указатель перемещается в следующий слот.

колесо времени останавливается

внешний поток, вызываяHashedWheelTimer#stopспособ остановить колесо времени, способ остановки очень прост, то есть изменить атрибут состояния колеса времени с помощью вызовов CAS. пока вКодовый блок ②по петле в каждомtickЭтот бит состояния проверяется. Содержимое кодового блока ③ очень простое.Он проходит через все слоты и проходит через очередь отложенных задач слота.Все задачи, которые не достигли времени задержки и не были отменены, помещаются в набор, и, наконец, этот набор возвращается. В этой коллекции хранятся все отложенные задачи, которые не удалось выполнить.

Резюме мыслей

В сценариях, имеющих дело с большим количеством отложенных задач, колеса времени являются очень эффективным алгоритмом и структурой данных. Нетти внесла некоторые подробные изменения в реализацию колеса времени при добавлении задач, истечении срока действия задач и удалении задач. На самом деле, существует несколько реализаций колеса времени в разном промежуточном программном обеспечении, и каждая из них имеет свои отличия, но в основе лежит концепция кругового массива и истечения срока действия слота. Различные изменения деталей имеют свои собственные подходящие сценарии и соображения.