Сбор данных Sentinel в режиме реального времени

Java
Сбор данных Sentinel в режиме реального времени

Публичный аккаунт WeChat: маленький черный домовладелец
Поскольку дорога идет далеко, будущее будет лучше
Обучение безгранично, давайте работать вместе!

В Sentinel есть важная функция, то есть статистический анализ данных в режиме реального времени, мы можем получать количество запросов, количество блокировок или время отклика ресурса в каждой ссылке контекстного вызова каждую 1 секунду или каждую 1 минуту, мы также можем получить глобальное количество запросов, блоков или времени отклика для ресурса. Основная логика реализации находится вStatisticSlotсередина.

StatisticslotТретий слот в цепочке вызовов отвечает за состояние ресурсов статистики в реальном времени.При вызове любого слота в цепочке слотов срабатывает метод входа в слот.

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    try {
        // 触发下一个Slot的entry方法
        fireEntry(context, resourceWrapper, node, count, args);
        // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
        // 统计信息
        node.increaseThreadNum();
        node.addPassRequest();
        // 省略部分代码
    } catch (BlockException e) {
        context.getCurEntry().setError(e);
        // Add block count.
        node.increaseBlockedQps();
        // 省略部分代码
        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        // Should not happen
        node.increaseExceptionQps();
        // 省略部分代码
        throw e;
    }
}

entry()Есть три основные части:
1) Сначала сработает метод входа следующего слота, такой как правила SystemSlot, FlowSlot, DegradeSlot и т.д. 2) При прохождении следующего слота исключение BlockException не генерируется, что указывает на то, что ресурс был успешно вызван, а количество потоков выполнения и количество пропущенных запросов увеличивается.
3) При выходе из строя одного из следующих слотов будет выброшено исключение, такое как BlockException.Если исключение BlockException поймано, количество блоков будет увеличено, если это системное исключение, количество исключений будет увеличено.

будет выполняться при выходеexit()метод:

 public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        DefaultNode node = (DefaultNode)context.getCurNode();
        if (context.getCurEntry().getError() == null) {
            //计算响应时间,通过当前时间-CurEntry的创建时间取毫秒值
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            if (rt > Constants.TIME_DROP_VALVE) {
                rt = Constants.TIME_DROP_VALVE;
            }
            //新增响应时间和成功数
            node.addRtAndSuccess(rt, count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
            }
            //线程数减1
            node.decreaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }
            //全局线程数-1
            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        } else {
            // Error may happen.
        }
        ***其他逻辑***
        fireExit(context, resourceWrapper, count);
    }

При выходе сосредоточьтесь на времени отклика, соберите это время отклика в Node и уменьшите текущее количество активных потоков на 1.

Общий процесс такой же, как описано выше, но мы не ясны в отношении конкретной операции.Далее я проанализирую, как подсчитывается количество запросов в секунду.

вышеentry()Метод будет вызываться при подсчете количества запросов в секунду.node.addPassRequest();метод.

@Override
public void addPassRequest(int count) {
     # DefaultNode类型  
     # 统计某个resource在某个context中的实时指标
     super.addPassRequest(count);
     # ClusterNode类型
     # 统计某个resource在所有的context中实时指标总和
     this.clusterNode.addPassRequest(count);
}

Оба узлаStatisticNodeПодкласс , который в конечном итоге вызоветStatisticNodeметод в .

@Override
public void addPassRequest(int count) {
     # 秒级统计
     rollingCounterInSecond.addPass(count);
     # 分钟统计
     rollingCounterInMinute.addPass(count);
}

Основополагающие принципы статистики второго уровня и минутной статистики одинаковы, статистика второго уровня будет проанализирована ниже.


Исправление, на самом деле это OccupiableBucketLeapArray вместо BucketLeapArray в статистике второго уровня
Для лучшего понимания предположим, что BucketLeapArray используется в секундах, а не в действительности.
Для минутного уровня имеется 60 окон, каждое окно равно 1 с.


public class StatisticNode implements Node {
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
public class ArrayMetric implements Metric {
    private final LeapArray<MetricBucket> data;
    
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
    
    @Override
    public void addPass(int count) {
           WindowWrap<MetricBucket> wrap = data.currentWindow();
           wrap.value().addPass(count);
    }

В приведенном выше коде есть несколько важных классов.ArrayMetric,BucketLeapArray,MetricBucket,WindowWrap.

WindowWrap

Класс-оболочка каждого скользящего окна и его внутренняя структура данных T представлены MetricBucket.

public class WindowWrap<T> {
    //一个窗口时段的时间长度(以毫秒为单位)
    private final long windowLengthInMs;
    //窗口的开始时间戳(以毫秒为单位)
    private long windowStart;
    //统计数据,MetricBucket
    private T value;

MetricBucket

Представляет данные индикатора за период времени, хранящиеся вLongAdderв массиве типов. Есть количество пропущенных, заблокированных, исключенных, успешных, время ответа, пройденная будущая квота. относительноAtomicLong,LongAddrПовышение пропускной способности при высокой степени параллелизма за счет большего объема памяти.

public class MetricBucket {
    private final LongAdder[] counters;
    private volatile long minRt;
 public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }
}

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}

LeapArray

Базовая структура данных статистических показателей в Sentinel.

public LeapArray(int sampleCount, int intervalInMs) {
    # 时间窗口的长度
    this.windowLengthInMs = intervalInMs / sampleCount;
    # 以毫秒为单位的时间间隔,
    this.intervalInMs = intervalInMs;
    # 采样窗口的个数,即数组长度
    this.sampleCount = sampleCount;
    this.array = new AtomicReferenceArray<>(sampleCount);
}

При подсчете в секундах длина массива временных окон по умолчанию равна 2, а длина каждого временного окна составляет 500 мс.

При подсчете QPS первым шагом является вызовdata.currentWindow(), чтобы получить текущее временное окно.

public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
}

