Анализ исходного кода Sentinel (часть 3): анализ StatisticSlot

Java

1. Введение

В первых двух статьях в основном анализируется функция и создание Context, Entry, Node и формирование всей цепочки вызовов. В этой статье мы приступим к анализу StatisticSlot, который в основном отвечает за статистику различных показателей доступа к ресурсам, включая количество запросов, количество успешных запросов, количество потоков запросов и время ответа. И проанализируйте, как Sentinel сохраняет эти показатели.

2. StatisticSlot

2.1. Роль StatisticSlot

Анализируя цепочку вызовов Sentinel, я вкратце упомянул, что StatisticSlot — это функциональный слот в Sentinel, в основном отвечающий за сбор статистики и сохранение различных показателей ресурсов в Sentinel, а также выполнение статистики по разным измерениям. Последующие другие слоты функций будут реализовывать свои собственные функции в соответствии со статистическими показателями в StatisticSlot.

2.2 Принцип работы StatisticSlot

При входе в слот функции StatisticSlot первым будет выполняться метод entry(), код которого выглядит следующим образом:

 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
        boolean prioritized, Object... args) throws Throwable {

        try {
            // Do some checking.
            //先出发下一个slot作用,如果剩下的所有slot都没有抛出异常,代表没有被各种规则限制
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            //线程数加一,defaultNode和clusterNode同时累加。
            node.increaseThreadNum();
            //请求通过数,defaultNode和clusterNode同时累加,窗口时间
            node.addPassRequest(count);

            //如果来源节点不为空,则统计来源节点
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            //entryType为in的,使用一个全局的clusternode来统计
            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected error, set error to current entry.
            context.getCurEntry().setError(e);

            // This should not happen.
            node.increaseExceptionQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps(count);
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps(count);
            }
            throw e;
        }
    }

Из исходного кода видно, что основная работа StatisticSlot's entry() заключается в следующем:

  • Остальные функциональные слоты вызываются первыми.
  • Если остальные слоты функций выполняются нормально, запишите количество потоков запросов и количество переданных запросов.
  • Если при выполнении оставшихся слотов функций возникает PriorityWaitException, запишите количество потоков запросов.
  • Если во время выполнения оставшихся слотов функций возникает исключение BlockException, записывается количество запросов блоков, и для текущей записи устанавливается причина ошибки.
  • Если при выполнении оставшихся функциональных слотов возникают другие ошибки, запишите количество исключений и установите причину ошибки для текущей записи.

При выходе из StatisticSlot вызывается метод exit(), код которого выглядит следующим образом:


    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {

        DefaultNode node = (DefaultNode) context.getCurNode();

        //如果没有发生异常
        if (context.getCurEntry().getError() == null) {
            // Calculate response time (max RT is TIME_DROP_VALVE).
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            if (rt > Constants.TIME_DROP_VALVE) {
                rt = Constants.TIME_DROP_VALVE;
            }

            // Record response time and success count.
            node.addRtAndSuccess(rt, count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
            }

            node.decreaseThreadNum();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        } else {
            // Error may happen.
        }

        // Handle exit event with registered exit callback handlers.
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {
            handler.onExit(context, resourceWrapper, count, args);
        }

        fireExit(context, resourceWrapper, count);
    }

По коду видно, что основная работа StatisticSlot это:

  • Если исключения не возникает, запишите время ответа и количество успешных запросов для текущего запроса, а также уменьшите количество потоков запросов.

Следует отметить, что при входе или выходе из StatisticSlot будут записываться данные различных измерений, включая DefaultNode, ClusterNode, исходный узел и ClusterNode с EntryType In.

2.3. Резюме

На самом деле работа StatisticSlot очень проста: при поступлении запроса он записывает разную информацию в зависимости от того, аномален он или нет, и записывает некоторую информацию при выходе. То, как это записывать, контролируется самим Node.

3. Скользящее временное окно

Sentinel использует скользящее временное окно для расчета таких показателей, как количество запросов в секунду.

StatisticSlot вызывает node.addPassRequest(count) и другие методы для накопления показателей при подсчете показателей. Глядя на это, в свою очередь, он в конечном итоге определит два скользящих временных окна уровня времени в StatisticNode для хранения данных и статистики.

  /**
     * 持有秒级别的统计数据指标,默认一秒,样本数2
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    /**
     * 持有分钟级别的数据指标,单位一分钟,样本数量为60,即每秒一个样本
     */
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

3.1. Metric

Метрика представляет собой интерфейс, который представляет собой базовую структуру различных показателей ресурсов.Определяет методы получения и накопления различных показателей.Показатели включают в себя количество успешных запросов, количество аномальных запросов, количество заблокированных запросов, количество запросов, и время отклика.

3.2. ArrayMetric

ArrayMetric реализует интерфейс Metric и реализует все его методы.

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
    
    ...
}

