Публичный аккаунт WeChat: маленький черный домовладелец
Поскольку дорога идет далеко, будущее будет лучше
Обучение безгранично, давайте работать вместе!
Предыдущие текущие функции ограничения - все версии для одной машины, которые могут подсчитывать только количество вызовов локальной службы.Если служба размещается на нескольких серверах в состоянии кластера, при условии, что в кластере 5 машин, каждая одна машина составляет 10 запросов в секунду. В идеальном состоянии текущий предельный порог всего кластера составляет 50 запросов в секунду. Однако в реальных условиях трафик, направляемый на каждую машину, может быть неравномерным, что приведет к запуску некоторых машин, когда общая сумма не достигнуто.
Каждый одномашинный экземпляр заботится только о своем пороге, но всем безразличен глобальный порог всей системы.Когда мы хотим установить общий Qps для API, текущий лимит в одномашинном режиме не может удовлетворить условиям.
Автономная версия выполняет статистику в каждом экземпляре, а кластерная версия имеет выделенный экземпляр для статистики.
Этот сервер токенов называется Sentinel, который специально используется для статистических данных. Другие экземпляры, как клиент токенов Sentinel, будут запрашивать токены у сервера токенов. Если токен можно получить, это означает, что текущий qps еще не достиг общего порог Указывает, что общий порог кластера достигнут, и текущий экземпляр будет заблокирован.
Сценарии, подходящие для ограничения тока кластера:
1) Подсчитайте общий трафик API на шлюзе API и ограничьте общее количество запросов в секунду для API или службы.
2) Глобальное управление потоком вызовов между сервисами в Service Mesh.
3) Ограничить общую частоту доступа к горячим товарам внутри кластера.
отправная точка
Дросселирование кластера Sentinel находится в стадииFlowSlot
реализовано в. Он найдет все правила регулирования на основе имени ресурса.FlowRule
, затем вызовите каждое правило по очередиcanPassCheck
Оценивается, может ли текущее ограничивающее правило быть пройдено.
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
Если правило применяется в кластерном режиме, оно вызоветpassClusterCheck
метод.
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
try {
TokenService clusterService = pickClusterService(); \\@1
if (clusterService == null) {
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); //@2
}
long flowId = rule.getClusterConfig().getFlowId(); //@3
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized); //@4
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
// If client is absent, then fallback to local mode.
} catch (Throwable ex) {
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
}
// Fallback to local flow control when token client or server for this rule is not available.
// If fallback is not enabled, then directly pass.
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
@1 Код должен определить, является ли текущий узел Token Client или Token Server.
1) Если роль текущего узла — Клиент, возвращаемый TokenService —DefaultClusterTokenClient
;
2) Если роль текущего узла — Сервер, по умолчанию возвращается TokenService.DefaultTokenService
.
Код @2: если TokenService кластера не может быть получен, правило управления потоком может быть переведено в режим ограничения тока для одной машины.
private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
return passLocalCheck(rule, context, node, acquireCount, prioritized);
} else {
return true;
}
}
@3 код для получения идентификатора потока управления потоком, в кластерном режиме каждое правило имеет соответствующийClusterFlowConfig
.
Введение в класс ClusterFlowConfig:
public class ClusterFlowConfig {
// 全局唯一id
private Long flowId;
// 有两种阈值类型,一种是单机均摊,一种是集群总体模式
private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL;
//集群不可用时是否回退到单机模式
private boolean fallbackToLocalWhenFail = true;
private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;
// 集群采样数 10
private int sampleCount = ClusterRuleConstant.DEFAULT_CLUSTER_SAMPLE_COUNT;
// 1000ms,即1秒
private int windowIntervalMs = RuleConstant.DEFAULT_WINDOW_INTERVAL_MS;
@4 Код применяется для токена через TokenService в соответствии с полученным идентификатором потока. Как видно из вышеизложенного, его может вызывать TokenClient, а может вызывать ToeknServer. Соответствующие классыDefaultClusterTokenClient
а такжеDefaultTokenService
.
Ниже приводится интерпретация двух ролей TokenClient и TokenService.
DefaultClusterTokenClient
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
// 如果flowId是无效的,或则count小于等于0
// id == null || id <= 0 || count <= 0;
if (notValidRequest(flowId, acquireCount)) {
return badRequest();
}
// 新建一个请求对象
FlowRequestData data = new FlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setPriority(prioritized);
// 进一步封装为ClusterRequest,消息类型是Flow,
// MSG_TYPE_PING = 0; MSG_TYPE_FLOW = 1; MSG_TYPE_PARAM_FLOW = 2;
ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
try {
// 然后向TokenServer发送请求
TokenResult result = sendTokenRequest(request);
logForResult(result);
return result;
} catch (Exception ex) {
ClusterClientStatLogUtil.log(ex.getMessage());
return new TokenResult(TokenResultStatus.FAIL);
}
}
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
if (transportClient == null) {
RecordLog.warn(
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
return clientFail();
}
ClusterResponse response = transportClient.sendRequest(request);
TokenResult result = new TokenResult(response.getStatus());
if (response.getData() != null) {
FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
}
return result;
}
При запуске клиента будет создано соединение с TokenServer.Когда клиентский объект пуст на момент отправки запроса, будет зафиксирован сбой запроса.
DefaultTokenService
После того, как Token Server получит запрос клиента, он вызоветFlowRequestProcessor
изprocessRequest
, который в конечном итоге вызоветDefaultTokenService
изrequestToken
метод.
@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
@Override
public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
TokenService tokenService = TokenServiceProvider.getService();
long flowId = request.getData().getFlowId();
int count = request.getData().getCount();
boolean prioritized = request.getData().isPriority();
TokenResult result = tokenService.requestToken(flowId, count, prioritized);
return toResponse(result, request);
}
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
// 和上面一样,先进行验证
if (notValidRequest(ruleId, acquireCount)) {
return badRequest();
}
// 从一个Map中进行查找
// private static final Map<Long, FlowRule> FLOW_RULES = new ConcurrentHashMap<>();
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
// 没有该规则
if (rule == null) {
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
// 服务端进行检查,是否发送令牌token
return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
}
потому чтоacquireClusterToken
Он относительно длинный, поэтому разбит на пояснения.
первый шаг:
Long id = rule.getClusterConfig().getFlowId();
if (!allowProceed(id)) {
return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
} // @1
@1 Сначала код определяет, разрешено ли это приложение лицензии. Это связано с тем, что TokenServe поддерживает встраивание, то есть поддерживает встраивание TokenServer в узел приложения. Это действие ограничено по текущему моменту.
После срабатывания текущего лимита клиенту будет возвращен код состояния Too_Many_Request. Sentinel поддерживает текущий лимит по пространству имен, который реализуется GlobalRequestLimiter. Внутренняя коллекция этого класса также основана на скользящем окне. Принцип аналогичен FlowSlot.Ограничение тока TPS по умолчанию составляет 3 Вт.
static boolean allowProceed(long flowId) {
String namespace = ClusterFlowRuleManager.getNamespace(flowId);
return GlobalRequestLimiter.tryPass(namespace);
}
public static boolean tryPass(String namespace) {
if (namespace == null) {
return false;
}
// private static final Map<String, RequestLimiter> GLOBAL_QPS_LIMITER_MAP = new ConcurrentHashMap<>();
RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace);
if (limiter == null) {
return true;
}
return limiter.tryPass();
}
canPass
Метод заключается в том, чтобы вычислить, превышает ли текущий номер прохода + 1 qpsAllowed.
public boolean tryPass() {
if (canPass()) {
add(1);
return true;
}
return false;
}
public void add(int x) {
data.currentWindow().value().add(x);
}
public boolean canPass() {
return getQps() + 1 <= qpsAllowed;
}
public double getQps() {
return getSum() / data.getIntervalInSecond();
}
private final LeapArray<LongAdder> data;
public long getSum() {
data.currentWindow();
long success = 0;
List<LongAdder> list = data.values();
for (LongAdder window : list) {
success += window.sum();
}
return success;
}
Из вышеизложенного видно, что временное окно также используется для статистики в кластерном режиме.
Второй большой шаг:
Получить сборщик парных индикаторов на основе FlowIdmetric
.
private static final Map<Long, ClusterMetric> METRIC_MAP = new ConcurrentHashMap<>();
ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
if (metric == null) {
return new TokenResult(TokenResultStatus.FAIL);
}
Метрика – это конкретноClusterMetricLeapArray
, с предыдущимOccupiableBucketLeapArray
Точно так же есть дополнительный массив, в который записываются данные о вытеснении.
public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> {
private final LongAdder[] occupyCounter;
private boolean hasOccupied = false;
public ClusterMetricLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
ClusterFlowEvent[] events = ClusterFlowEvent.values();
this.occupyCounter = new LongAdder[events.length];
for (ClusterFlowEvent event : events) {
occupyCounter[event.ordinal()] = new LongAdder();
}
}
Третий шаг:
Получите текущие пройденные Qps, общее количество установленных лицензий и количество оставшихся лицензий.
double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
double nextRemaining = globalThreshold - latestQps - acquireCount;
еслиFLOW_THRESHOLD_GLOBAL
, то есть количество лицензий в кластере равно значению счетчика, настроенному в текущем правиле ограничения.
еслиFLOW_THRESHOLD_AVG_LOCAL
, значение, настроенное в текущем правиле ограничения, представляет собой только значение счетчика для одной машины, а также умножается на количество клиентов в кластере. надgetExceedCount
По умолчанию 1.0.
private static double calcGlobalThreshold(FlowRule rule) {
double count = rule.getCount();
switch (rule.getClusterConfig().getThresholdType()) {
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
return count;
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
default:
int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
return count * connectedCount;
}
}
Четвертый шаг:
Если количество оставшихся лицензий больше или равно 0, обновите текущую статистику.
if (nextRemaining >= 0) {
//增加通过数和通过的请求数
metric.add(ClusterFlowEvent.PASS, acquireCount);
metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
if (prioritized) {
// Add prioritized pass.
// Pass (pre-occupy incoming buckets)
metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
}
// Remaining count is cut down to a smaller integer.
return new TokenResult(TokenResultStatus.OK)
.setRemaining((int) nextRemaining)
.setWaitInMs(0);
}
Пятый шаг:
Если оставшееся число меньше 0.
if (prioritized) {
// Try to occupy incoming buckets.
// Waiting due to flow shaping or for next bucket tick.
// 获取当前等待的Qps(以1s为维度,当前等待的请求数量)
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
// 如果当前等待的Qps低于可借用未来窗口的许可阈值时,可通过,但要设置等待时间
if (occupyAvg <=
// 默认是1.0 , 后面是全局的通过阈值数
ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
// 计算等待的时间
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
// waitInMs > 0 indicates pre-occupy incoming buckets successfully.
if (waitInMs > 0) {
ClusterServerStatLogUtil.log("flow|waiting|" + id);
return new TokenResult(TokenResultStatus.SHOULD_WAIT)
.setRemaining(0)
.setWaitInMs(waitInMs);
}
// Or else occupy failed, should be blocked.
}
}
// Blocked.
// 发生阻塞,当前请求不能通过,增加与阻塞相关指标的统计数。
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
if (prioritized) {
// Add prioritized block.
metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
}
return blockedResult();
}
Далее мы объясним, как рассчитать время ожидания.
// event 为 pass
public int tryOccupyNext(ClusterFlowEvent event, int acquireCount, double threshold) {
// 当前的通过数
double latestQps = getAvg(ClusterFlowEvent.PASS);
// 判断是否支持抢占
if (!canOccupy(event, acquireCount, latestQps, threshold)) {
return 0;
}
// 在抢占数组中添加本次的占用数
//
public void addOccupyPass(int count) {
occupyCounter[ClusterFlowEvent.PASS.ordinal()].add(count);
occupyCounter[ClusterFlowEvent.PASS_REQUEST.ordinal()].add(1);
this.hasOccupied = true;
}
//
metric.addOccupyPass(acquireCount);
// 在普通的时间窗口增加等待数
//
public void add(ClusterFlowEvent event, long count) {
metric.currentWindow().value().add(event, count);
}
//
add(ClusterFlowEvent.WAITING, acquireCount);
// 这里有些不懂,sampleCount默认值应该是10,
// 这样的话返回的是一个时间窗口的大小
return 1000 / metric.getSampleCount();
}
Как видно из вышеизложенного,canOccupy
является ключевым методом.
private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) {
long headPass = metric.getFirstCountOfWindow(event);
//
public long getOccupiedCount(ClusterFlowEvent event) {
return occupyCounter[event.ordinal()].sum();
}
//
// 获得Pass事件下的已占用的数
long occupiedCount = metric.getOccupiedCount(event);
// 已通过的请求数 + 本次需要的请求数 + 占用的请求数 - 第一个统计的时间窗口的数
// 如果小于等于阈值,即可以抢占
return latestQps + (acquireCount + occupiedCount) - headPass <= threshold;
}
public long getFirstCountOfWindow(ClusterFlowEvent event) {
if (event == null) {
return 0;
}
WindowWrap<ClusterMetricBucket> windowWrap = getValidHead();
if (windowWrap == null) {
return 0;
}
return windowWrap.value().get(event);
}
public WindowWrap<T> getValidHead() {
return getValidHead(TimeUtil.currentTimeMillis());
}
WindowWrap<T> getValidHead(long timeMillis) {
// Calculate index for expected head time.
int idx = calculateTimeIdx(timeMillis + windowLengthInMs);
WindowWrap<T> wrap = array.get(idx);
if (wrap == null || isWindowDeprecated(wrap)) {
return null;
}
return wrap;
}
В этот момент сервер определил, отправлять ли токен.
Затем переместите код обратно туда, куда был отправлен запрос клиента.
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
if (transportClient == null) {
RecordLog.warn(
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
return clientFail();
}
ClusterResponse response = transportClient.sendRequest(request);
TokenResult result = new TokenResult(response.getStatus());
if (response.getData() != null) {
FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
}
return result;
}
Как видно из вышеизложенного, оставшееся число в возвращаемом результате и будет временем ожидания.
Как работать с результатом Результат.
private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
DefaultNode node,
int acquireCount, boolean prioritized) {
switch (result.getStatus()) {
case TokenResultStatus.OK:
return true;
case TokenResultStatus.SHOULD_WAIT:
// Wait for next tick.
try {
Thread.sleep(result.getWaitInMs());
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
case TokenResultStatus.NO_RULE_EXISTS:
case TokenResultStatus.BAD_REQUEST:
case TokenResultStatus.FAIL:
case TokenResultStatus.TOO_MANY_REQUEST:
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
case TokenResultStatus.BLOCKED:
default:
return false;
}
}
}
Как видно из вышесказанного, если статус в порядке, возвращаем true и разрешаем пройти.
Если вытеснение поддерживается в случае приоритета, подождите в соответствии с возвращенным временем ожидания.
Если он находится в других состояниях: вернитесь в автономный режим для оценки.
По умолчанию возвращает ложь.
На данный момент я закончил объяснять лимит тока кластера, но логика расчета времени ожидания так и не ясна, добро пожаловать на биржу.