Коротко об алгоритме колеса времени в Кафке

Java Kafka
Коротко об алгоритме колеса времени в Кафке

Ноль, определение колеса времени

Просто скажите о колесе времени, это эффективная очередь задержки или таймер. На самом деле объяснений алгоритму колеса времени в интернете много, и определения тоже очень полные, вот цитата.Блог Чжу СяосиОпределения, встречающиеся в:

Ссылаясь на рисунок ниже, TimingWheel в Kafka представляет собой циклическую очередь, в которой хранятся запланированные задачи.Нижний уровень реализован с использованием массива.Каждый элемент в массиве может хранить TimerTaskList. TimerTaskList представляет собой круговой двусвязный список, каждый элемент которого представляет собой элемент задачи таймера (TimerTaskEntry), который инкапсулирует задачу таймера реального времени TimerTask.

时间轮

Если вы понимаете приведенное выше определение, вам не нужно читать дальше. Но если вы так же смущены, как и я, когда видите это впервые, и у вас есть много вопросов, то эта запись в блоге поможет вам лучше понять колесо времени и даже понять алгоритм колеса времени.

Если вам интересно, вы можете пойти посмотреть другие таймерыВы действительно понимаете задержанные очереди?. Блогер считает, что самое большое преимущество таймера колеса времени:

    1. Добавление и удаление задач имеют сложность уровня O(1);
    1. Не отнимет много ресурсов;
    1. Все, что требуется, — это один поток, чтобы колесо времени заработало.

Мы будем анализировать ход колеса времени слой за слоем:

1. Зачем использовать циклическую очередь

Предположим, теперь у нас есть большой массив, предназначенный для хранения отложенных задач. Он имеет миллисекундную точность! Таким образом, наша отложенная задача на самом деле должна просто преобразовать запрограммированное время в миллисекунды, а затем сохранить в ней отложенную задачу:

Например, текущее время 2018/10/24 19:43:45, тогда задача хранится в Task[1540381425000], а значением является содержимое запланированной задачи.


private Task[很长] tasks;

public List<Task> getTaskList(long timestamp) {
	return task.get(timestamp)
}

// 假装这里真的能一毫秒一个循环
public void run(){
	while (true){
		getTaskList(System.currentTimeMillis()).后台执行()
		Thread.sleep(1);
	}
}

Если длина массива достигает миллиардов, мы действительно можем это сделать. Что, если бы точность была снижена до второго уровня? Нам также нужен массив длиной в десятки миллиардов.

Не говоря уже о том, достаточно ли памяти, очевидно, что иметь такой большой объем памяти — пустая трата времени.

Конечно, если мы сами напишем карту и убедимся, что она не имеет хеш-конфликтов, это вполне осуществимо. (Я не уверен, правильно ли я думаю, пожалуйста, укажите, если это неправильно)


/* 一个精度为秒级的延时任务管理类 */
private Map<Long, Task> taskMap;

public List<Task> getTaskList(long timestamp) {
	return taskMap.get(timestamp - timestamp % 1000)
}

// 新增一个任务
public void addTask(long timestamp, Task task) {
	List<Task> taskList = getTaskList(timestamp - timestamp % 1000);
		if (taskList == null){
			taskList = new ArrayList();
		}
	taskList.add(task);
}

// 假装这里真的能一秒一个循环
public void run(){
	while (true){
		getTaskList(System.currentTimeMillis()).后台执行()
		Thread.sleep(1000);
	}
}

По сути, колесо времени — это структура данных без конфликта хэшей.

Помимо других сомнений, давайте посмотрим на часы на запястье (если вы не найдете часы или вообразите их), всегда ли они могут быть представлены нашим циферблатом, независимо от того, какое время сейчас (без учета точности). )

Возьмите секундомер, он всегда падает между 0 и 59 секундами, и он начинает все сначала каждый раз, когда он проходит круг.

Смоделируйте наш секундомер с помощью псевдокода:

private Bucket[60] buckets;// 表示60秒

public void addTask(long timestamp, Task task) {
	Bucket bucket = buckets[timestamp / 1000 % 60];
	bucket.add(task);
}

public Bucket getBucket(long timestamp) {
	return buckets[timestamp / 1000 % 60];
}

// 假装这里真的能一秒一个循环
public void run(){
	while (true){
		getBucket(System.currentTimeMillis()).后台执行()
		Thread.sleep(1000);
	}
}

Таким образом, наше время всегда может приходиться на любое ведро от 0 до 59, так же как наши секунды всегда приходятся на шкалу от 0 до 59, чтоКруговая очередь колеса времени.

Во-вторых, ограниченное время

Но осторожные друзья также столкнутся с такой проблемой: если он может только указать, как хранить и извлекать запланированные задачи в течение 60 секунд, не слишком ли он ограничен?Что делать, если я хочу присоединиться к отложенной задаче, которая выполняется на час позже?

