Ограничение тока кластера Sentinel

Java
Ограничение тока кластера Sentinel

Публичный аккаунт WeChat: маленький черный домовладелец
Поскольку дорога идет далеко, будущее будет лучше
Обучение безгранично, давайте работать вместе!

Предыдущие текущие функции ограничения - все версии для одной машины, которые могут подсчитывать только количество вызовов локальной службы.Если служба размещается на нескольких серверах в состоянии кластера, при условии, что в кластере 5 машин, каждая одна машина составляет 10 запросов в секунду. В идеальном состоянии текущий предельный порог всего кластера составляет 50 запросов в секунду. Однако в реальных условиях трафик, направляемый на каждую машину, может быть неравномерным, что приведет к запуску некоторых машин, когда общая сумма не достигнуто.

Каждый одномашинный экземпляр заботится только о своем пороге, но всем безразличен глобальный порог всей системы.Когда мы хотим установить общий Qps для API, текущий лимит в одномашинном режиме не может удовлетворить условиям.
Автономная версия выполняет статистику в каждом экземпляре, а кластерная версия имеет выделенный экземпляр для статистики.

Этот сервер токенов называется Sentinel, который специально используется для статистических данных. Другие экземпляры, как клиент токенов Sentinel, будут запрашивать токены у сервера токенов. Если токен можно получить, это означает, что текущий qps еще не достиг общего порог Указывает, что общий порог кластера достигнут, и текущий экземпляр будет заблокирован.

Сценарии, подходящие для ограничения тока кластера:
1) Подсчитайте общий трафик API на шлюзе API и ограничьте общее количество запросов в секунду для API или службы.
2) Глобальное управление потоком вызовов между сервисами в Service Mesh.
3) Ограничить общую частоту доступа к горячим товарам внутри кластера.

отправная точка

Дросселирование кластера Sentinel находится в стадииFlowSlotреализовано в. Он найдет все правила регулирования на основе имени ресурса.FlowRule, затем вызовите каждое правило по очередиcanPassCheckОценивается, может ли текущее ограничивающее правило быть пройдено.

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                    boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

Если правило применяется в кластерном режиме, оно вызоветpassClusterCheckметод.

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            TokenService clusterService = pickClusterService(); \\@1
            if (clusterService == null) {
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);  //@2
            }
            long flowId = rule.getClusterConfig().getFlowId(); //@3
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);  //@4
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
            // If client is absent, then fallback to local mode.
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        // Fallback to local flow control when token client or server for this rule is not available.
        // If fallback is not enabled, then directly pass.
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }

@1 Код должен определить, является ли текущий узел Token Client или Token Server. 1) Если роль текущего узла — Клиент, возвращаемый TokenService —DefaultClusterTokenClient; 2) Если роль текущего узла — Сервер, по умолчанию возвращается TokenService.DefaultTokenService.

Код @2: если TokenService кластера не может быть получен, правило управления потоком может быть переведено в режим ограничения тока для одной машины.

private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                 boolean prioritized) {
        if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
            return passLocalCheck(rule, context, node, acquireCount, prioritized);
        } else {
            return true;
        }
    }

@3 код для получения идентификатора потока управления потоком, в кластерном режиме каждое правило имеет соответствующийClusterFlowConfig.

Введение в класс ClusterFlowConfig:

public class ClusterFlowConfig {
    // 全局唯一id
    private Long flowId;
    // 有两种阈值类型,一种是单机均摊,一种是集群总体模式
    private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL;
    //集群不可用时是否回退到单机模式
    private boolean fallbackToLocalWhenFail = true;
    
