1. Введение
В первых двух статьях в основном анализируется функция и создание Context, Entry, Node и формирование всей цепочки вызовов. В этой статье мы приступим к анализу StatisticSlot, который в основном отвечает за статистику различных показателей доступа к ресурсам, включая количество запросов, количество успешных запросов, количество потоков запросов и время ответа. И проанализируйте, как Sentinel сохраняет эти показатели.
2. StatisticSlot
2.1. Роль StatisticSlot
Анализируя цепочку вызовов Sentinel, я вкратце упомянул, что StatisticSlot — это функциональный слот в Sentinel, в основном отвечающий за сбор статистики и сохранение различных показателей ресурсов в Sentinel, а также выполнение статистики по разным измерениям. Последующие другие слоты функций будут реализовывать свои собственные функции в соответствии со статистическими показателями в StatisticSlot.
2.2 Принцип работы StatisticSlot
При входе в слот функции StatisticSlot первым будет выполняться метод entry(), код которого выглядит следующим образом:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
//先出发下一个slot作用,如果剩下的所有slot都没有抛出异常,代表没有被各种规则限制
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
//线程数加一,defaultNode和clusterNode同时累加。
node.increaseThreadNum();
//请求通过数,defaultNode和clusterNode同时累加,窗口时间
node.addPassRequest(count);
//如果来源节点不为空,则统计来源节点
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
//entryType为in的,使用一个全局的clusternode来统计
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
// This should not happen.
node.increaseExceptionQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw e;
}
}
Из исходного кода видно, что основная работа StatisticSlot's entry() заключается в следующем:
- Остальные функциональные слоты вызываются первыми.
- Если остальные слоты функций выполняются нормально, запишите количество потоков запросов и количество переданных запросов.
- Если при выполнении оставшихся слотов функций возникает PriorityWaitException, запишите количество потоков запросов.
- Если во время выполнения оставшихся слотов функций возникает исключение BlockException, записывается количество запросов блоков, и для текущей записи устанавливается причина ошибки.
- Если при выполнении оставшихся функциональных слотов возникают другие ошибки, запишите количество исключений и установите причину ошибки для текущей записи.
При выходе из StatisticSlot вызывается метод exit(), код которого выглядит следующим образом:
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode) context.getCurNode();
//如果没有发生异常
if (context.getCurEntry().getError() == null) {
// Calculate response time (max RT is TIME_DROP_VALVE).
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
// Record response time and success count.
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
// Handle exit event with registered exit callback handlers.
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
fireExit(context, resourceWrapper, count);
}
По коду видно, что основная работа StatisticSlot это:
- Если исключения не возникает, запишите время ответа и количество успешных запросов для текущего запроса, а также уменьшите количество потоков запросов.
Следует отметить, что при входе или выходе из StatisticSlot будут записываться данные различных измерений, включая DefaultNode, ClusterNode, исходный узел и ClusterNode с EntryType In.
2.3. Резюме
На самом деле работа StatisticSlot очень проста: при поступлении запроса он записывает разную информацию в зависимости от того, аномален он или нет, и записывает некоторую информацию при выходе. То, как это записывать, контролируется самим Node.
3. Скользящее временное окно
Sentinel использует скользящее временное окно для расчета таких показателей, как количество запросов в секунду.
StatisticSlot вызывает node.addPassRequest(count) и другие методы для накопления показателей при подсчете показателей. Глядя на это, в свою очередь, он в конечном итоге определит два скользящих временных окна уровня времени в StatisticNode для хранения данных и статистики.
/**
* 持有秒级别的统计数据指标,默认一秒,样本数2
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* 持有分钟级别的数据指标,单位一分钟,样本数量为60,即每秒一个样本
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
3.1. Metric
Метрика представляет собой интерфейс, который представляет собой базовую структуру различных показателей ресурсов.Определяет методы получения и накопления различных показателей.Показатели включают в себя количество успешных запросов, количество аномальных запросов, количество заблокированных запросов, количество запросов, и время отклика.
3.2. ArrayMetric
ArrayMetric реализует интерфейс Metric и реализует все его методы.
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
...
}
Выше приведено определение ArrayMetric. LeapArray используется в ArrayMetric для сохранения информации статистического индикатора. LeapArray — абстрактный класс. Реализация по умолчанию — OccupiableBucketLeapArray, но в соответствии с конструктором он может быть указан как BucketLeapArray. Что касается LeapArray, я расскажу о нем позже.
Мы используем количество успешных запросов, чтобы увидеть, как реализован исходный код:
@Override
public long success() {
data.currentWindow();
long success = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
success += window.success();
}
return success;
}
Как видите, Sentinel сначала получает текущее временное окно, а затем получает все
3.3. LeapArray
LeapArray — это структура данных, которая сохраняет временные окна и определяется следующим образом:
/**
* 单个滑动时间窗口的大小
*/
protected int windowLengthInMs;
/**
*总共的窗口数
*/
protected int sampleCount;
/**
* 总的时间大小
*/
protected int intervalInMs;
/**
* 采样的时间窗口数组
*/
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
Массив AtomicReferenceArray используется в LeapArray для хранения данных в каждом временном окне Размер каждого временного окна равен windowLengthInMs, а общий интервал времени равен intervalInMs, поэтому общее количество окон равно intervalInMs/windowLengthInMs.
Массив типа WindowWrap используется в LeapArray, а WindowWrap представляет данные каждого временного окна, включая размер временного окна, время начала и статистические данные.
public class WindowWrap<T> {
/**以毫秒为单位的时间窗口长度
* Time length of a single window bucket in milliseconds.
*/
private final long windowLengthInMs;
/**
* 开始时间
* Start timestamp of the window in milliseconds.
*/
private long windowStart;
/**
* 统计的数据
* Statistic data.
*/
private T value;
}
В LeapArray есть очень важный метод, currentWindow().Этот метод будет получать конкретный объект временного окна в соответствии с входящим временем, то есть WindowWrap.Если соответствующего временного окна нет, он создаст новый. Последующие записи количества успешных запросов, количества исключений и т. д. обрабатываются через этот объект.
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
Выше приведен метод получения объекта временного окна указанного времени.Основная логика следующая:
- Вычислите соответствующий индекс массива в соответствии с входящей меткой времени, чтобы получить соответствующий WindowWrap.
- Рассчитать соответствующее время начала окна на основе переданной метки времени.
- Если соответствующий WindowWrap не существует, создайте новый WindowWrap.
- Если соответствующий WindowWrap существует и время начала WindowWrap совпадает с расчетным временем начала, он вернется напрямую.
- Если соответствующий WindowWrap существует и время начала WindowWrap меньше расчетного времени начала, сбросьте временное окно.
Следует отметить, что методы newEmptyBucket() и resetWindowTo(), вызываемые в методе currentWindow(), являются абстрактными методами, а конкретные методы реализованы в их подклассах. Существует много подклассов LeapArray, но ArrayMetric вызывает подозрение, что OccupiableBucketLeapArray используется в качестве класса реализации, поэтому конкретная реализация этих двух методов находится в OccupiableBucketLeapArray.
Как правило, LeapArray сохраняет данные во временном интервале, а данные в каждом временном интервале сохраняются с помощью WindowWrap. В LeapArray есть метод currentWindow(), который используется для получения или создания структуры временного окна в определенное время. Статистическая обработка последующих данных осуществляется в соответствии с этим временным окном.
3.3. MetricBucket
LeapArray — это объект универсального типа.При создании нового LeapArrat в ArrayMetric передается конкретный тип — MetricBucket, что означает, что данные в WindowWrap имеют тип MetricBucket. MetricBucket — это структура данных, которая действительно записывает различные метрики.
public class MetricBucket {
private final LongAdder[] counters;
private volatile long minRt;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
/**
* Reset the adders.
*
* @return new metric bucket in initial state
*/
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
}
initMinRt();
return this;
}
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
public long pass() {
return get(MetricEvent.PASS);
}
...
}
MertricBucket использует массив LongAdder для хранения различных метрик.Глядя на конструктор MertricBucket, видно, что массив в это время инициализируется.Каждый элемент массива представляет собой показатель, а порядок фиксирован.Порядок, в котором перечисление определено указано. Будь то получение или изменение индикатора, это делается путем манипулирования соответствующими элементами в этом массиве.
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
4. Резюме
В этой статье в основном анализируется конкретный рабочий процесс StatisticSlot, а Sentinel использует скользящее временное окно для сохранения данных. Основной поток статистических данных примерно следующий:
В первых трех статьях мы проанализировали основной рабочий процесс Sentinel, вызов ссылок, а также разобрали принцип размерности и принцип хранения данных статистических показателей Sentinel. Далее мы проанализируем, как реализованы другие функциональные слоты Sentinel, такие как ограничение тока и даунгрейд.