На самом деле, вы все еще можете взглянуть на часы.Для часов с тремя стрелками (обычные часы) максимум может составлять 12 часов.Если диапазон превышает 12 часов, время будет неоднозначным. Что, если мы добавим больше указателей? Например, у нас есть секундная стрелка, минутная стрелка, часовая стрелка, утренняя и дневная стрелки, стрелка неба, стрелка луны, стрелка года... Разве это не означает долгий-долгий период времени? Кроме того, он не должен занимать много памяти.

Например, секундная стрелка может быть представлена ​​массивом длиной 60, минутная стрелка также может быть представлена ​​массивом длиной 60, а часовая стрелка может быть представлена ​​массивом длиной 24. Тогда для представления всех времен суток нужны всего три массива.


сделай это, мы называем эту структуру данных колесом времени, а tickMs представляет тик, например, одну секунду, как упоминалось выше. WheelSize указывает, сколько масштабов в круге, то есть 60, упомянутых выше. interval указывает, сколько времени может представлять круг, то есть tickMs * wheelSize = 60 секунд.

overflowWheel представляет колесо времени предыдущего слоя, например, для секунд, overflowWheel представляет минуты и т. д.

public class TimeWheel {

    /** 一个时间槽的时间 */
    private long tickMs;

    /** 时间轮大小 */
    private int wheelSize;

    /** 时间跨度 */
    private long interval;

    /** 槽 */
    private Bucket[] buckets;

    /** 时间轮指针 */
    private long currentTimestamp;

    /** 上层时间轮 */
    private volatile TimeWheel overflowWheel;

    public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
        this.currentTimestamp = currentTimestamp;
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.interval = tickMs * wheelSize;
        this.buckets = new Bucket[wheelSize];
        this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);

        for (int i = 0; i < wheelSize; i++) {
            buckets[i] = new Bucket();
        }
    }
}

Добавлять задачи в колесо времени очень просто, для каждого колеса времени, такого как второе колесо времени и иерархическое колесо времени, есть свой слот истечения срока действия. То есть когда delayMs

При добавлении отложенной задачи всего возможны такие ситуации:

####1. Время истекает

  • 1) Например, есть задача, которую нужно выполнить в 16:29:07.Из колеса времени второго уровня, когда наше текущее время достигает 16:29:06, это означает, что срок действия задачи истек. Поскольку его задержкаMs = 1000 мс, это меньше, чем тикMs (1000 мс) нашего колеса времени второго уровня.
    1. Например, есть задача, которую нужно выполнить в 16:41:25, с точки зрения иерархического колеса времени, когда наше текущее время достигает 16:41 (У градуированного колеса времени нет секундной стрелки! Его минимальная точность - минуты (обязательно поймите это)), это означает, что срок действия задачи истек, потому что ее delayMs = 25000 мс, что меньше, чем tickMs (60000 мс) нашего иерархического колеса времени.

2. Время не истекло, а delayMs меньше интервала.

Для колеса времени второго уровня, если время задержки меньше 60с, то второй слот точно можно найти и кинуть в него.

3. Время не истекло, а delayMs больше интервала.

Для чудесного колеса времени время задержки больше или равно 60 с.В это время необходимо использовать мощность верхнего колеса времени.Очень простая реализация кода состоит в том, чтобы получить верхнее колесо времени, а затем бросить это похоже на рекурсию.


Например, временная задача с задержкой в ​​один год будет продолжать создавать колесо времени более высокого уровня в этой рекурсии до тех пор, пока не найдет колесо времени, которое удовлетворяет задержке меньше интервала.

Чтобы не усложнять код, масштаб каждого слоя нашего колеса времени одинаков, то есть колесо времени второго уровня представляет 60 секунд, верхний слой представляет 60 минут, верхний слой представляет 60 часов, и верхний слой представляет 60 60 часов, а затем верхний слой означает 60 60 60 часов = 216000 часов.

То есть, если тикМс (точность) нижнего колеса времени установлен на 1000 мс. WheelSize установлен на 60.Тогда необходимы только 5 слоев временных колес, а временной отрезок, который можно представить, составляет целых 24 года (216 000 часов)..

    /**
     * 添加任务到某个时间轮
     */
    public boolean addTask(TimedTask timedTask) {
        long expireTimestamp = timedTask.getExpireTimestamp();
        long delayMs = expireTimestamp - currentTimestamp;
        if (delayMs < tickMs) {// 到期了
            return false;
        } else {

            // 扔进当前时间轮的某个槽中,只有时间【大于某个槽】,才会放进去
            if (delayMs < interval) {
                int bucketIndex = (int) (((delayMs + currentTimestamp) / tickMs) % wheelSize);

                Bucket bucket = buckets[bucketIndex];
                bucket.addTask(timedTask);
            } else {
			// 当maybeInThisBucket大于等于wheelSize时,需要将它扔到上一层的时间轮
                TimeWheel timeWheel = getOverflowWheel();
                timeWheel.addTask(timedTask);
            }
        }
        return true;
    }


   /**
     * 获取或创建一个上层时间轮
     */
	private TimeWheel getOverflowWheel() {
        if (overflowWheel == null) {
            synchronized (this) {
                if (overflowWheel == null) {
                    overflowWheel = new TimeWheel(interval, wheelSize, currentTimestamp, delayQueue);
                }
            }
        }
        return overflowWheel;
    }