    private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;
    // 集群采样数 10 
    private int sampleCount = ClusterRuleConstant.DEFAULT_CLUSTER_SAMPLE_COUNT;
   // 1000ms,即1秒
    private int windowIntervalMs = RuleConstant.DEFAULT_WINDOW_INTERVAL_MS;

@4 Код применяется для токена через TokenService в соответствии с полученным идентификатором потока. Как видно из вышеизложенного, его может вызывать TokenClient, а может вызывать ToeknServer. Соответствующие классыDefaultClusterTokenClientа такжеDefaultTokenService.

Ниже приводится интерпретация двух ролей TokenClient и TokenService.

DefaultClusterTokenClient

public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
        // 如果flowId是无效的,或则count小于等于0
        // id == null || id <= 0 || count <= 0;
        if (notValidRequest(flowId, acquireCount)) {
            return badRequest();
        }
        // 新建一个请求对象
        FlowRequestData data = new FlowRequestData().setCount(acquireCount)
            .setFlowId(flowId).setPriority(prioritized);
        // 进一步封装为ClusterRequest,消息类型是Flow,
        //  MSG_TYPE_PING = 0; MSG_TYPE_FLOW = 1; MSG_TYPE_PARAM_FLOW = 2;
        ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
        try {
            // 然后向TokenServer发送请求
            TokenResult result = sendTokenRequest(request);
            logForResult(result);
            return result;
        } catch (Exception ex) {
            ClusterClientStatLogUtil.log(ex.getMessage());
            return new TokenResult(TokenResultStatus.FAIL);
        }
    }
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
        if (transportClient == null) {
            RecordLog.warn(
                "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
            return clientFail();
        }
        ClusterResponse response = transportClient.sendRequest(request);
        TokenResult result = new TokenResult(response.getStatus());
        if (response.getData() != null) {
            FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
            result.setRemaining(responseData.getRemainingCount())
                .setWaitInMs(responseData.getWaitInMs());
        }
        return result;
    }

При запуске клиента будет создано соединение с TokenServer.Когда клиентский объект пуст на момент отправки запроса, будет зафиксирован сбой запроса.

DefaultTokenService

После того, как Token Server получит запрос клиента, он вызоветFlowRequestProcessorизprocessRequest, который в конечном итоге вызоветDefaultTokenServiceизrequestTokenметод.

@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
        TokenService tokenService = TokenServiceProvider.getService();
        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        boolean prioritized = request.getData().isPriority();
        TokenResult result = tokenService.requestToken(flowId, count, prioritized);
        return toResponse(result, request);
    }
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        // 和上面一样,先进行验证
        if (notValidRequest(ruleId, acquireCount)) {
            return badRequest();
        }
        // 从一个Map中进行查找 
        // private static final Map<Long, FlowRule> FLOW_RULES = new ConcurrentHashMap<>();
        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
        // 没有该规则
        if (rule == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }
        // 服务端进行检查,是否发送令牌token
        return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
    }

потому чтоacquireClusterTokenОн относительно длинный, поэтому разбит на пояснения.

первый шаг:

Long id = rule.getClusterConfig().getFlowId();
if (!allowProceed(id)) {
   return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
} // @1

@1 Сначала код определяет, разрешено ли это приложение лицензии. Это связано с тем, что TokenServe поддерживает встраивание, то есть поддерживает встраивание TokenServer в узел приложения. Это действие ограничено по текущему моменту.

После срабатывания текущего лимита клиенту будет возвращен код состояния Too_Many_Request. Sentinel поддерживает текущий лимит по пространству имен, который реализуется GlobalRequestLimiter. Внутренняя коллекция этого класса также основана на скользящем окне. Принцип аналогичен FlowSlot.Ограничение тока TPS по умолчанию составляет 3 Вт.

static boolean allowProceed(long flowId) {
        String namespace = ClusterFlowRuleManager.getNamespace(flowId);
        return GlobalRequestLimiter.tryPass(namespace);
    }
public static boolean tryPass(String namespace) {
        if (namespace == null) {
            return false;
        }
        // private static final Map<String, RequestLimiter> GLOBAL_QPS_LIMITER_MAP = new ConcurrentHashMap<>();
        RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace);
        if (limiter == null) {
            return true;
        }
        return limiter.tryPass();
    }

canPassМетод заключается в том, чтобы вычислить, превышает ли текущий номер прохода + 1 qpsAllowed.

public boolean tryPass() {
        if (canPass()) {
            add(1);
            return true;
        }
        return false;
}

public void add(int x) {
        data.currentWindow().value().add(x);
}

public boolean canPass() {
        return getQps() + 1 <= qpsAllowed;
}