Первый большой шаг в добавлении Qps

следующая параcurrentTimeMills()Метод анализа разборки.

public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        # 计算给定的时间映射在数组中的下标(默认数组长度为2)
        # 则idx可以是0或者1
        int idx = calculateTimeIdx(timeMillis);
        # 根据当前时间计算出所在窗口应该对用的开始时间
        long windowStart = calculateWindowStart(timeMillis);
private int calculateTimeIdx(long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        return (int)(timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
}

Зачем использовать два окна выборки по умолчанию, потому что Sentinel устанавливает относительно легкую структуру. Временное окно сохраняет много статистических данных.Если временных окон слишком много, с одной стороны, будет занято слишком много памяти.С другой стороны, слишком много временных окон будет означать, что длина временного окна станет Вызывает слишком частое скольжение временного окна.

while (true) {
      # 获取存储的该索引位置下的旧的时间窗口
      WindowWrap<T> old = array.get(idx);
      if (old == null) {
          # 没有则创建一个
          WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
          # 通过CAS进行设置
          if (array.compareAndSet(idx, null, window)) {
                 return window;
           } else {
                //否则当前线程让出时间片,再进行线程竞争
                Thread.yield();
           }
     # 如果实际应当的开始时间和原来的窗口的开始时间相等,则说明没有失效,直接返回
     } else if (windowStart == old.windowStart()) {
            return old;
     # 让应当的开始时间大于原来old窗口的开始时间,则说明该窗口失效
     } else if (windowStart > old.windowStart()) {
            if (updateLock.tryLock()) {
               try {
                   # 将旧的时间窗口的开始时间设置为实际应该的开始时间,
                   # 并重置该窗口的统计数据为0
                    return resetWindowTo(old, windowStart);
               } finally {
                   updateLock.unlock();
               }
            }  else {
                 Thread.yield();
                }
    # 这种情况不可能存在,会抛出异常
    } else if (windowStart < old.windowStart()) {
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
           }
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        # w.value() 即 MetricBucket 
        w.value().reset();
        return w;
}
#重新设置它的开始时间
public WindowWrap<T> resetTo(long startTime) {
        this.windowStart = startTime;
        return this;

# 将MetricBucket的统计数据都重置为0
public void reset() {
        internalReset(0L);
}

Qps добавляет второй большой шаг

До сих пор был введен первый большой шаг, следующий - второй большой шагwrap.value().addPass(count). Этот шаг очень прост, то есть вы получите временное окно после первого шага.WindowWrap, а затем получить внутри классаMetricBucket, который подсчитывает статистику данных в окне событий и, наконец, выполняет операцию атомарного увеличения.

private T value;
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
}
public T value() {
        return value;
}

public void addPass(int n) {
        add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
}

Выше приведен общий процесс увеличения Qps.

Сбор данных Qps

Затем мы добавили данные, как их запросить?

Изучив и поняв, мы можем узнать, что данные статистики ресурсов хранятся вDefaultNodeа такжеClsterNode, они всеStatisticNodeподкласс ,StatisticNodeДостигнутоNOdeВ интерфейсе есть множество методов статистических данных, в том числе метод статистических Qps.

@Override
public double passQps() {
        # 先获取现在的时间窗口数组的Qps总量 @(1)
        # 然后获取时间 @(2)
        return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

Разбор кода @(1)

@Override
public long pass() {
        # 与前面方法一致,过滤掉过期窗口
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
}

public List<T> values() {
        return values(TimeUtil.currentTimeMillis());
}

public List<T> values(long timeMillis) {
        if (timeMillis < 0) {
            return new ArrayList<T>();
        }
        int size = array.length();
        List<T> result = new ArrayList<T>(size);

        for (int i = 0; i < size; i++) {
            WindowWrap<T> windowWrap = array.get(i);
            if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                continue;
            }
            # 即 MetricBucket
            result.add(windowWrap.value());
        }
        return result;
    }

Текущее время за вычетом времени начала окна превышает интервал события (1 с, если считать в секундах), что означает, что окно истекает и не добавляется.

public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
        return time - windowWrap.windowStart() > intervalInMs;
}

Разбор кода @(2)

Поскольку раньше единицей измерения времени были миллисекунды, а теперь вычисления производятся в секунду, они преобразуются в секунды.

@Override
public double getWindowIntervalInSec() {
        return data.getIntervalInSecond();
}

public double getIntervalInSecond() {
        return intervalInMs / 1000.0;
}

На этом модуль про статистику в реальном времени закончен.Большинство из них относятся к статьям нескольких великих богов, которые хорошо понятны с картинками и текстами.Можно прочитать следующее:

Принцип стража — скользящее окно
Принцип реализации скользящего окна Alibaba Seninel (схема приложена в конце статьи)
Анализ исходного кода Принцип реализации сбора данных Sentinel в режиме реального времени