Конечно, нашему колесу времени также нужен механизм продвижения указателя Мы же не можем позволить времени вечно оставаться в текущем состоянии, верно? При продвижении это похоже на рекурсию в то же время, чтобы продвигать колесо времени предыдущего слоя.

Примечание: следует подчеркнуть, что наше колесо времени больше похоже на электронные часы, в нем нет промежуточного состояния времени, то есть необходимо хорошо понимать понятие точности. Например, для колеса времени второго уровня его точность может быть гарантирована только до 1 секунды, и если оно меньше 1 секунды, оно будет считаться просроченным.

Для колеса оценки времени его точность может быть гарантирована только до 1 балла, и если она меньше 1 балла, она будет считаться просроченной.

    /**
     * 尝试推进一下指针
     */
    public void advanceClock(long timestamp) {
        if (timestamp >= currentTimestamp + tickMs) {
            currentTimestamp = timestamp - (timestamp % tickMs);

            if (overflowWheel != null) {
                this.getOverflowWheel()
                    .advanceClock(timestamp);
            }
        }
    }

3. Для колеса времени высокого уровня точность становится все более и более неточной, окажет ли это влияние?

Как упоминалось выше, колесо времени оценивания имеет точность всего в минутах.Невозможно одновременно выполнить запланированное задание с задержкой в ​​1 секунду и запланированное задание с задержкой в ​​59 секунд, верно?

Студенты, у которых есть этот вопрос, очень хороши! На самом деле это очень легко решить, просто снова войдите в колесо времени. Например, для колеса времени минутного уровня истекли delayM в 1 секунду и delayM в 59. Можем ли мы вынуть их и бросить в нижнее колесо времени?

1 секунда будет брошена в следующий слот выполнения колеса времени второго уровня, а 59 секунд будет брошена в последние 59 слотов времени колеса времени второго уровня.

Внимательные студенты обнаружат, что наш метод добавления задачи возвращает логическое значение.

public boolean addTask(TimedTask timedTask)

Вернитесь назад и хорошенько посмотрите.Если он не сможет добавить к нижнему колесу времени (мы можем напрямую управлять только нижним колесом времени, но не верхним колесом времени), вернется ли он напрямую во flash?Для задач, которые не могут быть повторно введены, мы можем выполнить их напрямую.


    /**
     * 将任务添加到时间轮
     */
    public void addOrSubmitTask(TimedTask timedTask) {
        if (!timeWheel.addTask(timedTask)) {
            taskExecutor.submit(timedTask.getTask());
        }
    }
	

4. Как узнать, что срок действия задачи истек?

Помните, мы хранили задачи в слотах? Например, в колесе времени второго уровня 60 слотов, поэтому всего 60 слотов. Если колесо времени состоит из двух слоев, то всего 120 слотов. Мы просто бросаем слот в задержанную очередь.

Мы можем опросить просроченный слот из delayedQueue. (Во всем предыдущем коде, для простоты, понятие DelayQueue не вводилось, так что переходить на него не надо, нет. Блогер считает...Увидев это, вы должны понять смысл этой DelayQueue.)

На самом деле, проще говоря, на самом деле можно использовать DelayQueue для задач с таймером, но как только количество задач увеличится, достигнув миллионов или десятков миллионов, добавление и удаление этой delayQueue будет очень медленным.

** Во-первых, delayQueue, ориентированная на слоты**

Для колеса времени нужно всего лишь закидывать в delayQueue разные слоты.Например, наши временные задачи имеют разную длину.Самый длинный промежуток 24 года, а в этой delayQueue всего 300 элементов.

** Во-вторых, разберитесь с просроченными слотами**

По истечении этого слота, то есть после того, как мы опросим его из delayQueue, нам нужно только один раз зациклить все задачи в слоте и добавить его обратно в новый слот (если добавление не удалось, оно будет выполнено напрямую).

   /**
     * 推进一下时间轮的指针,并且将delayQueue中的任务取出来再重新扔进去
     */
    public void advanceClock(long timeout) {
        try {
            Bucket bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
            if (bucket != null) {
                timeWheel.advanceClock(bucket.getExpire());
                bucket.flush(this::addTask);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Полное колесо времени GitHub на самом деле является упрощенной версией колеса времени Kafka, которое наполовину копируется, а наполовину принадлежит вам.В Timer#main моделируется шесть миллионов простых задач с задержкой, и эффективность их выполнения очень высока~