public double getQps() {
        return getSum() / data.getIntervalInSecond();
}
private final LeapArray<LongAdder> data;

public long getSum() {
        data.currentWindow();
        long success = 0;

        List<LongAdder> list = data.values();
        for (LongAdder window : list) {
            success += window.sum();
        }
        return success;
}

Из вышеизложенного видно, что временное окно также используется для статистики в кластерном режиме.

Второй большой шаг:

Получить сборщик парных индикаторов на основе FlowIdmetric.

private static final Map<Long, ClusterMetric> METRIC_MAP = new ConcurrentHashMap<>();  

ClusterMetric metric = ClusterMetricStatistics.getMetric(id);  
if (metric == null) {
    return new TokenResult(TokenResultStatus.FAIL);
}

Метрика – это конкретноClusterMetricLeapArray, с предыдущимOccupiableBucketLeapArrayТочно так же есть дополнительный массив, в который записываются данные о вытеснении.

public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> {

    private final LongAdder[] occupyCounter;
    private boolean hasOccupied = false;

    public ClusterMetricLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        ClusterFlowEvent[] events = ClusterFlowEvent.values();
        this.occupyCounter = new LongAdder[events.length];
        for (ClusterFlowEvent event : events) {
            occupyCounter[event.ordinal()] = new LongAdder();
        }
    }

Третий шаг:

Получите текущие пройденные Qps, общее количество установленных лицензий и количество оставшихся лицензий.

double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
double nextRemaining = globalThreshold - latestQps - acquireCount;

еслиFLOW_THRESHOLD_GLOBAL, то есть количество лицензий в кластере равно значению счетчика, настроенному в текущем правиле ограничения.
еслиFLOW_THRESHOLD_AVG_LOCAL, значение, настроенное в текущем правиле ограничения, представляет собой только значение счетчика для одной машины, а также умножается на количество клиентов в кластере. надgetExceedCountПо умолчанию 1.0.

private static double calcGlobalThreshold(FlowRule rule) {
        double count = rule.getCount();
        switch (rule.getClusterConfig().getThresholdType()) {
            case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
                return count;
            case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
            default:
                int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
                return count * connectedCount;
        }
}

Четвертый шаг:

Если количество оставшихся лицензий больше или равно 0, обновите текущую статистику.

if (nextRemaining >= 0) {
     //增加通过数和通过的请求数
     metric.add(ClusterFlowEvent.PASS, acquireCount);
     metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
     if (prioritized) {
           // Add prioritized pass.
           // Pass (pre-occupy incoming buckets)
            metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
      }
      // Remaining count is cut down to a smaller integer.
      return new TokenResult(TokenResultStatus.OK)
                .setRemaining((int) nextRemaining)
                .setWaitInMs(0);
} 

Пятый шаг:

Если оставшееся число меньше 0.

if (prioritized) {
       // Try to occupy incoming buckets.
       // Waiting due to flow shaping or for next bucket tick.
       // 获取当前等待的Qps(以1s为维度,当前等待的请求数量)
       double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
       // 如果当前等待的Qps低于可借用未来窗口的许可阈值时,可通过,但要设置等待时间
       if (occupyAvg <= 
        // 默认是1.0 , 后面是全局的通过阈值数
        ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
        // 计算等待的时间
        int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
                    // waitInMs > 0 indicates pre-occupy incoming buckets successfully.
                    if (waitInMs > 0) {
                        ClusterServerStatLogUtil.log("flow|waiting|" + id);
                        return new TokenResult(TokenResultStatus.SHOULD_WAIT)
                            .setRemaining(0)
                            .setWaitInMs(waitInMs);
           }
     // Or else occupy failed, should be blocked.
     }
}
// Blocked.
// 发生阻塞,当前请求不能通过,增加与阻塞相关指标的统计数。 
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
if (prioritized) {
      // Add prioritized block.
      metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
      ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
}

     return blockedResult();
}

Далее мы объясним, как рассчитать время ожидания.

