Как обычная мера стабильности в микросервисах, на собеседованиях часто нужно задавать вопрос об ограничении тока.Мне часто нравится спрашивать, что вы знаете об алгоритмах ограничения тока во время собеседований? Вы смотрели исходный код? Каков принцип реализации?
В первой части сначала рассказывается о текущем алгоритме ограничения и, наконец, о принципе реализации исходного кода.
Алгоритм ограничения тока
Текущие алгоритмы ограничения можно условно разделить на четыре категории: счетчики с фиксированным окном, счетчики с скользящим окном, дырявые ведра (также называемые воронками, англ. Leaky Buckets) и ведра с токенами (англ. Token Buckets).
фиксированное окно
Фиксированное окно, по сравнению с другими алгоритмами ограничения тока, это должно быть самым простым.
Он просто подсчитывает количество запросов в течение фиксированного временного окна, и если оно превышает пороговое значение количества запросов, оно будет напрямую отброшено.
Преимущества и недостатки этого простого алгоритма ограничения тока очевидны. Плюсы простые, минусы например.
Например, желтая область на рисунке ниже представляет собой фиксированное временное окно, диапазон времени по умолчанию — 60 с, а текущее предельное число — 100.
Как показано в скобках на рисунке, в первый период времени трафика не было, а в следующие 30 секунд поступило 100 запросов, в это время, поскольку текущий порог лимита не был превышен, все запросы прошли, а затем следующее окно прошло секунд за 20. 100 запросов.
Таким образом, замаскированный, это эквивалентно передаче 200 запросов в течение 40 секунд этой группы, что превышает наш текущий предельный порог.
Ограничение
раздвижное окно
Чтобы оптимизировать эту проблему, у нас есть алгоритм скользящего окна, Как следует из названия, скользящее окно — это временное окно, которое продолжает двигаться во времени.
Скользящее окно делит фиксированное временное окно на N маленьких окон, а затем считает каждое маленькое окно отдельно.Сумма всех запросов малых окон не может превышать установленный нами текущий предельный порог.
В качестве примера на следующем рисунке предположим, что наше окно разделено на 3 маленьких окна, и все маленькие окна имеют значение 20. Также на основе приведенного выше примера, когда 100 запросов приходят в третьи 20-е, они могут пройти.
Затем временное окно скользит, и в следующем запросе на 20 секунд поступает еще 100 запросов. В это время количество запросов в диапазоне 60 с нашего скользящего окна должно превышать 100, поэтому запрос отклоняется.
Дырявое ведро
Алгоритм дырявого ведра, как следует из его названия, представляет собой дырявое ведро. Независимо от того, сколько запросов было сделано, в конечном итоге он будет вытекать с постоянной скоростью исходящего трафика. Если запрошенный трафик превышает размер дырявого ведра, избыточный трафик будет быть отброшенным. .
То есть скорость притока потока переменная, а скорость оттока постоянна.
Это похоже на идею MQ о сглаживании пиков и заполнении впадин.В условиях внезапных всплесков трафика алгоритм дырявого ведра может обеспечить равномерное создание очередей и ограничение тока с фиксированной скоростью.
Преимуществом алгоритма дырявого ведра является равномерная скорость, что является одновременно и преимуществом, и недостатком.Многие говорят, что дырявое ведро не справляется с внезапными потоками трафика.Это утверждение неверно.
Дырявое ведро изначально предназначалось для обработки прерывистого всплесков трафика.Трафик внезапно увеличился, и тогда система не смогла его обработать.Его можно было обработать, когда он простаивал, что предотвратило сбой системы из-за всплеска трафика и защитило стабильность системы.
Однако, если подумать по-другому, на самом деле эти резкие увеличения трафика совершенно не давят на систему, вы все равно стоите в очереди медленно и с постоянной скоростью, что на самом деле является пустой тратой производительности системы.
Следовательно, для такого сценария алгоритм ведра с токенами имеет больше преимуществ, чем дырявое ведро.
ведро с жетонами ведро с жетонами
Алгоритм Token Bucket означает, что система выбрасывает токены в Token Bucket с определенной скоростью.Когда приходит запрос, он переходит в Token Bucket для подачи заявки на токен.Если токен можно получить, то запрос может быть обычным. выполняется, в противном случае отбрасывается.
Текущие алгоритмы корзины токенов, такие как реализованные Guava и Sentinel, имеют метод холодного запуска / прогрева.Чтобы избежать отключения системы при скачках трафика, алгоритм корзины токенов запустится через определенный период времени. с начала.Холодный запуск, с увеличением трафика система будет динамически регулировать скорость генерации токенов в зависимости от размера трафика и, наконец, до тех пор, пока запрос не достигнет порога системы.
Пример исходного кода
Возьмем в качестве примера Sentinel: Sentinel использует алгоритм скользящего окна для статистики, а затем также использует алгоритмы дырявого ведра и ведра с маркерами.
раздвижное окно
sentinel
Алгоритм скользящего окна используется для статистики, но его реализация немного отличается от той, что я нарисовал выше, на самом деле скользящее окно в дозоре более разумно для описания круга.
На начальном этапе создаются ноды, а затем слоты объединяются в цепочку, образуя модель цепочки ответственности.StatisticSlot использует скользящее окно для сбора статистики.FlowSlot – это логика реального ограничения тока, а также некоторый даунгрейд и защита системы меры, которые в конечном итоге образуют действующий ограничительный метод всего дозорного.
Вы только посмотрите на официальную картинку, этот круг выглядит отвратительно
Реализация скользящего окна в основном виднаLeapArray
Код по умолчанию определяет соответствующие параметры временного окна.
Для Sentinel окно фактически разделено навторойиминутаДва уровня, количество окон для секунд равно 2, а количество окон для минут равно окнам 60. Продолжительность каждого окна 1 с, а общий период времени 60 с, разделенный на 60 окон. Здесь мы используем минуты- статистика уровня.
public abstract class LeapArray<T> {
//窗口时间长度,毫秒数,默认1000ms
protected int windowLengthInMs;
//窗口数量,默认60
protected int sampleCount;
//毫秒时间周期,默认60*1000
protected int intervalInMs;
//秒级时间周期,默认60
private double intervalInSecond;
//时间窗口数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
Затем нам нужно увидеть, как он вычисляет текущее окно.На самом деле исходный код написан ясно, но если вы думаете об этом как о прямой линии в соответствии с предыдущим воображением, это, вероятно, нелегко понять.
Во-первых, относительно просто вычислить индекс индекса массива и время временного окна.Большая сложность должна заключаться в том, что третий момент заключается в том, что окно больше, чем старое.Давайте поговорим об этих ситуациях подробно.
- Временное окно в массиве пустое, значит, время пришло после нашего времени инициализации, в это время CAS обновляет новое окно, после чего возвращается новое окно.
- Второй случай, если время временного окна равно, то возвращайтесь напрямую, тут нечего сказать
- Третий случай более труден для понимания, вы можете обратиться к диаграммам двух временных линий, и это легче понять, первое временное окно завершается и достигает 1200, а затем начинает циклически повторяться круговое временное окно, и новая начальная позиция времени по-прежнему 1200, а затем время временного окна приближается к 1676. Если позиция B2 по-прежнему остается старым окном, это 600, поэтому нам нужно сбросить время предыдущего временного окна на текущее время.
- Последний общий случай вряд ли произойдёт, если часы не будут откатываться вот так.
Отсюда мы можем найти, что для каждогоWindowWrap
Временные окна все учитываются.В итоге результаты QPS статистики временных окон реально используются в следующих местах.Я не буду здесь вдаваться в подробности, просто знайте.
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int) (timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
public WindowWrap<T> currentWindow(long timeMillis) {
//当前时间如果小于0,返回空
if (timeMillis < 0) {
return null;
}
//计算时间窗口的索引
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//在窗口数组中获得窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 比如当前时间是888,根据计算得到的数组窗口位置是个空,所以直接创建一个新窗口就好了
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 这个更好了,刚好等于,直接返回就行
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* |_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* 这个要当成圆形理解就好了,之前如果是1200一个完整的圆形,然后继续从1200开始,如果现在时间是1676,落在在B2的位置,
* 窗口开始时间是1600,获取到的old时间其实会是600,所以肯定是过期了,直接重置窗口就可以了
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 这个不太可能出现,嗯。。时钟回拨
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
дырявое ведро
дозор в основном основан наFlowSlot
управление потоком в управлении потоком, гдеRateLimiterController
Это реализация алгоритма дырявого ведра, эта реализация намного проще, чем другие, это следует понять после небольшого взгляда.
- Сначала рассчитайте время, затраченное на текущий запрос, амортизированное с точностью до 1 с, а затем рассчитайте расчетное время для этого запроса.
- Если оно меньше текущего времени, то текущее время является основным, и можно вернуться
- Наоборот, если оно превышает текущее время, оно будет поставлено в очередь в это время.При ожидании необходимо судить, превышает ли оно текущее максимальное время ожидания, и если превышает, то оно будет отброшено напрямую.
- Если оно не превышается, обновите последнее время прохождения, а затем сравните, истекло ли время ожидания снова, и сбросьте время, если оно истекло.В противном случае, если оно находится в пределах диапазона времени ожидания, подождите.
public class RateLimiterController implements TrafficShapingController {
//最大等待超时时间,默认500ms
private final int maxQueueingTimeMs;
//限流数量
private final double count;
//上一次的通过时间
private final AtomicLong latestPassedTime = new AtomicLong(-1);
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
//时间平摊到1s内的花费
long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // 1 / 100 * 1000 = 10ms
//计算这一次请求预计的时间
long expectedTime = costTime + latestPassedTime.get();
//花费时间小于当前时间,pass,最后通过时间 = 当前时间
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
//预计通过的时间超过当前时间,要进行排队等待,重新获取一下,避免出现问题,差额就是需要等待的时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
//等待时间超过最大等待时间,丢弃
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
//反之,可以更新最后一次通过时间了
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
//更新后再判断,还是超过最大超时时间,那么就丢弃,时间重置
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
//在时间范围之内的话,就等待
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
ведро с жетонами
Наконец, есть ведро токенов, это не репликация реализации, а то, что вы найдете, когда посмотрите на исходный код. . . Реализация корзины токенов Sentinel основана на Guava, а код находится вWarmUpController
середина.
На самом деле, мы можем игнорировать различную логику вычислений этого алгоритма (потому что я тоже ее не понимаю...), но мы можем прояснить ее в процессе.
См. комментарии к нескольким основным параметрам.Логика расчета в методе построения не имеет значения, как она рассчитывается на данный момент (я не понимаю, но это не влияет на наше понимание).Главное, чтобы увидетьcanPass
как это делается.
- Получить QPS текущего окна и предыдущего окна
- Заполнение токена, то есть выбрасывание токена в ведро, а потом мы сначала смотрим на логику заполнения токена
public class WarmUpController implements TrafficShapingController {
//限流QPS
protected double count;
//冷启动系数,默认=3
private int coldFactor;
//警戒的令牌数
protected int warningToken = 0;
//最大令牌数
private int maxToken;
//斜率,产生令牌的速度
protected double slope;
//存储的令牌数量
protected AtomicLong storedTokens = new AtomicLong(0);
//最后一次填充令牌时间
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
//stableInterval 稳定产生令牌的时间周期,1/QPS
//warmUpPeriodInSec 预热/冷启动时间 ,默认 10s
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
//斜率的计算参考Guava,当做一个固定改的公式
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//当前时间窗口通过的QPS
long passQps = (long) node.passQps();
//上一个时间窗口QPS
long previousQps = (long) node.previousPassQps();
//填充令牌
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
//当前的令牌超过警戒线,获得超过警戒线的令牌数
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
}
Логика заполнения токена следующая:
- Получите текущее время, затем удалите миллисекунды, чтобы получить время в секундах
- Время оценки меньше, чем здесь, чтобы контролировать потерю токена каждую секунду
- Тогда есть
coolDownTokens
Чтобы выяснить, как наш холодный старт/прогрев считает жетон пополнения - О количестве токенов, оставшихся в текущем расчете, я не буду говорить позже, минус последнее потребление — это оставшиеся токены в ведре.
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
//去掉当前时间的毫秒
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
//控制每秒填充一次令牌
if (currentTime <= oldLastFillTime) {
return;
}
//当前的令牌数量
long oldValue = storedTokens.get();
//获取新的令牌数量,包含添加令牌的逻辑,这就是预热的逻辑
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
//存储的令牌数量当然要减去上一次消耗的令牌
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
- Первый факт, потому что
lastFilledTime
иoldValue
все равны 0, поэтому вы получите очень большое число на основе текущей метки времени и, наконец, суммируетеmaxToken
Если вы возьмете маленький, вы получите максимальный номер заказа, поэтому он будет сгенерирован при первой инициализации.maxToken
жетон - Тогда будем считать, что QPS системы сначала низкий, а потом резко взлетает. Итак, в начале пройдите весь путь до логики над строкой предупреждения, а затем
passQps
очень низкий, поэтому он всегда будет в состоянии заполнения ведра токенов (currentTime - lastFilledTime.get()
всегда будет 1000, что составляет 1 секунду), поэтому максимальное число запросов в секунду будет заполняться каждый раз.count
количество токенов - Затем произошло внезапное увеличение трафика, и количество запросов в секунду мгновенно стало высоким, и постепенно количество токенов стало потребляться ниже линии предупреждения.
if
логикаcount
Жетон увеличения количества
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
//水位低于警戒线,就生成令牌
if (oldValue < warningToken) {
//如果桶中令牌低于警戒线,根据上一次的时间差,得到新的令牌数,因为去掉了毫秒,1秒生成的令牌就是阈值count
//第一次都是0的话,会生成count数量的令牌
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
//反之,如果是高于警戒线,要判断QPS。因为QPS越高,生成令牌就要越慢,QPS低的话生成令牌要越快
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
//不要超过最大令牌数
return Math.min(newValue, maxToken);
}
После того, как приведенная выше логика будет исправлена, мы можем продолжить рассмотрение некоторой текущей ограничивающей логики:
- Логика расчета токена завершается, и затем оценивается превышение линии предупреждения.Согласно приведенному выше утверждению, состояние низкого QPS всегда должно быть превышено, поэтому
warningQps
, поскольку мы находимся в состоянии холодного запуска, поэтому на этом этапе вычисляется количество запросов в секунду в соответствии с наклоном, чтобы поток медленно достиг пикового значения, которое может выдержать система. Например, еслиcount
равно 100, тогда, когда количество запросов в секунду очень низкое, корзина маркеров всегда заполнена, но система будет контролировать количество запросов в секунду, а фактическое количество пройденных запросов в секунду равноwarningQps
, их может быть только 10 или 20 по алгоритму (как считать на понимание не влияет). Когда первичный ключ QPS увеличивается,aboveToken
затем постепенно уменьшается, весьwarningQps
Он постепенно становился все больше, пока не попал под предупредительную черту.else
в логике. - Внезапное увеличение трафика
else
Когда логика находится ниже линии предупреждения, наше ведро токенов постоянно основано наcount
Для увеличения токенов, в это время скорость потребления токенов превышает скорость генерации токенов, что может привести к постоянному нахождению под линией предупреждения.В это время, конечно, судить нужно по самому высокому QPS чтобы судить о текущем пределе.
long restToken = storedTokens.get();
if (restToken >= warningToken) {
//当前的令牌超过警戒线,获得超过警戒线的令牌数
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
Итак, представьте себе этот процесс в соответствии с процессом от низкого QPS к внезапному высокому QPS:
- В начале QPS системы был очень низким, и мы просто заполнили ведро токенов во время инициализации.
- Затем это состояние низкого QPS длилось какое-то время, потому что мы всегда будем заполнять токены максимальным количеством QPS (поскольку берется минимальное значение, токены в ведре в основном не изменятся), поэтому ведро с токенами было в полном состоянии ограничение по току всей системы также находится на относительно низком уровне
Вышеупомянутая часть была выше линии предупреждения, которая на самом деле является процессом, называемым холодным запуском/прогревом.
- Затем QPS системы резко подскочил, и скорость потребления токенов была слишком быстрой, даже если мы каждый раз увеличивали максимальное количество токенов QPS, потребление не могло поддерживаться, поэтому токены в ведре продолжали уменьшаться. время лимит QPS стадии холодного старта также постоянно увеличивается, пока токен в ведре не окажется ниже линии предупреждения
- После того, как он опустится ниже линии предупреждения, система ограничит ток в соответствии с наивысшим значением QPS.Этот процесс представляет собой процесс постепенного достижения системой максимального предела тока.
Таким образом, наша цель справиться с внезапным потоком трафика фактически достигнута.Вся система в течение длительного времени адаптируется к внезапному высокому QPS и, наконец, достигает порога QPS системы.
- Наконец, если QPS вернется в нормальное состояние, он постепенно вернется к строке предупреждения и вернется к исходному процессу.
Суммировать
Поскольку алгоритм относительно прост, если его произнести отдельно, каждый может его понять, и ему не нужно несколько слов, чтобы объяснить его, поэтому мне все еще нужно получить исходный код, чтобы посмотреть, как играют другие, поэтому, хотя я ненавижу помещать исходный код, Но все же пришлось это сделать.
Просто полагаться на то, что другие скажут немного, на самом деле немного сложно понять, но если вы прочитаете это по порядку, вы это поймете.
Самое сложное для понимания в исходниках - это реализация ведра токенов.Честно говоря, я несколько раз читал логику тех вычислений и не знаю, что за хрень он вычисляет, но понять можно мышление, а другая логика относительно проста, ее легче понять.