Выше приведено определение ArrayMetric. LeapArray используется в ArrayMetric для сохранения информации статистического индикатора. LeapArray — абстрактный класс. Реализация по умолчанию — OccupiableBucketLeapArray, но в соответствии с конструктором он может быть указан как BucketLeapArray. Что касается LeapArray, я расскажу о нем позже.

Мы используем количество успешных запросов, чтобы увидеть, как реализован исходный код:

 @Override
    public long success() {
        data.currentWindow();
        long success = 0;

        List<MetricBucket> list = data.values();
        for (MetricBucket window : list) {
            success += window.success();
        }
        return success;
    }

Как видите, Sentinel сначала получает текущее временное окно, а затем получает все

3.3. LeapArray

LeapArray — это структура данных, которая сохраняет временные окна и определяется следующим образом:

    /**
     * 单个滑动时间窗口的大小
     */
    protected int windowLengthInMs;

    /**
     *总共的窗口数
     */
    protected int sampleCount;

    /**
     * 总的时间大小
     */
    protected int intervalInMs;

    /**
     * 采样的时间窗口数组
     */
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     */
    private final ReentrantLock updateLock = new ReentrantLock();

Массив AtomicReferenceArray используется в LeapArray для хранения данных в каждом временном окне Размер каждого временного окна равен windowLengthInMs, а общий интервал времени равен intervalInMs, поэтому общее количество окон равно intervalInMs/windowLengthInMs.

Массив типа WindowWrap используется в LeapArray, а WindowWrap представляет данные каждого временного окна, включая размер временного окна, время начала и статистические данные.

public class WindowWrap<T> {

    /**以毫秒为单位的时间窗口长度
     * Time length of a single window bucket in milliseconds.
     */
    private final long windowLengthInMs;

    /**
     * 开始时间
     * Start timestamp of the window in milliseconds.
     */
    private long windowStart;

    /**
     * 统计的数据
     * Statistic data.
     */
    private T value;

}

В LeapArray есть очень важный метод, currentWindow().Этот метод будет получать конкретный объект временного окна в соответствии с входящим временем, то есть WindowWrap.Если соответствующего временного окна нет, он создаст новый. Последующие записи количества успешных запросов, количества исключений и т. д. обрабатываются через этот объект.

 public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        int idx = calculateTimeIdx(timeMillis);
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                return old;
            } else if (windowStart > old.windowStart()) {
                if (updateLock.tryLock()) {
                    try {
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

Выше приведен метод получения объекта временного окна указанного времени.Основная логика следующая:

  • Вычислите соответствующий индекс массива в соответствии с входящей меткой времени, чтобы получить соответствующий WindowWrap.
  • Рассчитать соответствующее время начала окна на основе переданной метки времени.
  • Если соответствующий WindowWrap не существует, создайте новый WindowWrap.
  • Если соответствующий WindowWrap существует и время начала WindowWrap совпадает с расчетным временем начала, он вернется напрямую.
  • Если соответствующий WindowWrap существует и время начала WindowWrap меньше расчетного времени начала, сбросьте временное окно.

Следует отметить, что методы newEmptyBucket() и resetWindowTo(), вызываемые в методе currentWindow(), являются абстрактными методами, а конкретные методы реализованы в их подклассах. Существует много подклассов LeapArray, но ArrayMetric вызывает подозрение, что OccupiableBucketLeapArray используется в качестве класса реализации, поэтому конкретная реализация этих двух методов находится в OccupiableBucketLeapArray.

Как правило, LeapArray сохраняет данные во временном интервале, а данные в каждом временном интервале сохраняются с помощью WindowWrap. В LeapArray есть метод currentWindow(), который используется для получения или создания структуры временного окна в определенное время. Статистическая обработка последующих данных осуществляется в соответствии с этим временным окном.

3.3. MetricBucket

LeapArray — это объект универсального типа.При создании нового LeapArrat в ArrayMetric передается конкретный тип — MetricBucket, что означает, что данные в WindowWrap имеют тип MetricBucket. MetricBucket — это структура данных, которая действительно записывает различные метрики.

public class MetricBucket {

    private final LongAdder[] counters;

    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
    
    /**
     * Reset the adders.
     *
     * @return new metric bucket in initial state
     */
    public MetricBucket reset() {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }

    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }
 ...
}

MertricBucket использует массив LongAdder для хранения различных метрик.Глядя на конструктор MertricBucket, видно, что массив в это время инициализируется.Каждый элемент массива представляет собой показатель, а порядок фиксирован.Порядок, в котором перечисление определено указано. Будь то получение или изменение индикатора, это делается путем манипулирования соответствующими элементами в этом массиве.

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}

4. Резюме

В этой статье в основном анализируется конкретный рабочий процесс StatisticSlot, а Sentinel использует скользящее временное окно для сохранения данных. Основной поток статистических данных примерно следующий:

В первых трех статьях мы проанализировали основной рабочий процесс Sentinel, вызов ссылок, а также разобрали принцип размерности и принцип хранения данных статистических показателей Sentinel. Далее мы проанализируем, как реализованы другие функциональные слоты Sentinel, такие как ограничение тока и даунгрейд.