// event 为 pass
public int tryOccupyNext(ClusterFlowEvent event, int acquireCount, double threshold) {
        // 当前的通过数
        double latestQps = getAvg(ClusterFlowEvent.PASS);
        // 判断是否支持抢占
        if (!canOccupy(event, acquireCount, latestQps, threshold)) {
            return 0;
        }
        
        // 在抢占数组中添加本次的占用数
        //
        public void addOccupyPass(int count) {
        occupyCounter[ClusterFlowEvent.PASS.ordinal()].add(count);
        occupyCounter[ClusterFlowEvent.PASS_REQUEST.ordinal()].add(1);
        this.hasOccupied = true;
         }
        //
        metric.addOccupyPass(acquireCount);
        
        // 在普通的时间窗口增加等待数
        //
        public void add(ClusterFlowEvent event, long count) {
        metric.currentWindow().value().add(event, count);
    }
       //
        add(ClusterFlowEvent.WAITING, acquireCount);
        // 这里有些不懂,sampleCount默认值应该是10,
        // 这样的话返回的是一个时间窗口的大小
        return 1000 / metric.getSampleCount();
}

Как видно из вышеизложенного,canOccupyявляется ключевым методом.

private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) {
        long headPass = metric.getFirstCountOfWindow(event);
        //
        public long getOccupiedCount(ClusterFlowEvent event) {
        return occupyCounter[event.ordinal()].sum();
        }
        //
        // 获得Pass事件下的已占用的数  
        long occupiedCount = metric.getOccupiedCount(event);
        // 已通过的请求数 + 本次需要的请求数 + 占用的请求数  - 第一个统计的时间窗口的数 
        // 如果小于等于阈值,即可以抢占
        return latestQps + (acquireCount + occupiedCount) - headPass <= threshold;
    }
public long getFirstCountOfWindow(ClusterFlowEvent event) {
        if (event == null) {
            return 0;
        }
        WindowWrap<ClusterMetricBucket> windowWrap = getValidHead();
        if (windowWrap == null) {
            return 0;
        }
        return windowWrap.value().get(event);
}
public WindowWrap<T> getValidHead() {
        return getValidHead(TimeUtil.currentTimeMillis());
}
WindowWrap<T> getValidHead(long timeMillis) {
        // Calculate index for expected head time.
        int idx = calculateTimeIdx(timeMillis + windowLengthInMs);
        WindowWrap<T> wrap = array.get(idx);
        if (wrap == null || isWindowDeprecated(wrap)) {
            return null;
        }

        return wrap;
}

В этот момент сервер определил, отправлять ли токен.

Затем переместите код обратно туда, куда был отправлен запрос клиента.

private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
        if (transportClient == null) {
            RecordLog.warn(
                "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
            return clientFail();
        }
        ClusterResponse response = transportClient.sendRequest(request);
        TokenResult result = new TokenResult(response.getStatus());
        if (response.getData() != null) {
            FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
            result.setRemaining(responseData.getRemainingCount())
                .setWaitInMs(responseData.getWaitInMs());
        }
        return result;
    }

Как видно из вышеизложенного, оставшееся число в возвращаемом результате и будет временем ожидания.

Как работать с результатом Результат.

private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
                                                         DefaultNode node,
                                                         int acquireCount, boolean prioritized) {
        switch (result.getStatus()) {
            case TokenResultStatus.OK:   
                return true;
            case TokenResultStatus.SHOULD_WAIT:
                // Wait for next tick.
                try {
                    Thread.sleep(result.getWaitInMs());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return true;
            case TokenResultStatus.NO_RULE_EXISTS:
            case TokenResultStatus.BAD_REQUEST:
            case TokenResultStatus.FAIL:
            case TokenResultStatus.TOO_MANY_REQUEST:
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            case TokenResultStatus.BLOCKED:
            default:
                return false;
        }
    }
}

Как видно из вышесказанного, если статус в порядке, возвращаем true и разрешаем пройти.

Если вытеснение поддерживается в случае приоритета, подождите в соответствии с возвращенным временем ожидания.

Если он находится в других состояниях: вернитесь в автономный режим для оценки.

По умолчанию возвращает ложь.

На данный момент я закончил объяснять лимит тока кластера, но логика расчета времени ожидания так и не ясна, добро пожаловать на биржу.